* [PATCH 1/5] bpf: Clear callee saved regs after updating REG0
@ 2022-08-08 15:53 David Vernet
  2022-08-08 15:53 ` [PATCH 2/5] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF map type David Vernet
                   ` (4 more replies)
  0 siblings, 5 replies; 28+ messages in thread
From: David Vernet @ 2022-08-08 15:53 UTC (permalink / raw)
  To: bpf
  Cc: ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

In the verifier, we currently reset all of the registers containing caller
saved args before updating the callee's return register (REG0). In a
follow-on patch, we will need to be able to inspect the caller
saved registers when updating REG0 to determine if a dynptr that's passed
to a helper function was allocated by a helper, or allocated by a program.

This patch therefore updates check_helper_call() to clear the caller saved
regs after updating REG0.

Signed-off-by: David Vernet <void@manifault.com>
---
 kernel/bpf/verifier.c | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/kernel/bpf/verifier.c b/kernel/bpf/verifier.c
index 096fdac70165..938ba1536249 100644
--- a/kernel/bpf/verifier.c
+++ b/kernel/bpf/verifier.c
@@ -7348,11 +7348,9 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
 	if (err)
 		return err;
 
-	/* reset caller saved regs */
-	for (i = 0; i < CALLER_SAVED_REGS; i++) {
-		mark_reg_not_init(env, regs, caller_saved[i]);
-		check_reg_arg(env, caller_saved[i], DST_OP_NO_MARK);
-	}
+	/* reset return reg */
+	mark_reg_not_init(env, regs, BPF_REG_0);
+	check_reg_arg(env, BPF_REG_0, DST_OP_NO_MARK);
 
 	/* helper call returns 64-bit value. */
 	regs[BPF_REG_0].subreg_def = DEF_NOT_SUBREG;
@@ -7488,6 +7486,13 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
 		regs[BPF_REG_0].ref_obj_id = dynptr_id;
 	}
 
+	/* reset remaining caller saved regs */
+	BUILD_BUG_ON(caller_saved[0] != BPF_REG_0);
+	for (i = 1; i < CALLER_SAVED_REGS; i++) {
+		mark_reg_not_init(env, regs, caller_saved[i]);
+		check_reg_arg(env, caller_saved[i], DST_OP_NO_MARK);
+	}
+
 	do_refine_retval_range(regs, fn->ret_type, func_id, &meta);
 
 	err = check_map_func_compatibility(env, meta.map_ptr, func_id);
-- 
2.30.2



* [PATCH 2/5] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF map type
  2022-08-08 15:53 [PATCH 1/5] bpf: Clear callee saved regs after updating REG0 David Vernet
@ 2022-08-08 15:53 ` David Vernet
  2022-08-11 23:22   ` Andrii Nakryiko
  2022-08-11 23:29   ` Andrii Nakryiko
  2022-08-08 15:53 ` [PATCH 3/5] bpf: Add bpf_user_ringbuf_drain() helper David Vernet
                   ` (3 subsequent siblings)
  4 siblings, 2 replies; 28+ messages in thread
From: David Vernet @ 2022-08-08 15:53 UTC (permalink / raw)
  To: bpf
  Cc: ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

We want to support a ringbuf map type where samples are published from
user-space to BPF programs. BPF currently supports a kernel -> user-space
circular ringbuffer via the BPF_MAP_TYPE_RINGBUF map type. We'll need to
define a new map type for user-space -> kernel, as none of the helpers
exported for BPF_MAP_TYPE_RINGBUF will apply to a user-space producer
ringbuffer, and we'll want to add one or more helper functions that would
not apply to a kernel-producer ringbuffer.

This patch therefore adds a new BPF_MAP_TYPE_USER_RINGBUF map type
definition. The map type is useless in its current form, as there is no way
to access or use it for anything until we add more BPF helpers. A follow-on
patch will therefore add a new helper function that allows BPF programs to
run callbacks on samples that are published to the ringbuffer.
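
For reference, a BPF program would declare such a map in much the same way
as an existing BPF_MAP_TYPE_RINGBUF map. The sketch below is illustrative
only (the map name and size are arbitrary; as with BPF_MAP_TYPE_RINGBUF,
max_entries must be a page-aligned power of two):

  #include <linux/bpf.h>
  #include <bpf/bpf_helpers.h>

  /* A user-space -> kernel ring buffer. The kernel is the consumer;
   * user-space is the (single) producer.
   */
  struct {
          __uint(type, BPF_MAP_TYPE_USER_RINGBUF);
          __uint(max_entries, 256 * 1024); /* example size: 256 KiB */
  } user_ringbuf SEC(".maps");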

Signed-off-by: David Vernet <void@manifault.com>
---
 include/linux/bpf_types.h      |  1 +
 include/uapi/linux/bpf.h       |  1 +
 kernel/bpf/ringbuf.c           | 70 +++++++++++++++++++++++++++++-----
 kernel/bpf/verifier.c          |  3 ++
 tools/include/uapi/linux/bpf.h |  1 +
 tools/lib/bpf/libbpf.c         |  1 +
 6 files changed, 68 insertions(+), 9 deletions(-)

diff --git a/include/linux/bpf_types.h b/include/linux/bpf_types.h
index 2b9112b80171..2c6a4f2562a7 100644
--- a/include/linux/bpf_types.h
+++ b/include/linux/bpf_types.h
@@ -126,6 +126,7 @@ BPF_MAP_TYPE(BPF_MAP_TYPE_STRUCT_OPS, bpf_struct_ops_map_ops)
 #endif
 BPF_MAP_TYPE(BPF_MAP_TYPE_RINGBUF, ringbuf_map_ops)
 BPF_MAP_TYPE(BPF_MAP_TYPE_BLOOM_FILTER, bloom_filter_map_ops)
+BPF_MAP_TYPE(BPF_MAP_TYPE_USER_RINGBUF, user_ringbuf_map_ops)
 
 BPF_LINK_TYPE(BPF_LINK_TYPE_RAW_TRACEPOINT, raw_tracepoint)
 BPF_LINK_TYPE(BPF_LINK_TYPE_TRACING, tracing)
diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
index 7bf9ba1329be..a341f877b230 100644
--- a/include/uapi/linux/bpf.h
+++ b/include/uapi/linux/bpf.h
@@ -909,6 +909,7 @@ enum bpf_map_type {
 	BPF_MAP_TYPE_INODE_STORAGE,
 	BPF_MAP_TYPE_TASK_STORAGE,
 	BPF_MAP_TYPE_BLOOM_FILTER,
+	BPF_MAP_TYPE_USER_RINGBUF,
 };
 
 /* Note that tracing related programs such as
diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
index ded4faeca192..29e2de42df15 100644
--- a/kernel/bpf/ringbuf.c
+++ b/kernel/bpf/ringbuf.c
@@ -38,12 +38,32 @@ struct bpf_ringbuf {
 	struct page **pages;
 	int nr_pages;
 	spinlock_t spinlock ____cacheline_aligned_in_smp;
-	/* Consumer and producer counters are put into separate pages to allow
-	 * mapping consumer page as r/w, but restrict producer page to r/o.
-	 * This protects producer position from being modified by user-space
-	 * application and ruining in-kernel position tracking.
+	/* Consumer and producer counters are put into separate pages to
+	 * allow each position to be mapped with different permissions.
+	 * This prevents a user-space application from modifying the
+	 * position and ruining in-kernel tracking. The permissions of the
+	 * pages depend on who is producing samples: user-space or the
+	 * kernel.
+	 *
+	 * Kernel-producer
+	 * ---------------
+	 * The producer position and data pages are mapped as r/o in
+	 * userspace. For this approach, bits in the header of samples are
+	 * used to signal to user-space, and to other producers, whether a
+	 * sample is currently being written.
+	 *
+	 * User-space producer
+	 * -------------------
+	 * Only the page containing the consumer position, and whether the
+	 * ringbuffer is currently being consumed via a 'busy' bit, are
+	 * mapped r/o in user-space. Sample headers may not be used to
+	 * communicate any information between kernel consumers, as a
+	 * user-space application could modify their contents at any time.
 	 */
-	unsigned long consumer_pos __aligned(PAGE_SIZE);
+	struct {
+		unsigned long consumer_pos;
+		atomic_t busy;
+	} __aligned(PAGE_SIZE);
 	unsigned long producer_pos __aligned(PAGE_SIZE);
 	char data[] __aligned(PAGE_SIZE);
 };
@@ -141,6 +161,7 @@ static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
 
 	rb->mask = data_sz - 1;
 	rb->consumer_pos = 0;
+	atomic_set(&rb->busy, 0);
 	rb->producer_pos = 0;
 
 	return rb;
@@ -224,15 +245,23 @@ static int ringbuf_map_get_next_key(struct bpf_map *map, void *key,
 	return -ENOTSUPP;
 }
 
-static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma)
+static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma,
+			    bool kernel_producer)
 {
 	struct bpf_ringbuf_map *rb_map;
 
 	rb_map = container_of(map, struct bpf_ringbuf_map, map);
 
 	if (vma->vm_flags & VM_WRITE) {
-		/* allow writable mapping for the consumer_pos only */
-		if (vma->vm_pgoff != 0 || vma->vm_end - vma->vm_start != PAGE_SIZE)
+		if (kernel_producer) {
+			/* allow writable mapping for the consumer_pos only */
+			if (vma->vm_pgoff != 0 || vma->vm_end - vma->vm_start != PAGE_SIZE)
+				return -EPERM;
+		/* For user ringbufs, disallow writable mappings to the
+		 * consumer pointer, and allow writable mappings to both the
+		 * producer position, and the ring buffer data itself.
+		 */
+		} else if (vma->vm_pgoff == 0)
 			return -EPERM;
 	} else {
 		vma->vm_flags &= ~VM_MAYWRITE;
@@ -242,6 +271,16 @@ static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma)
 				   vma->vm_pgoff + RINGBUF_PGOFF);
 }
 
+static int ringbuf_map_mmap_kern(struct bpf_map *map, struct vm_area_struct *vma)
+{
+	return ringbuf_map_mmap(map, vma, true);
+}
+
+static int ringbuf_map_mmap_user(struct bpf_map *map, struct vm_area_struct *vma)
+{
+	return ringbuf_map_mmap(map, vma, false);
+}
+
 static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
 {
 	unsigned long cons_pos, prod_pos;
@@ -269,7 +308,7 @@ const struct bpf_map_ops ringbuf_map_ops = {
 	.map_meta_equal = bpf_map_meta_equal,
 	.map_alloc = ringbuf_map_alloc,
 	.map_free = ringbuf_map_free,
-	.map_mmap = ringbuf_map_mmap,
+	.map_mmap = ringbuf_map_mmap_kern,
 	.map_poll = ringbuf_map_poll,
 	.map_lookup_elem = ringbuf_map_lookup_elem,
 	.map_update_elem = ringbuf_map_update_elem,
@@ -278,6 +317,19 @@ const struct bpf_map_ops ringbuf_map_ops = {
 	.map_btf_id = &ringbuf_map_btf_ids[0],
 };
 
+BTF_ID_LIST_SINGLE(user_ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
+const struct bpf_map_ops user_ringbuf_map_ops = {
+	.map_meta_equal = bpf_map_meta_equal,
+	.map_alloc = ringbuf_map_alloc,
+	.map_free = ringbuf_map_free,
+	.map_mmap = ringbuf_map_mmap_user,
+	.map_lookup_elem = ringbuf_map_lookup_elem,
+	.map_update_elem = ringbuf_map_update_elem,
+	.map_delete_elem = ringbuf_map_delete_elem,
+	.map_get_next_key = ringbuf_map_get_next_key,
+	.map_btf_id = &user_ringbuf_map_btf_ids[0],
+};
+
 /* Given pointer to ring buffer record metadata and struct bpf_ringbuf itself,
  * calculate offset from record metadata to ring buffer in pages, rounded
  * down. This page offset is stored as part of record metadata and allows to
diff --git a/kernel/bpf/verifier.c b/kernel/bpf/verifier.c
index 938ba1536249..4a9562c62af0 100644
--- a/kernel/bpf/verifier.c
+++ b/kernel/bpf/verifier.c
@@ -6196,6 +6196,8 @@ static int check_map_func_compatibility(struct bpf_verifier_env *env,
 		    func_id != BPF_FUNC_ringbuf_discard_dynptr)
 			goto error;
 		break;
+	case BPF_MAP_TYPE_USER_RINGBUF:
+		goto error;
 	case BPF_MAP_TYPE_STACK_TRACE:
 		if (func_id != BPF_FUNC_get_stackid)
 			goto error;
@@ -12681,6 +12683,7 @@ static int check_map_prog_compatibility(struct bpf_verifier_env *env,
 			}
 			break;
 		case BPF_MAP_TYPE_RINGBUF:
+		case BPF_MAP_TYPE_USER_RINGBUF:
 		case BPF_MAP_TYPE_INODE_STORAGE:
 		case BPF_MAP_TYPE_SK_STORAGE:
 		case BPF_MAP_TYPE_TASK_STORAGE:
diff --git a/tools/include/uapi/linux/bpf.h b/tools/include/uapi/linux/bpf.h
index 59a217ca2dfd..ce0ce713faf9 100644
--- a/tools/include/uapi/linux/bpf.h
+++ b/tools/include/uapi/linux/bpf.h
@@ -909,6 +909,7 @@ enum bpf_map_type {
 	BPF_MAP_TYPE_INODE_STORAGE,
 	BPF_MAP_TYPE_TASK_STORAGE,
 	BPF_MAP_TYPE_BLOOM_FILTER,
+	BPF_MAP_TYPE_USER_RINGBUF,
 };
 
 /* Note that tracing related programs such as
diff --git a/tools/lib/bpf/libbpf.c b/tools/lib/bpf/libbpf.c
index 77e3797cf75a..9c1f2d09f44e 100644
--- a/tools/lib/bpf/libbpf.c
+++ b/tools/lib/bpf/libbpf.c
@@ -163,6 +163,7 @@ static const char * const map_type_name[] = {
 	[BPF_MAP_TYPE_INODE_STORAGE]		= "inode_storage",
 	[BPF_MAP_TYPE_TASK_STORAGE]		= "task_storage",
 	[BPF_MAP_TYPE_BLOOM_FILTER]		= "bloom_filter",
+	[BPF_MAP_TYPE_USER_RINGBUF]             = "user_ringbuf",
 };
 
 static const char * const prog_type_name[] = {
-- 
2.30.2



* [PATCH 3/5] bpf: Add bpf_user_ringbuf_drain() helper
  2022-08-08 15:53 [PATCH 1/5] bpf: Clear callee saved regs after updating REG0 David Vernet
  2022-08-08 15:53 ` [PATCH 2/5] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF map type David Vernet
@ 2022-08-08 15:53 ` David Vernet
  2022-08-08 21:55   ` kernel test robot
  2022-08-11 23:22   ` Andrii Nakryiko
  2022-08-08 15:53 ` [PATCH 4/5] bpf: Add libbpf logic for user-space ring buffer David Vernet
                   ` (2 subsequent siblings)
  4 siblings, 2 replies; 28+ messages in thread
From: David Vernet @ 2022-08-08 15:53 UTC (permalink / raw)
  To: bpf
  Cc: ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

Now that we have a BPF_MAP_TYPE_USER_RINGBUF map type, we need to add a
helper function that allows BPF programs to drain samples from the ring
buffer, and invoke a callback for each. This patch adds a new
bpf_user_ringbuf_drain() helper that provides this abstraction.

In order to support this, we also need to add a new PTR_TO_DYNPTR
register type to reflect a dynptr that was allocated by a helper function
and passed to a BPF program. The verifier currently only supports
PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL and MEM_ALLOC.
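
For illustration, a BPF program would consume samples with the new helper
roughly as shown below. This is a minimal sketch: the attach point, map
name and callback body are hypothetical, and a real callback would read
the sample through the dynptr helpers rather than just counting it.

  #include <linux/bpf.h>
  #include <bpf/bpf_helpers.h>

  char _license[] SEC("license") = "GPL";

  struct {
          __uint(type, BPF_MAP_TYPE_USER_RINGBUF);
          __uint(max_entries, 256 * 1024);
  } user_ringbuf SEC(".maps");

  long total_drained;

  static long handle_sample(struct bpf_dynptr *dynptr, void *ctx)
  {
          long *drained = ctx;

          /* The dynptr wraps one user-space-produced sample. */
          (*drained)++;

          /* Returning 0 continues draining; nonzero stops early. */
          return 0;
  }

  SEC("tracepoint/syscalls/sys_enter_getpgid")
  int drain_samples(void *ctx)
  {
          long drained = 0;

          bpf_user_ringbuf_drain(&user_ringbuf, handle_sample, &drained, 0);
          total_drained += drained;

          return 0;
  }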

Signed-off-by: David Vernet <void@manifault.com>
---
 include/linux/bpf.h            |   6 +-
 include/uapi/linux/bpf.h       |   8 ++
 kernel/bpf/helpers.c           |   2 +
 kernel/bpf/ringbuf.c           | 157 +++++++++++++++++++++++++++++++--
 kernel/bpf/verifier.c          |  57 +++++++++++-
 tools/include/uapi/linux/bpf.h |   8 ++
 6 files changed, 228 insertions(+), 10 deletions(-)

diff --git a/include/linux/bpf.h b/include/linux/bpf.h
index 20c26aed7896..4d0d5f2a3ac8 100644
--- a/include/linux/bpf.h
+++ b/include/linux/bpf.h
@@ -401,7 +401,7 @@ enum bpf_type_flag {
 	/* DYNPTR points to memory local to the bpf program. */
 	DYNPTR_TYPE_LOCAL	= BIT(8 + BPF_BASE_TYPE_BITS),
 
-	/* DYNPTR points to a ringbuf record. */
+	/* DYNPTR points to a kernel-produced ringbuf record. */
 	DYNPTR_TYPE_RINGBUF	= BIT(9 + BPF_BASE_TYPE_BITS),
 
 	/* Size is known at compile time. */
@@ -606,6 +606,7 @@ enum bpf_reg_type {
 	PTR_TO_MEM,		 /* reg points to valid memory region */
 	PTR_TO_BUF,		 /* reg points to a read/write buffer */
 	PTR_TO_FUNC,		 /* reg points to a bpf program function */
+	PTR_TO_DYNPTR,		 /* reg points to a dynptr */
 	__BPF_REG_TYPE_MAX,
 
 	/* Extended reg_types. */
@@ -2410,6 +2411,7 @@ extern const struct bpf_func_proto bpf_loop_proto;
 extern const struct bpf_func_proto bpf_copy_from_user_task_proto;
 extern const struct bpf_func_proto bpf_set_retval_proto;
 extern const struct bpf_func_proto bpf_get_retval_proto;
+extern const struct bpf_func_proto bpf_user_ringbuf_drain_proto;
 
 const struct bpf_func_proto *tracing_prog_func_proto(
   enum bpf_func_id func_id, const struct bpf_prog *prog);
@@ -2554,7 +2556,7 @@ enum bpf_dynptr_type {
 	BPF_DYNPTR_TYPE_INVALID,
 	/* Points to memory that is local to the bpf program */
 	BPF_DYNPTR_TYPE_LOCAL,
-	/* Underlying data is a ringbuf record */
+	/* Underlying data is a kernel-produced ringbuf record */
 	BPF_DYNPTR_TYPE_RINGBUF,
 };
 
diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
index a341f877b230..ca125648d7fd 100644
--- a/include/uapi/linux/bpf.h
+++ b/include/uapi/linux/bpf.h
@@ -5332,6 +5332,13 @@ union bpf_attr {
  *		**-EACCES** if the SYN cookie is not valid.
  *
  *		**-EPROTONOSUPPORT** if CONFIG_IPV6 is not builtin.
+ *
+ * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
+ *	Description
+ *		Drain samples from the specified user ringbuffer, and invoke the
+ *		provided callback for each such sample.
+ *	Return
+ *		The number of drained samples, or **-EINVAL** if invalid *flags* are passed.
  */
 #define __BPF_FUNC_MAPPER(FN)		\
 	FN(unspec),			\
@@ -5542,6 +5549,7 @@ union bpf_attr {
 	FN(tcp_raw_gen_syncookie_ipv6),	\
 	FN(tcp_raw_check_syncookie_ipv4),	\
 	FN(tcp_raw_check_syncookie_ipv6),	\
+	FN(user_ringbuf_drain),	        \
 	/* */
 
 /* integer value in 'imm' field of BPF_CALL instruction selects which helper
diff --git a/kernel/bpf/helpers.c b/kernel/bpf/helpers.c
index 1f961f9982d2..b8097876014c 100644
--- a/kernel/bpf/helpers.c
+++ b/kernel/bpf/helpers.c
@@ -1647,6 +1647,8 @@ bpf_base_func_proto(enum bpf_func_id func_id)
 		return &bpf_dynptr_write_proto;
 	case BPF_FUNC_dynptr_data:
 		return &bpf_dynptr_data_proto;
+	case BPF_FUNC_user_ringbuf_drain:
+		return &bpf_user_ringbuf_drain_proto;
 	default:
 		break;
 	}
diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
index 29e2de42df15..fc589fc8eb7c 100644
--- a/kernel/bpf/ringbuf.c
+++ b/kernel/bpf/ringbuf.c
@@ -291,16 +291,33 @@ static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
 }
 
 static __poll_t ringbuf_map_poll(struct bpf_map *map, struct file *filp,
-				 struct poll_table_struct *pts)
+				 struct poll_table_struct *pts,
+				 bool kernel_producer)
 {
 	struct bpf_ringbuf_map *rb_map;
+	bool buffer_empty;
 
 	rb_map = container_of(map, struct bpf_ringbuf_map, map);
 	poll_wait(filp, &rb_map->rb->waitq, pts);
 
-	if (ringbuf_avail_data_sz(rb_map->rb))
-		return EPOLLIN | EPOLLRDNORM;
-	return 0;
+	buffer_empty = !ringbuf_avail_data_sz(rb_map->rb);
+
+	if (kernel_producer)
+		return buffer_empty ? 0 : EPOLLIN | EPOLLRDNORM;
+	else
+		return buffer_empty ? EPOLLOUT | EPOLLWRNORM : 0;
+}
+
+static __poll_t ringbuf_map_poll_kern(struct bpf_map *map, struct file *filp,
+				      struct poll_table_struct *pts)
+{
+	return ringbuf_map_poll(map, filp, pts, true);
+}
+
+static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
+				      struct poll_table_struct *pts)
+{
+	return ringbuf_map_poll(map, filp, pts, false);
 }
 
 BTF_ID_LIST_SINGLE(ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
@@ -309,7 +326,7 @@ const struct bpf_map_ops ringbuf_map_ops = {
 	.map_alloc = ringbuf_map_alloc,
 	.map_free = ringbuf_map_free,
 	.map_mmap = ringbuf_map_mmap_kern,
-	.map_poll = ringbuf_map_poll,
+	.map_poll = ringbuf_map_poll_kern,
 	.map_lookup_elem = ringbuf_map_lookup_elem,
 	.map_update_elem = ringbuf_map_update_elem,
 	.map_delete_elem = ringbuf_map_delete_elem,
@@ -323,6 +340,7 @@ const struct bpf_map_ops user_ringbuf_map_ops = {
 	.map_alloc = ringbuf_map_alloc,
 	.map_free = ringbuf_map_free,
 	.map_mmap = ringbuf_map_mmap_user,
+	.map_poll = ringbuf_map_poll_user,
 	.map_lookup_elem = ringbuf_map_lookup_elem,
 	.map_update_elem = ringbuf_map_update_elem,
 	.map_delete_elem = ringbuf_map_delete_elem,
@@ -605,3 +623,132 @@ const struct bpf_func_proto bpf_ringbuf_discard_dynptr_proto = {
 	.arg1_type	= ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | OBJ_RELEASE,
 	.arg2_type	= ARG_ANYTHING,
 };
+
+static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
+				   u32 *size)
+{
+	unsigned long cons_pos, prod_pos;
+	u32 sample_len, total_len;
+	u32 *hdr;
+	int err;
+	int busy = 0;
+
+	/* If another consumer is already consuming a sample, bail out; only
+	 * a single consumer may drain samples at a time.
+	 */
+	if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
+		return -EBUSY;
+
+	/* Synchronizes with smp_store_release() in user-space. */
+	prod_pos = smp_load_acquire(&rb->producer_pos);
+	/* Synchronizes with smp_store_release() in
+	 * __bpf_user_ringbuf_sample_release().
+	 */
+	cons_pos = smp_load_acquire(&rb->consumer_pos);
+	if (cons_pos >= prod_pos) {
+		atomic_set(&rb->busy, 0);
+		return -ENODATA;
+	}
+
+	hdr = (u32 *)((uintptr_t)rb->data + (cons_pos & rb->mask));
+	sample_len = *hdr;
+
+	/* Check that the sample can fit into a dynptr. */
+	err = bpf_dynptr_check_size(sample_len);
+	if (err) {
+		atomic_set(&rb->busy, 0);
+		return err;
+	}
+
+	/* Check that the sample fits within the region advertised by the
+	 * consumer position.
+	 */
+	total_len = sample_len + BPF_RINGBUF_HDR_SZ;
+	if (total_len > prod_pos - cons_pos) {
+		atomic_set(&rb->busy, 0);
+		return -E2BIG;
+	}
+
+	/* Check that the sample fits within the data region of the ring buffer.
+	 */
+	if (total_len > rb->mask + 1) {
+		atomic_set(&rb->busy, 0);
+		return -E2BIG;
+	}
+
+	/* consumer_pos is updated when the sample is released.
+	 */
+
+	*sample = (void *)((uintptr_t)rb->data +
+			   ((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
+	*size = sample_len;
+
+	return 0;
+}
+
+static void
+__bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
+				  u64 flags)
+{
+
+
+	/* To release the ringbuffer, just increment the consumer position to
+	 * signal that the sample has been consumed. The busy bit is cleared
+	 * below, once the wakeup (if any) has been queued.
+	 */
+	smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
+			  BPF_RINGBUF_HDR_SZ);
+
+	if (flags & BPF_RB_FORCE_WAKEUP || !(flags & BPF_RB_NO_WAKEUP))
+		irq_work_queue(&rb->work);
+
+	atomic_set(&rb->busy, 0);
+}
+
+BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
+	   void *, callback_fn, void *, callback_ctx, u64, flags)
+{
+	struct bpf_ringbuf *rb;
+	long num_samples = 0, ret = 0;
+	int err;
+	bpf_callback_t callback = (bpf_callback_t)callback_fn;
+	u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP;
+
+	if (unlikely(flags & ~wakeup_flags))
+		return -EINVAL;
+
+	/* The two wakeup flags are mutually exclusive. */
+	if (unlikely((flags & wakeup_flags) == wakeup_flags))
+		return -EINVAL;
+
+	rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
+	do {
+		u32 size;
+		void *sample;
+
+		err = __bpf_user_ringbuf_poll(rb, &sample, &size);
+
+		if (!err) {
+			struct bpf_dynptr_kern dynptr;
+
+			bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL,
+					0, size);
+			ret = callback((u64)&dynptr,
+					(u64)(uintptr_t)callback_ctx, 0, 0, 0);
+
+			__bpf_user_ringbuf_sample_release(rb, size, flags);
+			num_samples++;
+		}
+	} while (err == 0 && num_samples < 4096 && ret == 0);
+
+	return num_samples;
+}
+
+const struct bpf_func_proto bpf_user_ringbuf_drain_proto = {
+	.func		= bpf_user_ringbuf_drain,
+	.ret_type	= RET_INTEGER,
+	.arg1_type	= ARG_CONST_MAP_PTR,
+	.arg2_type	= ARG_PTR_TO_FUNC,
+	.arg3_type	= ARG_PTR_TO_STACK_OR_NULL,
+	.arg4_type	= ARG_ANYTHING,
+};
diff --git a/kernel/bpf/verifier.c b/kernel/bpf/verifier.c
index 4a9562c62af0..99dfdc3ed187 100644
--- a/kernel/bpf/verifier.c
+++ b/kernel/bpf/verifier.c
@@ -555,6 +555,7 @@ static const char *reg_type_str(struct bpf_verifier_env *env,
 		[PTR_TO_BUF]		= "buf",
 		[PTR_TO_FUNC]		= "func",
 		[PTR_TO_MAP_KEY]	= "map_key",
+		[PTR_TO_DYNPTR]		= "dynptr_ptr",
 	};
 
 	if (type & PTR_MAYBE_NULL) {
@@ -5656,6 +5657,12 @@ static const struct bpf_reg_types stack_ptr_types = { .types = { PTR_TO_STACK }
 static const struct bpf_reg_types const_str_ptr_types = { .types = { PTR_TO_MAP_VALUE } };
 static const struct bpf_reg_types timer_types = { .types = { PTR_TO_MAP_VALUE } };
 static const struct bpf_reg_types kptr_types = { .types = { PTR_TO_MAP_VALUE } };
+static const struct bpf_reg_types dynptr_types = {
+	.types = {
+		PTR_TO_STACK,
+		PTR_TO_DYNPTR | MEM_ALLOC | DYNPTR_TYPE_LOCAL,
+	}
+};
 
 static const struct bpf_reg_types *compatible_reg_types[__BPF_ARG_TYPE_MAX] = {
 	[ARG_PTR_TO_MAP_KEY]		= &map_key_value_types,
@@ -5682,7 +5689,7 @@ static const struct bpf_reg_types *compatible_reg_types[__BPF_ARG_TYPE_MAX] = {
 	[ARG_PTR_TO_CONST_STR]		= &const_str_ptr_types,
 	[ARG_PTR_TO_TIMER]		= &timer_types,
 	[ARG_PTR_TO_KPTR]		= &kptr_types,
-	[ARG_PTR_TO_DYNPTR]		= &stack_ptr_types,
+	[ARG_PTR_TO_DYNPTR]		= &dynptr_types,
 };
 
 static int check_reg_type(struct bpf_verifier_env *env, u32 regno,
@@ -6025,6 +6032,13 @@ static int check_func_arg(struct bpf_verifier_env *env, u32 arg,
 		err = check_mem_size_reg(env, reg, regno, true, meta);
 		break;
 	case ARG_PTR_TO_DYNPTR:
+		/* We only need to check for initialized / uninitialized helper
+		 * dynptr args if the dynptr is not MEM_ALLOC, as the assumption
+		 * is that if it is, that a helper function initialized the
+		 * dynptr on behalf of the BPF program.
+		 */
+		if (reg->type & MEM_ALLOC)
+			break;
 		if (arg_type & MEM_UNINIT) {
 			if (!is_dynptr_reg_valid_uninit(env, reg)) {
 				verbose(env, "Dynptr has to be an uninitialized dynptr\n");
@@ -6197,7 +6211,9 @@ static int check_map_func_compatibility(struct bpf_verifier_env *env,
 			goto error;
 		break;
 	case BPF_MAP_TYPE_USER_RINGBUF:
-		goto error;
+		if (func_id != BPF_FUNC_user_ringbuf_drain)
+			goto error;
+		break;
 	case BPF_MAP_TYPE_STACK_TRACE:
 		if (func_id != BPF_FUNC_get_stackid)
 			goto error;
@@ -6317,6 +6333,10 @@ static int check_map_func_compatibility(struct bpf_verifier_env *env,
 		if (map->map_type != BPF_MAP_TYPE_RINGBUF)
 			goto error;
 		break;
+	case BPF_FUNC_user_ringbuf_drain:
+		if (map->map_type != BPF_MAP_TYPE_USER_RINGBUF)
+			goto error;
+		break;
 	case BPF_FUNC_get_stackid:
 		if (map->map_type != BPF_MAP_TYPE_STACK_TRACE)
 			goto error;
@@ -6901,6 +6921,29 @@ static int set_find_vma_callback_state(struct bpf_verifier_env *env,
 	return 0;
 }
 
+static int set_user_ringbuf_callback_state(struct bpf_verifier_env *env,
+					   struct bpf_func_state *caller,
+					   struct bpf_func_state *callee,
+					   int insn_idx)
+{
+	/* bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void
+	 *			  callback_ctx, u64 flags);
+	 * callback_fn(struct bpf_dynptr_t* dynptr, void *callback_ctx);
+	 */
+	__mark_reg_not_init(env, &callee->regs[BPF_REG_0]);
+	callee->regs[BPF_REG_1].type = PTR_TO_DYNPTR | DYNPTR_TYPE_LOCAL | MEM_ALLOC;
+	__mark_reg_known_zero(&callee->regs[BPF_REG_1]);
+	callee->regs[BPF_REG_2] = caller->regs[BPF_REG_3];
+
+	/* unused */
+	__mark_reg_not_init(env, &callee->regs[BPF_REG_3]);
+	__mark_reg_not_init(env, &callee->regs[BPF_REG_4]);
+	__mark_reg_not_init(env, &callee->regs[BPF_REG_5]);
+
+	callee->in_callback_fn = true;
+	return 0;
+}
+
 static int prepare_func_exit(struct bpf_verifier_env *env, int *insn_idx)
 {
 	struct bpf_verifier_state *state = env->cur_state;
@@ -7345,6 +7388,10 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
 			}
 		}
 		break;
+	case BPF_FUNC_user_ringbuf_drain:
+		err = __check_func_call(env, insn, insn_idx_p, meta.subprogno,
+					set_user_ringbuf_callback_state);
+		break;
 	}
 
 	if (err)
@@ -7477,11 +7524,15 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
 		/* Find the id of the dynptr we're acquiring a reference to */
 		for (i = 0; i < MAX_BPF_FUNC_REG_ARGS; i++) {
 			if (arg_type_is_dynptr(fn->arg_type[i])) {
+				struct bpf_reg_state *reg = &regs[BPF_REG_1 + i];
+
 				if (dynptr_id) {
 					verbose(env, "verifier internal error: multiple dynptr args in func\n");
 					return -EFAULT;
 				}
-				dynptr_id = stack_slot_get_id(env, &regs[BPF_REG_1 + i]);
+
+				if (!(reg->type & MEM_ALLOC))
+					dynptr_id = stack_slot_get_id(env, reg);
 			}
 		}
 		/* For release_reference() */
diff --git a/tools/include/uapi/linux/bpf.h b/tools/include/uapi/linux/bpf.h
index ce0ce713faf9..e5e4ab12177c 100644
--- a/tools/include/uapi/linux/bpf.h
+++ b/tools/include/uapi/linux/bpf.h
@@ -5332,6 +5332,13 @@ union bpf_attr {
  *		**-EACCES** if the SYN cookie is not valid.
  *
  *		**-EPROTONOSUPPORT** if CONFIG_IPV6 is not builtin.
+ *
+ * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
+ *	Description
+ *		Drain samples from the specified user ringbuffer, and invoke the
+ *		provided callback for each such sample.
+ *	Return
+ *		The number of drained samples, or **-EINVAL** if invalid *flags* are passed.
  */
 #define __BPF_FUNC_MAPPER(FN)		\
 	FN(unspec),			\
@@ -5542,6 +5549,7 @@ union bpf_attr {
 	FN(tcp_raw_gen_syncookie_ipv6),	\
 	FN(tcp_raw_check_syncookie_ipv4),	\
 	FN(tcp_raw_check_syncookie_ipv6),	\
+	FN(user_ringbuf_drain),	        \
 	/* */
 
 /* integer value in 'imm' field of BPF_CALL instruction selects which helper
-- 
2.30.2



* [PATCH 4/5] bpf: Add libbpf logic for user-space ring buffer
  2022-08-08 15:53 [PATCH 1/5] bpf: Clear callee saved regs after updating REG0 David Vernet
  2022-08-08 15:53 ` [PATCH 2/5] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF map type David Vernet
  2022-08-08 15:53 ` [PATCH 3/5] bpf: Add bpf_user_ringbuf_drain() helper David Vernet
@ 2022-08-08 15:53 ` David Vernet
  2022-08-11 23:39   ` Andrii Nakryiko
  2022-08-08 15:53 ` [PATCH 5/5] selftests/bpf: Add selftests validating the user ringbuf David Vernet
  2022-08-08 18:14 ` [PATCH 1/5] bpf: Clear callee saved regs after updating REG0 Joanne Koong
  4 siblings, 1 reply; 28+ messages in thread
From: David Vernet @ 2022-08-08 15:53 UTC (permalink / raw)
  To: bpf
  Cc: ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

Now that all of the logic is in place in the kernel to support user-space
produced ringbuffers, we can add the user-space logic to libbpf.
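
For illustration, a user-space producer would drive the new API roughly as
follows. This is a minimal sketch under assumptions: the sample layout is
arbitrary, error handling is abbreviated, and the map fd is assumed to
refer to a BPF_MAP_TYPE_USER_RINGBUF map (e.g. obtained via bpf_map__fd()
on a loaded skeleton).

  #include <errno.h>
  #include <unistd.h>
  #include <bpf/libbpf.h>

  struct sample {
          int pid;
          long value;
  };

  static int produce_one_sample(int user_ringbuf_map_fd)
  {
          struct ring_buffer_user *rb;
          struct sample *entry;
          int err = 0;

          rb = ring_buffer_user__new(user_ringbuf_map_fd, NULL);
          if (!rb)
                  return -errno;

          /* Reserve space for a sample; NULL means the buffer is full. */
          entry = ring_buffer_user__reserve(rb, sizeof(*entry));
          if (!entry) {
                  /* Or block until the kernel drains the buffer, e.g.:
                   * entry = ring_buffer_user__poll(rb, sizeof(*entry), 1000);
                   */
                  err = -ENOSPC;
                  goto out;
          }

          entry->pid = getpid();
          entry->value = 42;
          ring_buffer_user__submit(rb, entry);

  out:
          ring_buffer_user__free(rb);
          return err;
  }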

Signed-off-by: David Vernet <void@manifault.com>
---
 kernel/bpf/ringbuf.c          |   7 +-
 tools/lib/bpf/libbpf.c        |  10 +-
 tools/lib/bpf/libbpf.h        |  19 +++
 tools/lib/bpf/libbpf.map      |   6 +
 tools/lib/bpf/libbpf_probes.c |   1 +
 tools/lib/bpf/ringbuf.c       | 216 ++++++++++++++++++++++++++++++++++
 6 files changed, 256 insertions(+), 3 deletions(-)

diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
index fc589fc8eb7c..a10558e79ec8 100644
--- a/kernel/bpf/ringbuf.c
+++ b/kernel/bpf/ringbuf.c
@@ -639,7 +639,9 @@ static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
 	if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
 		return -EBUSY;
 
-	/* Synchronizes with smp_store_release() in user-space. */
+	/* Synchronizes with smp_store_release() in __ring_buffer_user__commit()
+	 * in user-space.
+	 */
 	prod_pos = smp_load_acquire(&rb->producer_pos);
 	/* Synchronizes with smp_store_release() in
 	 * __bpf_user_ringbuf_sample_release().
@@ -695,6 +697,9 @@ __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
 	/* To release the ringbuffer, just increment the consumer position to
 	 * signal that the sample has been consumed. The busy bit is cleared
 	 * below, once the wakeup (if any) has been queued.
+	 *
+	 * Synchronizes with smp_load_acquire() in ring_buffer_user__reserve()
+	 * in user-space.
 	 */
 	smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
 			  BPF_RINGBUF_HDR_SZ);
diff --git a/tools/lib/bpf/libbpf.c b/tools/lib/bpf/libbpf.c
index 9c1f2d09f44e..f7fe09dce643 100644
--- a/tools/lib/bpf/libbpf.c
+++ b/tools/lib/bpf/libbpf.c
@@ -2367,6 +2367,12 @@ static size_t adjust_ringbuf_sz(size_t sz)
 	return sz;
 }
 
+static bool map_is_ringbuf(const struct bpf_map *map)
+{
+	return map->def.type == BPF_MAP_TYPE_RINGBUF ||
+	       map->def.type == BPF_MAP_TYPE_USER_RINGBUF;
+}
+
 static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def)
 {
 	map->def.type = def->map_type;
@@ -2381,7 +2387,7 @@ static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def
 	map->btf_value_type_id = def->value_type_id;
 
 	/* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */
-	if (map->def.type == BPF_MAP_TYPE_RINGBUF)
+	if (map_is_ringbuf(map))
 		map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries);
 
 	if (def->parts & MAP_DEF_MAP_TYPE)
@@ -4363,7 +4369,7 @@ int bpf_map__set_max_entries(struct bpf_map *map, __u32 max_entries)
 	map->def.max_entries = max_entries;
 
 	/* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */
-	if (map->def.type == BPF_MAP_TYPE_RINGBUF)
+	if (map_is_ringbuf(map))
 		map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries);
 
 	return 0;
diff --git a/tools/lib/bpf/libbpf.h b/tools/lib/bpf/libbpf.h
index 61493c4cddac..6d1d0539b08d 100644
--- a/tools/lib/bpf/libbpf.h
+++ b/tools/lib/bpf/libbpf.h
@@ -1009,6 +1009,7 @@ LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook,
 
 /* Ring buffer APIs */
 struct ring_buffer;
+struct ring_buffer_user;
 
 typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size);
 
@@ -1028,6 +1029,24 @@ LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);
 LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb);
 LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb);
 
+struct ring_buffer_user_opts {
+	size_t sz; /* size of this struct, for forward/backward compatibility */
+};
+
+#define ring_buffer_user_opts__last_field sz
+
+LIBBPF_API struct ring_buffer_user *
+ring_buffer_user__new(int map_fd, const struct ring_buffer_user_opts *opts);
+LIBBPF_API void *ring_buffer_user__reserve(struct ring_buffer_user *rb,
+					   uint32_t size);
+LIBBPF_API void *ring_buffer_user__poll(struct ring_buffer_user *rb,
+					uint32_t size, int timeout_ms);
+LIBBPF_API void ring_buffer_user__submit(struct ring_buffer_user *rb,
+					 void *sample);
+LIBBPF_API void ring_buffer_user__discard(struct ring_buffer_user *rb,
+					  void *sample);
+LIBBPF_API void ring_buffer_user__free(struct ring_buffer_user *rb);
+
 /* Perf buffer APIs */
 struct perf_buffer;
 
diff --git a/tools/lib/bpf/libbpf.map b/tools/lib/bpf/libbpf.map
index 119e6e1ea7f1..8db11040df1b 100644
--- a/tools/lib/bpf/libbpf.map
+++ b/tools/lib/bpf/libbpf.map
@@ -365,4 +365,10 @@ LIBBPF_1.0.0 {
 		libbpf_bpf_map_type_str;
 		libbpf_bpf_prog_type_str;
 		perf_buffer__buffer;
+		ring_buffer_user__discard;
+		ring_buffer_user__free;
+		ring_buffer_user__new;
+		ring_buffer_user__poll;
+		ring_buffer_user__reserve;
+		ring_buffer_user__submit;
 };
diff --git a/tools/lib/bpf/libbpf_probes.c b/tools/lib/bpf/libbpf_probes.c
index 6d495656f554..f3a8e8e74eb8 100644
--- a/tools/lib/bpf/libbpf_probes.c
+++ b/tools/lib/bpf/libbpf_probes.c
@@ -231,6 +231,7 @@ static int probe_map_create(enum bpf_map_type map_type)
 			return btf_fd;
 		break;
 	case BPF_MAP_TYPE_RINGBUF:
+	case BPF_MAP_TYPE_USER_RINGBUF:
 		key_size = 0;
 		value_size = 0;
 		max_entries = 4096;
diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
index 8bc117bcc7bc..86e3c11d8486 100644
--- a/tools/lib/bpf/ringbuf.c
+++ b/tools/lib/bpf/ringbuf.c
@@ -39,6 +39,17 @@ struct ring_buffer {
 	int ring_cnt;
 };
 
+struct ring_buffer_user {
+	struct epoll_event event;
+	unsigned long *consumer_pos;
+	unsigned long *producer_pos;
+	void *data;
+	unsigned long mask;
+	size_t page_size;
+	int map_fd;
+	int epoll_fd;
+};
+
 static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r)
 {
 	if (r->consumer_pos) {
@@ -300,3 +311,208 @@ int ring_buffer__epoll_fd(const struct ring_buffer *rb)
 {
 	return rb->epoll_fd;
 }
+
+static void __user_ringbuf_unmap_ring(struct ring_buffer_user *rb)
+{
+	if (rb->consumer_pos) {
+		munmap(rb->consumer_pos, rb->page_size);
+		rb->consumer_pos = NULL;
+	}
+	if (rb->producer_pos) {
+		munmap(rb->producer_pos, rb->page_size + 2 * (rb->mask + 1));
+		rb->producer_pos = NULL;
+	}
+}
+
+void ring_buffer_user__free(struct ring_buffer_user *rb)
+{
+	if (!rb)
+		return;
+
+	__user_ringbuf_unmap_ring(rb);
+
+	if (rb->epoll_fd >= 0)
+		close(rb->epoll_fd);
+
+	free(rb);
+}
+
+static int __ring_buffer_user_map(struct ring_buffer_user *rb, int map_fd)
+{
+
+	struct bpf_map_info info;
+	__u32 len = sizeof(info);
+	void *tmp;
+	struct epoll_event *rb_epoll;
+	int err;
+
+	memset(&info, 0, sizeof(info));
+
+	err = bpf_obj_get_info_by_fd(map_fd, &info, &len);
+	if (err) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to get map info for fd=%d: %d\n",
+			map_fd, err);
+		return libbpf_err(err);
+	}
+
+	if (info.type != BPF_MAP_TYPE_USER_RINGBUF) {
+		pr_warn("user ringbuf: map fd=%d is not BPF_MAP_TYPE_USER_RINGBUF\n",
+			map_fd);
+		return libbpf_err(-EINVAL);
+	}
+
+	rb->map_fd = map_fd;
+	rb->mask = info.max_entries - 1;
+
+	/* Map read-only consumer page */
+	tmp = mmap(NULL, rb->page_size, PROT_READ, MAP_SHARED, map_fd, 0);
+	if (tmp == MAP_FAILED) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to mmap consumer page for map fd=%d: %d\n",
+			map_fd, err);
+		return libbpf_err(err);
+	}
+	rb->consumer_pos = tmp;
+
+	/* Map read-write the producer page and data pages. We map the data
+	 * region as twice the total size of the ringbuffer to allow the simple
+	 * reading and writing of samples that wrap around the end of the
+	 * buffer.  See the kernel implementation for details.
+	 */
+	tmp = mmap(NULL, rb->page_size + 2 * info.max_entries,
+		   PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, rb->page_size);
+	if (tmp == MAP_FAILED) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %d\n",
+			map_fd, err);
+		return libbpf_err(err);
+	}
+
+	rb->producer_pos = tmp;
+	rb->data = tmp + rb->page_size;
+
+	rb_epoll = &rb->event;
+	rb_epoll->events = EPOLLOUT;
+	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, rb_epoll) < 0) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to epoll add map fd=%d: %d\n",
+			map_fd, err);
+		return libbpf_err(err);
+	}
+
+	return 0;
+}
+
+struct ring_buffer_user *
+ring_buffer_user__new(int map_fd, const struct ring_buffer_user_opts *opts)
+{
+	struct ring_buffer_user *rb;
+	int err;
+
+	if (!OPTS_VALID(opts, ring_buffer_opts))
+		return errno = EINVAL, NULL;
+
+	rb = calloc(1, sizeof(*rb));
+	if (!rb)
+		return errno = ENOMEM, NULL;
+
+	rb->page_size = getpagesize();
+
+	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+	if (rb->epoll_fd < 0) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to create epoll instance: %d\n",
+			err);
+		goto err_out;
+	}
+
+	err = __ring_buffer_user_map(rb, map_fd);
+	if (err)
+		goto err_out;
+
+	return rb;
+
+err_out:
+	ring_buffer_user__free(rb);
+	return errno = -err, NULL;
+}
+
+static void __ring_buffer_user__commit(struct ring_buffer_user *rb)
+{
+	uint32_t *hdr;
+	uint32_t total_len;
+	unsigned long prod_pos;
+
+	prod_pos = *rb->producer_pos;
+	hdr = rb->data + (prod_pos & rb->mask);
+
+	total_len = *hdr + BPF_RINGBUF_HDR_SZ;
+
+	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_poll() in
+	 * the kernel.
+	 */
+	smp_store_release(rb->producer_pos, prod_pos + total_len);
+}
+
+/* Discard a previously reserved sample from the ring buffer. Because the user
+ * ringbuffer is assumed to be single producer, this can simply be a no-op, and
+ * the producer pointer is left in the same place as when it was reserved.
+ */
+void ring_buffer_user__discard(struct ring_buffer_user *rb, void *sample)
+{}
+
+/* Submit a previously reserved sample into the ring buffer.
+ */
+void ring_buffer_user__submit(struct ring_buffer_user *rb, void *sample)
+{
+	__ring_buffer_user__commit(rb);
+}
+
+/* Reserve a pointer to a sample in the user ring buffer. This function is *not*
+ * thread safe, and the ring-buffer supports only a single producer.
+ */
+void *ring_buffer_user__reserve(struct ring_buffer_user *rb, uint32_t size)
+{
+	uint32_t *hdr;
+	/* 64-bit to avoid overflow in case of extreme application behavior */
+	size_t avail_size, total_size, max_size;
+	unsigned long cons_pos, prod_pos;
+
+	/* Synchronizes with smp_store_release() in __bpf_user_ringbuf_poll() in
+	 * the kernel.
+	 */
+	cons_pos = smp_load_acquire(rb->consumer_pos);
+	/* Synchronizes with smp_store_release() in __ring_buffer_user__commit()
+	 */
+	prod_pos = smp_load_acquire(rb->producer_pos);
+
+	max_size = rb->mask + 1;
+	avail_size = max_size - (prod_pos - cons_pos);
+	total_size = size + BPF_RINGBUF_HDR_SZ;
+
+	if (total_size > max_size || avail_size < total_size)
+		return NULL;
+
+	hdr = rb->data + (prod_pos & rb->mask);
+	*hdr = size;
+
+	/* Producer pos is updated when a sample is submitted. */
+
+	return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask);
+}
+
+/* Poll for available space in the ringbuffer, and reserve a record when it
+ * becomes available.
+ */
+void *ring_buffer_user__poll(struct ring_buffer_user *rb, uint32_t size,
+			     int timeout_ms)
+{
+	int cnt;
+
+	cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms);
+	if (cnt < 0)
+		return NULL;
+
+	return ring_buffer_user__reserve(rb, size);
+}
-- 
2.30.2



* [PATCH 5/5] selftests/bpf: Add selftests validating the user ringbuf
  2022-08-08 15:53 [PATCH 1/5] bpf: Clear callee saved regs after updating REG0 David Vernet
                   ` (2 preceding siblings ...)
  2022-08-08 15:53 ` [PATCH 4/5] bpf: Add libbpf logic for user-space ring buffer David Vernet
@ 2022-08-08 15:53 ` David Vernet
  2022-08-08 18:14 ` [PATCH 1/5] bpf: Clear callee saved regs after updating REG0 Joanne Koong
  4 siblings, 0 replies; 28+ messages in thread
From: David Vernet @ 2022-08-08 15:53 UTC (permalink / raw)
  To: bpf
  Cc: ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

This change includes selftests that validate the expected behavior and
APIs of the new BPF_MAP_TYPE_USER_RINGBUF map type.

Signed-off-by: David Vernet <void@manifault.com>
---
 .../selftests/bpf/prog_tests/user_ringbuf.c   | 587 ++++++++++++++++++
 .../selftests/bpf/progs/user_ringbuf_fail.c   | 174 ++++++
 .../bpf/progs/user_ringbuf_success.c          | 220 +++++++
 .../testing/selftests/bpf/test_user_ringbuf.h |  35 ++
 4 files changed, 1016 insertions(+)
 create mode 100644 tools/testing/selftests/bpf/prog_tests/user_ringbuf.c
 create mode 100644 tools/testing/selftests/bpf/progs/user_ringbuf_fail.c
 create mode 100644 tools/testing/selftests/bpf/progs/user_ringbuf_success.c
 create mode 100644 tools/testing/selftests/bpf/test_user_ringbuf.h

diff --git a/tools/testing/selftests/bpf/prog_tests/user_ringbuf.c b/tools/testing/selftests/bpf/prog_tests/user_ringbuf.c
new file mode 100644
index 000000000000..d62adb74da73
--- /dev/null
+++ b/tools/testing/selftests/bpf/prog_tests/user_ringbuf.c
@@ -0,0 +1,587 @@
+// SPDX-License-Identifier: GPL-2.0
+/* Copyright (c) 2022 Meta Platforms, Inc. and affiliates. */
+
+#define _GNU_SOURCE
+#include <linux/compiler.h>
+#include <linux/ring_buffer.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/mman.h>
+#include <sys/syscall.h>
+#include <sys/sysinfo.h>
+#include <test_progs.h>
+#include <unistd.h>
+
+#include "user_ringbuf_fail.skel.h"
+#include "user_ringbuf_success.skel.h"
+
+#include "../test_user_ringbuf.h"
+
+static int duration;
+static size_t log_buf_sz = 1 << 20; /* 1 MB */
+static char obj_log_buf[1048576];
+static const long c_sample_size = sizeof(struct sample) + BPF_RINGBUF_HDR_SZ;
+static const long c_ringbuf_size = 1 << 12; /* 1 small page */
+static const long c_max_entries = c_ringbuf_size / c_sample_size;
+
+static void drain_current_samples(void)
+{
+	syscall(__NR_getpgid);
+}
+
+static int write_samples(struct ring_buffer_user *ringbuf, uint32_t num_samples)
+{
+	int i, err = 0;
+
+	// Write some number of samples to the ring buffer.
+	for (i = 0; i < num_samples; i++) {
+		struct sample *entry;
+		int read;
+
+		entry = ring_buffer_user__reserve(ringbuf, sizeof(*entry));
+		if (!entry) {
+			err = -ENOMEM;
+			goto done;
+		}
+
+		entry->pid = getpid();
+		entry->seq = i;
+		entry->value = i * i;
+
+		read = snprintf(entry->comm, sizeof(entry->comm), "%u", i);
+		if (read <= 0) {
+			/* Only invoke CHECK on the error path to avoid spamming
+			 * logs with mostly success messages.
+			 */
+			CHECK(read <= 0, "snprintf_comm",
+			      "Failed to write index %d to comm\n", i);
+			err = read;
+			ring_buffer_user__discard(ringbuf, entry);
+			goto done;
+		}
+
+		ring_buffer_user__submit(ringbuf, entry);
+	}
+
+done:
+	drain_current_samples();
+
+	return err;
+}
+
+static struct user_ringbuf_success*
+open_load_ringbuf_skel(void)
+{
+	struct user_ringbuf_success *skel;
+	int err;
+
+	skel = user_ringbuf_success__open();
+	if (CHECK(!skel, "skel_open", "skeleton open failed\n"))
+		return NULL;
+
+	err = bpf_map__set_max_entries(skel->maps.user_ringbuf,
+				       c_ringbuf_size);
+	if (CHECK(err != 0, "set_max_entries", "set max entries failed: %d\n", err))
+		goto cleanup;
+
+	err = bpf_map__set_max_entries(skel->maps.kernel_ringbuf,
+				       c_ringbuf_size);
+	if (CHECK(err != 0, "set_max_entries", "set max entries failed: %d\n", err))
+		goto cleanup;
+
+	err = user_ringbuf_success__load(skel);
+	if (CHECK(err != 0, "skel_load", "skeleton load failed\n"))
+		goto cleanup;
+
+	return skel;
+
+cleanup:
+	user_ringbuf_success__destroy(skel);
+	return NULL;
+}
+
+static void test_user_ringbuf_mappings(void)
+{
+	int err, rb_fd;
+	int page_size = getpagesize();
+	void *mmap_ptr;
+	struct user_ringbuf_success *skel;
+
+	skel = open_load_ringbuf_skel();
+	if (!skel)
+		return;
+
+	rb_fd = bpf_map__fd(skel->maps.user_ringbuf);
+	/* cons_pos can be mapped R/O, can't add +X with mprotect. */
+	mmap_ptr = mmap(NULL, page_size, PROT_READ, MAP_SHARED, rb_fd, 0);
+	ASSERT_OK_PTR(mmap_ptr, "ro_cons_pos");
+	ASSERT_ERR(mprotect(mmap_ptr, page_size, PROT_WRITE), "write_cons_pos_protect");
+	ASSERT_ERR(mprotect(mmap_ptr, page_size, PROT_EXEC), "exec_cons_pos_protect");
+	ASSERT_ERR_PTR(mremap(mmap_ptr, 0, 4 * page_size, MREMAP_MAYMOVE), "wr_prod_pos");
+	err = -errno;
+	ASSERT_EQ(err, -EPERM, "wr_prod_pos_err");
+	ASSERT_OK(munmap(mmap_ptr, page_size), "unmap_ro_cons");
+
+	/* prod_pos can be mapped RW, can't add +X with mprotect. */
+	mmap_ptr = mmap(NULL, page_size, PROT_READ | PROT_WRITE, MAP_SHARED,
+			rb_fd, page_size);
+	ASSERT_OK_PTR(mmap_ptr, "rw_prod_pos");
+	ASSERT_ERR(mprotect(mmap_ptr, page_size, PROT_EXEC), "exec_prod_pos_protect");
+	err = -errno;
+	ASSERT_EQ(err, -EACCES, "wr_prod_pos_err");
+	ASSERT_OK(munmap(mmap_ptr, page_size), "unmap_rw_prod");
+
+	/* data pages can be mapped RW, can't add +X with mprotect. */
+	mmap_ptr = mmap(NULL, page_size, PROT_WRITE, MAP_SHARED, rb_fd,
+			2 * page_size);
+	ASSERT_OK_PTR(mmap_ptr, "rw_data");
+	ASSERT_ERR(mprotect(mmap_ptr, page_size, PROT_EXEC), "exec_data_protect");
+	err = -errno;
+	ASSERT_EQ(err, -EACCES, "exec_data_err");
+	ASSERT_OK(munmap(mmap_ptr, page_size), "unmap_rw_data");
+
+	user_ringbuf_success__destroy(skel);
+}
+
+static int load_skel_create_ringbufs(struct user_ringbuf_success **skel_out,
+				     struct ring_buffer **kern_ringbuf_out,
+				     ring_buffer_sample_fn callback,
+				     struct ring_buffer_user **user_ringbuf_out)
+{
+	struct user_ringbuf_success *skel;
+	struct ring_buffer *kern_ringbuf = NULL;
+	struct ring_buffer_user *user_ringbuf = NULL;
+	int err = -ENOMEM, rb_fd;
+
+	skel = open_load_ringbuf_skel();
+	if (!skel)
+		return err;
+
+	/* only trigger BPF program for current process */
+	skel->bss->pid = getpid();
+
+	if (kern_ringbuf_out) {
+		rb_fd = bpf_map__fd(skel->maps.kernel_ringbuf);
+		kern_ringbuf = ring_buffer__new(rb_fd, callback, skel, NULL);
+		if (CHECK(!kern_ringbuf, "kern_ringbuf_create",
+					"failed to create kern ringbuf\n"))
+			goto cleanup;
+
+		*kern_ringbuf_out = kern_ringbuf;
+	}
+
+	if (user_ringbuf_out) {
+		rb_fd = bpf_map__fd(skel->maps.user_ringbuf);
+		user_ringbuf = ring_buffer_user__new(rb_fd, NULL);
+		if (CHECK(!user_ringbuf, "user_ringbuf_create",
+			  "failed to create user ringbuf\n"))
+			goto cleanup;
+
+		*user_ringbuf_out = user_ringbuf;
+		ASSERT_EQ(skel->bss->read, 0, "no_reads_after_load");
+	}
+
+	err = user_ringbuf_success__attach(skel);
+	if (CHECK(err != 0, "skel_attach", "skeleton attachment failed: %d\n",
+		  err))
+		goto cleanup;
+
+	*skel_out = skel;
+	return 0;
+
+cleanup:
+	if (kern_ringbuf_out)
+		*kern_ringbuf_out = NULL;
+	if (user_ringbuf_out)
+		*user_ringbuf_out = NULL;
+	ring_buffer__free(kern_ringbuf);
+	ring_buffer_user__free(user_ringbuf);
+	user_ringbuf_success__destroy(skel);
+	return err;
+}
+
+static int
+load_skel_create_user_ringbuf(struct user_ringbuf_success **skel_out,
+			      struct ring_buffer_user **ringbuf_out)
+{
+	return load_skel_create_ringbufs(skel_out, NULL, NULL, ringbuf_out);
+}
+
+static void test_user_ringbuf_commit(void)
+{
+	struct user_ringbuf_success *skel;
+	struct ring_buffer_user *ringbuf;
+	int err;
+
+	err = load_skel_create_user_ringbuf(&skel, &ringbuf);
+	if (err)
+		return;
+
+	ASSERT_EQ(skel->bss->read, 0, "num_samples_read_before");
+
+	err = write_samples(ringbuf, 2);
+	if (CHECK(err, "write_samples", "failed to write samples: %d\n", err))
+		goto cleanup;
+
+	ASSERT_EQ(skel->bss->read, 2, "num_samples_read_after");
+
+cleanup:
+	ring_buffer_user__free(ringbuf);
+	user_ringbuf_success__destroy(skel);
+}
+
+static void test_user_ringbuf_fill(void)
+{
+	struct user_ringbuf_success *skel;
+	struct ring_buffer_user *ringbuf;
+	int err;
+
+	err = load_skel_create_user_ringbuf(&skel, &ringbuf);
+	if (err)
+		return;
+
+	err = write_samples(ringbuf, c_max_entries * 5);
+	ASSERT_EQ(err, -ENOMEM, "too_many_samples_posted");
+	ASSERT_EQ(skel->bss->read, c_max_entries, "max_entries");
+
+	ring_buffer_user__free(ringbuf);
+	user_ringbuf_success__destroy(skel);
+}
+
+static void test_user_ringbuf_loop(void)
+{
+	struct user_ringbuf_success *skel;
+	struct ring_buffer_user *ringbuf;
+	uint32_t total_samples = 8192;
+	uint32_t remaining_samples = total_samples;
+	int err;
+
+	err = load_skel_create_user_ringbuf(&skel, &ringbuf);
+	if (err)
+		return;
+
+	do  {
+		uint32_t curr_samples;
+
+		curr_samples = remaining_samples > c_max_entries
+			? c_max_entries : remaining_samples;
+		err = write_samples(ringbuf, curr_samples);
+		if (err != 0) {
+			/* Perform CHECK inside of if statement to avoid
+			 * flooding logs on the success path.
+			 */
+			CHECK(err, "write_samples",
+					"failed to write sample batch: %d\n", err);
+			goto cleanup;
+		}
+
+		remaining_samples -= curr_samples;
+		ASSERT_EQ(skel->bss->read, total_samples - remaining_samples,
+			  "current_batched_entries");
+	} while (remaining_samples > 0);
+	ASSERT_EQ(skel->bss->read, total_samples, "total_batched_entries");
+
+cleanup:
+	ring_buffer_user__free(ringbuf);
+	user_ringbuf_success__destroy(skel);
+}
+
+static int send_test_message(struct ring_buffer_user *ringbuf,
+			     enum test_msg_op op, s64 operand_64,
+			     s32 operand_32)
+{
+	struct test_msg *msg;
+
+	msg = ring_buffer_user__reserve(ringbuf, sizeof(*msg));
+	if (!msg) {
+		/* Only invoke CHECK on the error path to avoid spamming
+		 * logs with mostly success messages.
+		 */
+		CHECK(!msg, "reserve_msg",
+		      "Failed to reserve message\n");
+		return -ENOMEM;
+	}
+
+	msg->msg_op = op;
+
+	switch (op) {
+	case TEST_MSG_OP_INC64:
+	case TEST_MSG_OP_MUL64:
+		msg->operand_64 = operand_64;
+		break;
+	case TEST_MSG_OP_INC32:
+	case TEST_MSG_OP_MUL32:
+		msg->operand_32 = operand_32;
+		break;
+	default:
+		PRINT_FAIL("Invalid operand %d\n", op);
+		ring_buffer_user__discard(ringbuf, msg);
+		return -EINVAL;
+	}
+
+	ring_buffer_user__submit(ringbuf, msg);
+
+	return 0;
+}
+
+static void kick_kernel_read_messages(void)
+{
+	syscall(__NR_getcwd);
+}
+
+static int handle_kernel_msg(void *ctx, void *data, size_t len)
+{
+	struct user_ringbuf_success *skel = ctx;
+	struct test_msg *msg = data;
+
+	switch (msg->msg_op) {
+	case TEST_MSG_OP_INC64:
+		skel->bss->user_mutated += msg->operand_64;
+		return 0;
+	case TEST_MSG_OP_INC32:
+		skel->bss->user_mutated += msg->operand_32;
+		return 0;
+	case TEST_MSG_OP_MUL64:
+		skel->bss->user_mutated *= msg->operand_64;
+		return 0;
+	case TEST_MSG_OP_MUL32:
+		skel->bss->user_mutated *= msg->operand_32;
+		return 0;
+	default:
+		fprintf(stderr, "Invalid operand %d\n", msg->msg_op);
+		return -EINVAL;
+	}
+}
+
+static void drain_kernel_messages_buffer(struct ring_buffer *kern_ringbuf)
+{
+	int err;
+
+	err = ring_buffer__consume(kern_ringbuf);
+	if (err < 0)
+		/* Only check in failure to avoid spamming success logs. */
+		CHECK(err < 0, "consume_kern_ringbuf",
+		      "Failed to consume kernel ringbuf\n");
+}
+
+static void test_user_ringbuf_msg_protocol(void)
+{
+	struct user_ringbuf_success *skel;
+	struct ring_buffer_user *user_ringbuf;
+	struct ring_buffer *kern_ringbuf;
+	int err, i;
+	__u64 expected_kern = 0;
+
+	err = load_skel_create_ringbufs(&skel, &kern_ringbuf, handle_kernel_msg,
+					&user_ringbuf);
+	if (CHECK(err, "create_ringbufs", "Failed to create ringbufs: %d\n",
+		  err))
+		return;
+
+	for (i = 0; i < 64; i++) {
+		enum test_msg_op op = i % TEST_MSG_OP_NUM_OPS;
+		__u64 operand_64 = TEST_OP_64;
+		__u32 operand_32 = TEST_OP_32;
+
+		err = send_test_message(user_ringbuf, op, operand_64,
+					operand_32);
+		if (err) {
+			CHECK(err, "send_test_message",
+			"Failed to send test message\n");
+			goto cleanup;
+		}
+
+		switch (op) {
+		case TEST_MSG_OP_INC64:
+			expected_kern += operand_64;
+			break;
+		case TEST_MSG_OP_INC32:
+			expected_kern += operand_32;
+			break;
+		case TEST_MSG_OP_MUL64:
+			expected_kern *= operand_64;
+			break;
+		case TEST_MSG_OP_MUL32:
+			expected_kern *= operand_32;
+			break;
+		default:
+			PRINT_FAIL("Unexpected op %d\n", op);
+			goto cleanup;
+		}
+
+		if (i % 8 == 0) {
+			kick_kernel_read_messages();
+			ASSERT_EQ(skel->bss->kern_mutated, expected_kern,
+				  "expected_kern");
+			ASSERT_EQ(skel->bss->err, 0, "bpf_prog_err");
+			drain_kernel_messages_buffer(kern_ringbuf);
+		}
+	}
+
+cleanup:
+	ring_buffer__free(kern_ringbuf);
+	ring_buffer_user__free(user_ringbuf);
+	user_ringbuf_success__destroy(skel);
+}
+
+static void *kick_kernel_cb(void *arg)
+{
+	/* Sleep to better exercise the path for the main thread waiting in
+	 * poll_wait().
+	 */
+	sleep(1);
+
+	/* Kick the kernel, causing it to drain the ringbuffer and then wake up
+	 * the test thread waiting on epoll.
+	 */
+	syscall(__NR_getrlimit);
+
+	return NULL;
+}
+
+static int spawn_kick_thread_for_poll(void)
+{
+	pthread_t thread;
+
+	return pthread_create(&thread, NULL, kick_kernel_cb, NULL);
+}
+
+static void test_user_ringbuf_poll_wait(void)
+{
+	struct user_ringbuf_success *skel;
+	struct ring_buffer_user *ringbuf;
+	int err, num_written = 0;
+	__u64 *token;
+
+	err = load_skel_create_user_ringbuf(&skel, &ringbuf);
+	if (err)
+		return;
+
+	ASSERT_EQ(skel->bss->read, 0, "num_samples_read_before");
+
+	while (1) {
+		/* Write samples until the buffer is full. */
+		token = ring_buffer_user__reserve(ringbuf, sizeof(*token));
+		if (!token)
+			break;
+
+		*token = 0xdeadbeef;
+
+		ring_buffer_user__submit(ringbuf, token);
+		num_written++;
+	}
+
+	if (!ASSERT_GE(num_written, 0, "num_written"))
+		goto cleanup;
+
+	/* Should not have read any samples until the kernel is kicked. */
+	ASSERT_EQ(skel->bss->read, 0, "num_pre_kick");
+
+	token = ring_buffer_user__poll(ringbuf, sizeof(*token), 1000);
+	if (!ASSERT_EQ(token, NULL, "pre_kick_timeout_token"))
+		goto cleanup;
+
+	err = spawn_kick_thread_for_poll();
+	if (!ASSERT_EQ(err, 0, "deferred_kick_thread\n"))
+		goto cleanup;
+
+	token = ring_buffer_user__poll(ringbuf, sizeof(*token), 10000);
+	if (!token) {
+		PRINT_FAIL("Failed to poll for user ringbuf entry\n");
+		ring_buffer_user__discard(ringbuf, token);
+		goto cleanup;
+	}
+
+	ASSERT_EQ(skel->bss->read, num_written, "num_post_kill");
+	ASSERT_EQ(skel->bss->err, 0, "err_post_poll");
+	ring_buffer_user__discard(ringbuf, token);
+
+cleanup:
+	ring_buffer_user__free(ringbuf);
+	user_ringbuf_success__destroy(skel);
+}
+
+static struct {
+	const char *prog_name;
+	const char *expected_err_msg;
+} failure_tests[] = {
+	/* failure cases */
+	{"user_ringbuf_callback_bad_access1", "negative offset alloc_dynptr_ptr ptr"},
+	{"user_ringbuf_callback_bad_access2", "dereference of modified alloc_dynptr_ptr ptr"},
+	{"user_ringbuf_callback_write_forbidden", "invalid mem access 'alloc_dynptr_ptr'"},
+	{"user_ringbuf_callback_null_context_write", "invalid mem access 'scalar'"},
+	{"user_ringbuf_callback_null_context_read", "invalid mem access 'scalar'"},
+	{"user_ringbuf_callback_discard_dynptr", "arg 1 is an unacquired reference"},
+	{"user_ringbuf_callback_submit_dynptr", "arg 1 is an unacquired reference"},
+};
+
+#define SUCCESS_TEST(_func) { _func, #_func }
+
+static struct {
+	void (*test_callback)(void);
+	const char *test_name;
+} success_tests[] = {
+	SUCCESS_TEST(test_user_ringbuf_mappings),
+	SUCCESS_TEST(test_user_ringbuf_commit),
+	SUCCESS_TEST(test_user_ringbuf_fill),
+	SUCCESS_TEST(test_user_ringbuf_loop),
+	SUCCESS_TEST(test_user_ringbuf_msg_protocol),
+	SUCCESS_TEST(test_user_ringbuf_poll_wait),
+};
+
+static void verify_fail(const char *prog_name, const char *expected_err_msg)
+{
+	LIBBPF_OPTS(bpf_object_open_opts, opts);
+	struct bpf_program *prog;
+	struct user_ringbuf_fail *skel;
+	int err;
+
+	opts.kernel_log_buf = obj_log_buf;
+	opts.kernel_log_size = log_buf_sz;
+	opts.kernel_log_level = 1;
+
+	skel = user_ringbuf_fail__open_opts(&opts);
+	if (!ASSERT_OK_PTR(skel, "dynptr_fail__open_opts"))
+		goto cleanup;
+
+	prog = bpf_object__find_program_by_name(skel->obj, prog_name);
+	if (!ASSERT_OK_PTR(prog, "bpf_object__find_program_by_name"))
+		goto cleanup;
+
+	bpf_program__set_autoload(prog, true);
+
+	bpf_map__set_max_entries(skel->maps.user_ringbuf, getpagesize());
+
+	err = user_ringbuf_fail__load(skel);
+	if (!ASSERT_ERR(err, "unexpected load success"))
+		goto cleanup;
+
+	if (!ASSERT_OK_PTR(strstr(obj_log_buf, expected_err_msg), "expected_err_msg")) {
+		fprintf(stderr, "Expected err_msg: %s\n", expected_err_msg);
+		fprintf(stderr, "Verifier output: %s\n", obj_log_buf);
+	}
+
+cleanup:
+	user_ringbuf_fail__destroy(skel);
+}
+
+void test_user_ringbuf(void)
+{
+	int i;
+
+	for (i = 0; i < ARRAY_SIZE(success_tests); i++) {
+		if (!test__start_subtest(success_tests[i].test_name))
+			continue;
+
+		success_tests[i].test_callback();
+	}
+
+	for (i = 0; i < ARRAY_SIZE(failure_tests); i++) {
+		if (!test__start_subtest(failure_tests[i].prog_name))
+			continue;
+
+		verify_fail(failure_tests[i].prog_name,
+			    failure_tests[i].expected_err_msg);
+	}
+}
diff --git a/tools/testing/selftests/bpf/progs/user_ringbuf_fail.c b/tools/testing/selftests/bpf/progs/user_ringbuf_fail.c
new file mode 100644
index 000000000000..237df8ca637a
--- /dev/null
+++ b/tools/testing/selftests/bpf/progs/user_ringbuf_fail.c
@@ -0,0 +1,174 @@
+// SPDX-License-Identifier: GPL-2.0
+/* Copyright (c) 2022 Meta Platforms, Inc. and affiliates. */
+
+#include <linux/bpf.h>
+#include <bpf/bpf_helpers.h>
+#include "bpf_misc.h"
+
+char _license[] SEC("license") = "GPL";
+
+struct sample {
+	int pid;
+	int seq;
+	long value;
+	char comm[16];
+};
+
+struct {
+	__uint(type, BPF_MAP_TYPE_USER_RINGBUF);
+} user_ringbuf SEC(".maps");
+
+static long
+bad_access1(struct bpf_dynptr *dynptr, void *context)
+{
+	const struct sample *sample;
+
+	sample = bpf_dynptr_data(dynptr - 1, 0, sizeof(*sample));
+	bpf_printk("Was able to pass bad pointer %lx\n", (__u64)dynptr - 1);
+
+	return 0;
+}
+
+/* A callback that accesses a dynptr in a bpf_user_ringbuf_drain callback should
+ * not be able to read before the pointer.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_bad_access1(void *ctx)
+{
+	struct bpf_dynptr ptr;
+
+	bpf_user_ringbuf_drain(&user_ringbuf, bad_access1, NULL, 0);
+
+	return 0;
+}
+
+static long
+bad_access2(struct bpf_dynptr *dynptr, void *context)
+{
+	const struct sample *sample;
+
+	sample = bpf_dynptr_data(dynptr + 1, 0, sizeof(*sample));
+	bpf_printk("Was able to pass bad pointer %lx\n", (__u64)dynptr + 1);
+
+	return 0;
+}
+
+/* A callback that accesses a dynptr in a bpf_user_ringbuf_drain callback should
+ * not be able to read past the end of the pointer.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_bad_access2(void *ctx)
+{
+	struct bpf_dynptr ptr;
+
+	bpf_user_ringbuf_drain(&user_ringbuf, bad_access2, NULL, 0);
+
+	return 0;
+}
+
+static long
+write_forbidden(struct bpf_dynptr *dynptr, void *context)
+{
+	*((long *)dynptr) = 0;
+
+	return 0;
+}
+
+/* A callback that accesses a dynptr in a bpf_user_ringbuf_drain callback should
+ * not be able to write to that pointer.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_write_forbidden(void *ctx)
+{
+	struct bpf_dynptr ptr;
+
+	bpf_user_ringbuf_drain(&user_ringbuf, write_forbidden, NULL, 0);
+
+	return 0;
+}
+
+static long
+null_context_write(struct bpf_dynptr *dynptr, void *context)
+{
+	*((__u64 *)context) = 0;
+
+	return 0;
+}
+
+/* A callback that is passed a NULL context by bpf_user_ringbuf_drain should
+ * not be able to write to that context.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_null_context_write(void *ctx)
+{
+	struct bpf_dynptr ptr;
+
+	bpf_user_ringbuf_drain(&user_ringbuf, null_context_write, NULL, 0);
+
+	return 0;
+}
+
+static long
+null_context_read(struct bpf_dynptr *dynptr, void *context)
+{
+	__u64 id = *((__u64 *)context);
+
+	bpf_printk("Read id %lu\n", id);
+
+	return 0;
+}
+
+/* A callback that is passed a NULL context by bpf_user_ringbuf_drain should
+ * not be able to read from that context.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_null_context_read(void *ctx)
+{
+	struct bpf_dynptr ptr;
+
+	bpf_user_ringbuf_drain(&user_ringbuf, null_context_read, NULL, 0);
+
+	return 0;
+}
+
+static long
+try_discard_dynptr(struct bpf_dynptr *dynptr, void *context)
+{
+	bpf_ringbuf_discard_dynptr(dynptr, 0);
+
+	return 0;
+}
+
+/* A callback that is passed a dynptr by bpf_user_ringbuf_drain should not be
+ * able to discard that dynptr, as it is an unacquired reference.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_discard_dynptr(void *ctx)
+{
+	struct bpf_dynptr ptr;
+
+	bpf_user_ringbuf_drain(&user_ringbuf, try_discard_dynptr, NULL, 0);
+
+	return 0;
+}
+
+static long
+try_submit_dynptr(struct bpf_dynptr *dynptr, void *context)
+{
+	bpf_ringbuf_submit_dynptr(dynptr, 0);
+
+	return 0;
+}
+
+/* A callback that is passed a dynptr by bpf_user_ringbuf_drain should not be
+ * able to submit that dynptr, as it is an unacquired reference.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_submit_dynptr(void *ctx)
+{
+	struct bpf_dynptr ptr;
+
+	bpf_user_ringbuf_drain(&user_ringbuf, try_submit_dynptr, NULL, 0);
+
+	return 0;
+}
diff --git a/tools/testing/selftests/bpf/progs/user_ringbuf_success.c b/tools/testing/selftests/bpf/progs/user_ringbuf_success.c
new file mode 100644
index 000000000000..ab8035a47da5
--- /dev/null
+++ b/tools/testing/selftests/bpf/progs/user_ringbuf_success.c
@@ -0,0 +1,220 @@
+// SPDX-License-Identifier: GPL-2.0
+/* Copyright (c) 2022 Meta Platforms, Inc. and affiliates. */
+
+#include <linux/bpf.h>
+#include <bpf/bpf_helpers.h>
+#include "bpf_misc.h"
+#include "../test_user_ringbuf.h"
+
+char _license[] SEC("license") = "GPL";
+
+struct {
+	__uint(type, BPF_MAP_TYPE_USER_RINGBUF);
+} user_ringbuf SEC(".maps");
+
+struct {
+	__uint(type, BPF_MAP_TYPE_RINGBUF);
+} kernel_ringbuf SEC(".maps");
+
+/* inputs */
+int pid, err, val;
+
+int read = 0;
+
+/* Counter used for end-to-end protocol test */
+__u64 kern_mutated = 0;
+__u64 user_mutated = 0;
+__u64 expected_user_mutated = 0;
+
+static int
+is_test_process(void)
+{
+	int cur_pid = bpf_get_current_pid_tgid() >> 32;
+
+	return cur_pid == pid;
+}
+
+static long
+record_sample(struct bpf_dynptr *dynptr, void *context)
+{
+	const struct sample *sample = NULL;
+	struct sample stack_sample;
+	int status;
+	static int num_calls;
+
+	if (num_calls++ % 2 == 0) {
+		status = bpf_dynptr_read(&stack_sample, sizeof(stack_sample),
+					 dynptr, 0, 0);
+		if (status) {
+			bpf_printk("bpf_dynptr_read() failed: %d\n", status);
+			err = 1;
+			return 0;
+		}
+	} else {
+		sample = bpf_dynptr_data(dynptr, 0, sizeof(*sample));
+		if (!sample) {
+			bpf_printk("Unexpectedly failed to get sample\n");
+			err = 2;
+			return 0;
+		}
+		stack_sample = *sample;
+	}
+
+	__sync_fetch_and_add(&read, 1);
+	return 0;
+}
+
+static void
+handle_sample_msg(const struct test_msg *msg)
+{
+	switch (msg->msg_op) {
+	case TEST_MSG_OP_INC64:
+		kern_mutated += msg->operand_64;
+		break;
+	case TEST_MSG_OP_INC32:
+		kern_mutated += msg->operand_32;
+		break;
+	case TEST_MSG_OP_MUL64:
+		kern_mutated *= msg->operand_64;
+		break;
+	case TEST_MSG_OP_MUL32:
+		kern_mutated *= msg->operand_32;
+		break;
+	default:
+		bpf_printk("Unrecognized op %d\n", msg->msg_op);
+		err = 2;
+	}
+}
+
+static long
+read_protocol_msg(struct bpf_dynptr *dynptr, void *context)
+{
+	const struct test_msg *msg = NULL;
+
+	msg = bpf_dynptr_data(dynptr, 0, sizeof(*msg));
+	if (!msg) {
+		err = 1;
+		bpf_printk("Unexpectedly failed to get msg\n");
+		return 0;
+	}
+
+	handle_sample_msg(msg);
+
+	return 0;
+}
+
+static int publish_next_kern_msg(__u32 index, void *context)
+{
+	struct test_msg *msg = NULL;
+	int operand_64 = TEST_OP_64;
+	int operand_32 = TEST_OP_32;
+
+	msg = bpf_ringbuf_reserve(&kernel_ringbuf, sizeof(*msg), 0);
+	if (!msg) {
+		err = 4;
+		return 1;
+	}
+
+	switch (index % TEST_MSG_OP_NUM_OPS) {
+	case TEST_MSG_OP_INC64:
+		msg->operand_64 = operand_64;
+		msg->msg_op = TEST_MSG_OP_INC64;
+		expected_user_mutated += operand_64;
+		break;
+	case TEST_MSG_OP_INC32:
+		msg->operand_32 = operand_32;
+		msg->msg_op = TEST_MSG_OP_INC32;
+		expected_user_mutated += operand_32;
+		break;
+	case TEST_MSG_OP_MUL64:
+		msg->operand_64 = operand_64;
+		msg->msg_op = TEST_MSG_OP_MUL64;
+		expected_user_mutated *= operand_64;
+		break;
+	case TEST_MSG_OP_MUL32:
+		msg->operand_32 = operand_32;
+		msg->msg_op = TEST_MSG_OP_MUL32;
+		expected_user_mutated *= operand_32;
+		break;
+	default:
+		bpf_ringbuf_discard(msg, 0);
+		err = 5;
+		return 1;
+	}
+
+	bpf_ringbuf_submit(msg, 0);
+
+	return 0;
+}
+
+static void
+publish_kern_messages(void)
+{
+	if (expected_user_mutated != user_mutated) {
+		bpf_printk("%lu != %lu\n", expected_user_mutated, user_mutated);
+		err = 3;
+		return;
+	}
+
+	bpf_loop(8, publish_next_kern_msg, NULL, 0);
+}
+
+SEC("fentry/" SYS_PREFIX "sys_getcwd")
+int test_user_ringbuf_protocol(void *ctx)
+{
+	long status = 0;
+	struct sample *sample = NULL;
+	struct bpf_dynptr ptr;
+
+	if (!is_test_process())
+		return 0;
+
+	status = bpf_user_ringbuf_drain(&user_ringbuf, read_protocol_msg, NULL, 0);
+	if (status < 0) {
+		bpf_printk("Drain returned: %ld\n", status);
+		err = 1;
+		return 0;
+	}
+
+	publish_kern_messages();
+
+	return 0;
+}
+
+SEC("fentry/" SYS_PREFIX "sys_getpgid")
+int test_user_ringbuf(void *ctx)
+{
+	int status = 0;
+	struct sample *sample = NULL;
+	struct bpf_dynptr ptr;
+
+	if (!is_test_process())
+		return 0;
+
+	err = bpf_user_ringbuf_drain(&user_ringbuf, record_sample, NULL, 0);
+
+	return 0;
+}
+
+static long
+do_nothing_cb(struct bpf_dynptr *dynptr, void *context)
+{
+	__sync_fetch_and_add(&read, 1);
+	return 0;
+}
+
+SEC("fentry/" SYS_PREFIX "sys_getrlimit")
+int test_user_ringbuf_epoll(void *ctx)
+{
+	long num_samples;
+
+	if (!is_test_process())
+		return 0;
+
+	num_samples = bpf_user_ringbuf_drain(&user_ringbuf, do_nothing_cb, NULL,
+					     0);
+	if (num_samples <= 0)
+		err = 1;
+
+	return 0;
+}
diff --git a/tools/testing/selftests/bpf/test_user_ringbuf.h b/tools/testing/selftests/bpf/test_user_ringbuf.h
new file mode 100644
index 000000000000..1643b4d59ba7
--- /dev/null
+++ b/tools/testing/selftests/bpf/test_user_ringbuf.h
@@ -0,0 +1,35 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+/* Copyright (c) 2022 Meta Platforms, Inc. and affiliates. */
+
+#ifndef _TEST_USER_RINGBUF_H
+#define _TEST_USER_RINGBUF_H
+
+#define TEST_OP_64 4
+#define TEST_OP_32 2
+
+enum test_msg_op {
+	TEST_MSG_OP_INC64,
+	TEST_MSG_OP_INC32,
+	TEST_MSG_OP_MUL64,
+	TEST_MSG_OP_MUL32,
+
+	// Must come last.
+	TEST_MSG_OP_NUM_OPS,
+};
+
+struct test_msg {
+	enum test_msg_op msg_op;
+	union {
+		__s64 operand_64;
+		__s32 operand_32;
+	};
+};
+
+struct sample {
+	int pid;
+	int seq;
+	long value;
+	char comm[16];
+};
+
+#endif /* _TEST_USER_RINGBUF_H */
-- 
2.30.2


^ permalink raw reply related	[flat|nested] 28+ messages in thread

* Re: [PATCH 1/5] bpf: Clear callee saved regs after updating REG0
  2022-08-08 15:53 [PATCH 1/5] bpf: Clear callee saved regs after updating REG0 David Vernet
                   ` (3 preceding siblings ...)
  2022-08-08 15:53 ` [PATCH 5/5] selftests/bpf: Add selftests validating the user ringbuf David Vernet
@ 2022-08-08 18:14 ` Joanne Koong
  2022-08-08 18:50   ` David Vernet
  4 siblings, 1 reply; 28+ messages in thread
From: Joanne Koong @ 2022-08-08 18:14 UTC (permalink / raw)
  To: David Vernet
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, linux-kernel, Kernel-team

On Mon, Aug 8, 2022 at 8:53 AM David Vernet <void@manifault.com> wrote:
>
> In the verifier, we currently reset all of the registers containing caller
> saved args before updating the callee's return register (REG0). In a
> follow-on patch, we will need to be able to be able to inspect the caller
> saved registers when updating REG0 to determine if a dynptr that's passed
> to a helper function was allocated by a helper, or allocated by a program.
>
> This patch therefore updates check_helper_call() to clear the caller saved
> regs after updating REG0.
>
Overall lgtm

There's a patch [0] that finds + stores the ref obj id before the
caller saved regs get reset, which would make this patch not needed.
That hasn't been merged in yet though and I think there's pros for
either approach.

In the one where we find + store the ref obj id before any caller
saved regs get reset, the pro is that getting the dynptr metadata (eg
ref obj id and in the near future, the dynptr type as well) earlier
will be useful (eg when we add skb/xdp dynptrs [1], we'll need to know
the type of the dynptr in order to determine whether to set the return
reg as PTR_TO_PACKET). In this patch, the pro is that the logic is a
lot more obvious to readers that the ref obj id for the dynptr gets
found and set in order to store it in the return reg's ref obj id.

I personally lean more towards the approach in [0] because I think
that ends up being cleaner for future extensibility, but I don't feel
strongly about it and would be happy going with this approach as well

[0] https://lore.kernel.org/bpf/20220722175807.4038317-1-joannelkoong@gmail.com/#t

[1] https://lore.kernel.org/bpf/20220726184706.954822-1-joannelkoong@gmail.com/T/#t

> Signed-off-by: David Vernet <void@manifault.com>
> ---
>  kernel/bpf/verifier.c | 15 ++++++++++-----
>  1 file changed, 10 insertions(+), 5 deletions(-)
>
> diff --git a/kernel/bpf/verifier.c b/kernel/bpf/verifier.c
> index 096fdac70165..938ba1536249 100644
> --- a/kernel/bpf/verifier.c
> +++ b/kernel/bpf/verifier.c
> @@ -7348,11 +7348,9 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
>         if (err)
>                 return err;
>
> -       /* reset caller saved regs */
> -       for (i = 0; i < CALLER_SAVED_REGS; i++) {
> -               mark_reg_not_init(env, regs, caller_saved[i]);
> -               check_reg_arg(env, caller_saved[i], DST_OP_NO_MARK);
> -       }
> +       /* reset return reg */
> +       mark_reg_not_init(env, regs, BPF_REG_0);
> +       check_reg_arg(env, BPF_REG_0, DST_OP_NO_MARK);
>
>         /* helper call returns 64-bit value. */
>         regs[BPF_REG_0].subreg_def = DEF_NOT_SUBREG;
> @@ -7488,6 +7486,13 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
>                 regs[BPF_REG_0].ref_obj_id = dynptr_id;
>         }
>
> +       /* reset remaining caller saved regs */
> +       BUILD_BUG_ON(caller_saved[0] != BPF_REG_0);

nit: caller_saved is a read-only const, so I don't think this line is needed

> +       for (i = 1; i < CALLER_SAVED_REGS; i++) {

nit: maybe "for i = BPF_REG_1" ?

> +               mark_reg_not_init(env, regs, caller_saved[i]);
> +               check_reg_arg(env, caller_saved[i], DST_OP_NO_MARK);
> +       }
> +
>         do_refine_retval_range(regs, fn->ret_type, func_id, &meta);
>
>         err = check_map_func_compatibility(env, meta.map_ptr, func_id);
> --
> 2.30.2
>

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 1/5] bpf: Clear callee saved regs after updating REG0
  2022-08-08 18:14 ` [PATCH 1/5] bpf: Clear callee saved regs after updating REG0 Joanne Koong
@ 2022-08-08 18:50   ` David Vernet
  2022-08-08 23:32     ` Joanne Koong
  0 siblings, 1 reply; 28+ messages in thread
From: David Vernet @ 2022-08-08 18:50 UTC (permalink / raw)
  To: Joanne Koong
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, linux-kernel, Kernel-team

On Mon, Aug 08, 2022 at 11:14:48AM -0700, Joanne Koong wrote:
> On Mon, Aug 8, 2022 at 8:53 AM David Vernet <void@manifault.com> wrote:
> >
> > In the verifier, we currently reset all of the registers containing caller
> > saved args before updating the callee's return register (REG0). In a
> > follow-on patch, we will need to be able to be able to inspect the caller
> > saved registers when updating REG0 to determine if a dynptr that's passed
> > to a helper function was allocated by a helper, or allocated by a program.
> >
> > This patch therefore updates check_helper_call() to clear the caller saved
> > regs after updating REG0.
> >
> Overall lgtm

Thanks for the quick review!

> There's a patch [0] that finds + stores the ref obj id before the
> caller saved regs get reset, which would make this patch not needed.

Interesting. Indeed, that would solve this issue, and I'm fine with that
approach as well, if not preferential to it.

> That hasn't been merged in yet though and I think there's pros for
> either approach.
> 
> In the one where we find + store the ref obj id before any caller
> saved regs get reset, the pro is that getting the dynptr metadata (eg
> ref obj id and in the near future, the dynptr type as well) earlier
> will be useful (eg when we add skb/xdp dynptrs [1], we'll need to know
> the type of the dynptr in order to determine whether to set the return
> reg as PTR_TO_PACKET). In this patch, the pro is that the logic is a
> lot more obvious to readers that the ref obj id for the dynptr gets
> found and set in order to store it in the return reg's ref obj id.
> 
> I personally lean more towards the approach in [0] because I think
> that ends up being cleaner for future extensibility, but I don't feel
> strongly about it and would be happy going with this approach as well

So, I think regardless of whether this gets merged, [0] is probably worth
merging as I agree that it simplifies the current logic for setting the ref
obj id and is a purely positive change on its own.

When I was originally typing my response to your email, I was wondering
whether it would be useful to keep the state in the caller saved registers
for the logic in 7360 - 7489 in general for the future even if [0] is
applied. It's probably simpler, however, to keep what we have now and just
reset all of the registers. So if [0] gets merged, I can just remove this
patch from the set.

> [0] https://lore.kernel.org/bpf/20220722175807.4038317-1-joannelkoong@gmail.com/#t
> 
> [1] https://lore.kernel.org/bpf/20220726184706.954822-1-joannelkoong@gmail.com/T/#t
> 
> > Signed-off-by: David Vernet <void@manifault.com>
> > ---
> >  kernel/bpf/verifier.c | 15 ++++++++++-----
> >  1 file changed, 10 insertions(+), 5 deletions(-)
> >
> > diff --git a/kernel/bpf/verifier.c b/kernel/bpf/verifier.c
> > index 096fdac70165..938ba1536249 100644
> > --- a/kernel/bpf/verifier.c
> > +++ b/kernel/bpf/verifier.c
> > @@ -7348,11 +7348,9 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
> >         if (err)
> >                 return err;
> >
> > -       /* reset caller saved regs */
> > -       for (i = 0; i < CALLER_SAVED_REGS; i++) {
> > -               mark_reg_not_init(env, regs, caller_saved[i]);
> > -               check_reg_arg(env, caller_saved[i], DST_OP_NO_MARK);
> > -       }
> > +       /* reset return reg */
> > +       mark_reg_not_init(env, regs, BPF_REG_0);
> > +       check_reg_arg(env, BPF_REG_0, DST_OP_NO_MARK);
> >
> >         /* helper call returns 64-bit value. */
> >         regs[BPF_REG_0].subreg_def = DEF_NOT_SUBREG;
> > @@ -7488,6 +7486,13 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
> >                 regs[BPF_REG_0].ref_obj_id = dynptr_id;
> >         }
> >
> > +       /* reset remaining caller saved regs */
> > +       BUILD_BUG_ON(caller_saved[0] != BPF_REG_0);
> 
> nit: caller_saved is a read-only const, so I don't think this line is needed

It being a read-only const was why I made this a BUILD_BUG_ON. My
intention here was to ensure that we're not accidentally skipping the
resetting of caller_saved[0]. The original code iterated from
caller_saved[0] -> caller_saved[CALLER_SAVED_REGS - 1]. Now that we're
starting from caller_saved[1], this compile-time assertion verifies that
we're not accidentally skipping caller_saved[0] by checking that it's the
same as BPF_REG_0, which is reset above. Does that make sense?

> > +       for (i = 1; i < CALLER_SAVED_REGS; i++) {
> 
> nit: maybe "for i = BPF_REG_1" ?

Good suggestion, will apply in the v2 (if there is one and we don't decide
to just go with [0] :-))

Thanks,
David

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 3/5] bpf: Add bpf_user_ringbuf_drain() helper
  2022-08-08 15:53 ` [PATCH 3/5] bpf: Add bpf_user_ringbuf_drain() helper David Vernet
@ 2022-08-08 21:55   ` kernel test robot
  2022-08-11 23:22   ` Andrii Nakryiko
  1 sibling, 0 replies; 28+ messages in thread
From: kernel test robot @ 2022-08-08 21:55 UTC (permalink / raw)
  To: David Vernet, bpf
  Cc: kbuild-all, ast, daniel, andrii, john.fastabend, martin.lau,
	song, yhs, kpsingh, sdf, haoluo, jolsa, tj, joannelkoong,
	linux-kernel, Kernel-team

Hi David,

Thank you for the patch! Perhaps something to improve:

[auto build test WARNING on bpf-next/master]
[also build test WARNING on bpf/master linus/master next-20220808]
[cannot apply to v5.19]
[If your patch is applied to the wrong git tree, kindly drop us a note.
And when submitting patch, we suggest to use '--base' as documented in
https://git-scm.com/docs/git-format-patch#_base_tree_information]

url:    https://github.com/intel-lab-lkp/linux/commits/David-Vernet/bpf-Clear-callee-saved-regs-after-updating-REG0/20220808-235558
base:   https://git.kernel.org/pub/scm/linux/kernel/git/bpf/bpf-next.git master
config: arc-allyesconfig (https://download.01.org/0day-ci/archive/20220809/202208090505.me3WqXOM-lkp@intel.com/config)
compiler: arceb-elf-gcc (GCC) 12.1.0
reproduce (this is a W=1 build):
        wget https://raw.githubusercontent.com/intel/lkp-tests/master/sbin/make.cross -O ~/bin/make.cross
        chmod +x ~/bin/make.cross
        # https://github.com/intel-lab-lkp/linux/commit/70db51b231aeddaa6eecd19afeeebef610ae2686
        git remote add linux-review https://github.com/intel-lab-lkp/linux
        git fetch --no-tags linux-review David-Vernet/bpf-Clear-callee-saved-regs-after-updating-REG0/20220808-235558
        git checkout 70db51b231aeddaa6eecd19afeeebef610ae2686
        # save the config file
        mkdir build_dir && cp config build_dir/.config
        COMPILER_INSTALL_PATH=$HOME/0day COMPILER=gcc-12.1.0 make.cross W=1 O=build_dir ARCH=arc SHELL=/bin/bash kernel/bpf/

If you fix the issue, kindly add following tag where applicable
Reported-by: kernel test robot <lkp@intel.com>

All warnings (new ones prefixed by >>):

   kernel/bpf/ringbuf.c: In function '__bpf_user_ringbuf_poll':
>> kernel/bpf/ringbuf.c:653:15: warning: cast to pointer from integer of different size [-Wint-to-pointer-cast]
     653 |         hdr = (u32 *)((uintptr_t)rb->data + (cons_pos & rb->mask));
         |               ^
   kernel/bpf/ringbuf.c:682:19: warning: cast to pointer from integer of different size [-Wint-to-pointer-cast]
     682 |         *sample = (void *)((uintptr_t)rb->data +
         |                   ^
   kernel/bpf/ringbuf.c: In function '____bpf_user_ringbuf_drain':
>> kernel/bpf/ringbuf.c:736:40: warning: cast from pointer to integer of different size [-Wpointer-to-int-cast]
     736 |                         ret = callback((u64)&dynptr,
         |                                        ^


vim +653 kernel/bpf/ringbuf.c

   626	
   627	static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
   628					   u32 *size)
   629	{
   630		unsigned long cons_pos, prod_pos;
   631		u32 sample_len, total_len;
   632		u32 *hdr;
   633		int err;
   634		int busy = 0;
   635	
   636		/* If another consumer is already consuming a sample, wait for them to
   637		 * finish.
   638		 */
   639		if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
   640			return -EBUSY;
   641	
   642		/* Synchronizes with smp_store_release() in user-space. */
   643		prod_pos = smp_load_acquire(&rb->producer_pos);
   644		/* Synchronizes with smp_store_release() in
   645		 * __bpf_user_ringbuf_sample_release().
   646		 */
   647		cons_pos = smp_load_acquire(&rb->consumer_pos);
   648		if (cons_pos >= prod_pos) {
   649			atomic_set(&rb->busy, 0);
   650			return -ENODATA;
   651		}
   652	
 > 653		hdr = (u32 *)((uintptr_t)rb->data + (cons_pos & rb->mask));
   654		sample_len = *hdr;
   655	
   656		/* Check that the sample can fit into a dynptr. */
   657		err = bpf_dynptr_check_size(sample_len);
   658		if (err) {
   659			atomic_set(&rb->busy, 0);
   660			return err;
   661		}
   662	
   663		/* Check that the sample fits within the region advertised by the
   664		 * consumer position.
   665		 */
   666		total_len = sample_len + BPF_RINGBUF_HDR_SZ;
   667		if (total_len > prod_pos - cons_pos) {
   668			atomic_set(&rb->busy, 0);
   669			return -E2BIG;
   670		}
   671	
   672		/* Check that the sample fits within the data region of the ring buffer.
   673		 */
   674		if (total_len > rb->mask + 1) {
   675			atomic_set(&rb->busy, 0);
   676			return -E2BIG;
   677		}
   678	
   679		/* consumer_pos is updated when the sample is released.
   680		 */
   681	
   682		*sample = (void *)((uintptr_t)rb->data +
   683				   ((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
   684		*size = sample_len;
   685	
   686		return 0;
   687	}
   688	
   689	static void
   690	__bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
   691					  u64 flags)
   692	{
   693	
   694	
   695		/* To release the ringbuffer, just increment the producer position to
   696		 * signal that a new sample can be consumed. The busy bit is cleared by
   697		 * userspace when posting a new sample to the ringbuffer.
   698		 */
   699		smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
   700				  BPF_RINGBUF_HDR_SZ);
   701	
   702		if (flags & BPF_RB_FORCE_WAKEUP || !(flags & BPF_RB_NO_WAKEUP))
   703			irq_work_queue(&rb->work);
   704	
   705		atomic_set(&rb->busy, 0);
   706	}
   707	
   708	BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
   709		   void *, callback_fn, void *, callback_ctx, u64, flags)
   710	{
   711		struct bpf_ringbuf *rb;
   712		long num_samples = 0, ret = 0;
   713		int err;
   714		bpf_callback_t callback = (bpf_callback_t)callback_fn;
   715		u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP;
   716	
   717		if (unlikely(flags & ~wakeup_flags))
   718			return -EINVAL;
   719	
   720		/* The two wakeup flags are mutually exclusive. */
   721		if (unlikely((flags & wakeup_flags) == wakeup_flags))
   722			return -EINVAL;
   723	
   724		rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
   725		do {
   726			u32 size;
   727			void *sample;
   728	
   729			err = __bpf_user_ringbuf_poll(rb, &sample, &size);
   730	
   731			if (!err) {
   732				struct bpf_dynptr_kern dynptr;
   733	
   734				bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL,
   735						0, size);
 > 736				ret = callback((u64)&dynptr,
   737						(u64)(uintptr_t)callback_ctx, 0, 0, 0);
   738	
   739				__bpf_user_ringbuf_sample_release(rb, size, flags);
   740				num_samples++;
   741			}
   742		} while (err == 0 && num_samples < 4096 && ret == 0);
   743	
   744		return num_samples;
   745	}
   746	

-- 
0-DAY CI Kernel Test Service
https://01.org/lkp

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 1/5] bpf: Clear callee saved regs after updating REG0
  2022-08-08 18:50   ` David Vernet
@ 2022-08-08 23:32     ` Joanne Koong
  2022-08-09 12:47       ` David Vernet
  0 siblings, 1 reply; 28+ messages in thread
From: Joanne Koong @ 2022-08-08 23:32 UTC (permalink / raw)
  To: David Vernet
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, linux-kernel, Kernel-team

On Mon, Aug 8, 2022 at 11:50 AM David Vernet <void@manifault.com> wrote:
>
> On Mon, Aug 08, 2022 at 11:14:48AM -0700, Joanne Koong wrote:
> > On Mon, Aug 8, 2022 at 8:53 AM David Vernet <void@manifault.com> wrote:
> > >
> > > In the verifier, we currently reset all of the registers containing caller
> > > saved args before updating the callee's return register (REG0). In a
> > > follow-on patch, we will need to be able to be able to inspect the caller
> > > saved registers when updating REG0 to determine if a dynptr that's passed
> > > to a helper function was allocated by a helper, or allocated by a program.
> > >
> > > This patch therefore updates check_helper_call() to clear the caller saved
> > > regs after updating REG0.
> > >
> > Overall lgtm
>
> Thanks for the quick review!
>
> > There's a patch [0] that finds + stores the ref obj id before the
> > caller saved regs get reset, which would make this patch not needed.
>
> Interesting. Indeed, that would solve this issue, and I'm fine with that
> approach as well, if not preferential to it.
>
> > That hasn't been merged in yet though and I think there's pros for
> > either approach.
> >
> > In the one where we find + store the ref obj id before any caller
> > saved regs get reset, the pro is that getting the dynptr metadata (eg
> > ref obj id and in the near future, the dynptr type as well) earlier
> > will be useful (eg when we add skb/xdp dynptrs [1], we'll need to know
> > the type of the dynptr in order to determine whether to set the return
> > reg as PTR_TO_PACKET). In this patch, the pro is that the logic is a
> > lot more obvious to readers that the ref obj id for the dynptr gets
> > found and set in order to store it in the return reg's ref obj id.
> >
> > I personally lean more towards the approach in [0] because I think
> > that ends up being cleaner for future extensibility, but I don't feel
> > strongly about it and would be happy going with this approach as well
>
> So, I think regardless of whether this gets merged, [0] is probably worth
> merging as I agree that it simplifies the current logic for setting the ref
> obj id and is a purely positive change on its own.
>
> When I was originally typing my response to your email, I was wondering
> whether it would be useful to keep the state in the caller saved registers
> for the logic in 7360 - 7489 in general for the future even if [0] is
> applied. It's probably simpler, however, to keep what we have now and just
> reset all of the registers. So if [0] gets merged, I can just remove this
> patch from the set.

sounds great!

>
> > [0] https://lore.kernel.org/bpf/20220722175807.4038317-1-joannelkoong@gmail.com/#t
> >
> > [1] https://lore.kernel.org/bpf/20220726184706.954822-1-joannelkoong@gmail.com/T/#t
> >
> > > Signed-off-by: David Vernet <void@manifault.com>
> > > ---
> > >  kernel/bpf/verifier.c | 15 ++++++++++-----
> > >  1 file changed, 10 insertions(+), 5 deletions(-)
> > >
> > > diff --git a/kernel/bpf/verifier.c b/kernel/bpf/verifier.c
> > > index 096fdac70165..938ba1536249 100644
> > > --- a/kernel/bpf/verifier.c
> > > +++ b/kernel/bpf/verifier.c
> > > @@ -7348,11 +7348,9 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
> > >         if (err)
> > >                 return err;
> > >
> > > -       /* reset caller saved regs */
> > > -       for (i = 0; i < CALLER_SAVED_REGS; i++) {
> > > -               mark_reg_not_init(env, regs, caller_saved[i]);
> > > -               check_reg_arg(env, caller_saved[i], DST_OP_NO_MARK);
> > > -       }
> > > +       /* reset return reg */
> > > +       mark_reg_not_init(env, regs, BPF_REG_0);
> > > +       check_reg_arg(env, BPF_REG_0, DST_OP_NO_MARK);
> > >
> > >         /* helper call returns 64-bit value. */
> > >         regs[BPF_REG_0].subreg_def = DEF_NOT_SUBREG;
> > > @@ -7488,6 +7486,13 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
> > >                 regs[BPF_REG_0].ref_obj_id = dynptr_id;
> > >         }
> > >
> > > +       /* reset remaining caller saved regs */
> > > +       BUILD_BUG_ON(caller_saved[0] != BPF_REG_0);
> >
> > nit: caller_saved is a read-only const, so I don't think this line is needed
>
> > It being a read-only const was why I made this a BUILD_BUG_ON. My
> intention here was to ensure that we're not accidentally skipping the
> resetting of caller_saved[0]. The original code iterated from
> caller_saved[0] -> caller_saved[CALLER_SAVED_REGS - 1]. Now that we're
> starting from caller_saved[1], this compile-time assertion verifies that
> we're not accidentally skipping caller_saved[0] by checking that it's the
> same as BPF_REG_0, which is reset above. Does that make sense?

I think it's an invariant that r0 - r5 are the caller saved args and
that caller_saved[0] will always be BPF_REG_0. I'm having a hard time
seeing a case where this would change in the future, but then again, I
am also not a fortune teller so maybe I am wrong here :) I don't think
it's a big deal though so I don't feel strongly about this

>
> > > +       for (i = 1; i < CALLER_SAVED_REGS; i++) {
> >
> > nit: maybe "for i = BPF_REG_1" ?
>
> Good suggestion, will apply in the v2 (if there is one and we don't decide
> to just go with [0] :-))
>
> Thanks,
> David

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 1/5] bpf: Clear callee saved regs after updating REG0
  2022-08-08 23:32     ` Joanne Koong
@ 2022-08-09 12:47       ` David Vernet
  0 siblings, 0 replies; 28+ messages in thread
From: David Vernet @ 2022-08-09 12:47 UTC (permalink / raw)
  To: Joanne Koong
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, linux-kernel, Kernel-team

On Mon, Aug 08, 2022 at 04:32:39PM -0700, Joanne Koong wrote:

[...]

> > It being a read-only const is was why I made this a BUILD_BUG_ON. My
> > intention here was to ensure that we're not accidentally skipping the
> > resetting of caller_saved[0]. The original code iterated from
> > caller_saved[0] -> caller_saved[CALLER_SAVED_REGS - 1]. Now that we're
> > starting from caller_saved[1], this compile-time assertion verifies that
> > we're not accidentally skipping caller_saved[0] by checking that it's the
> > same as BPF_REG_0, which is reset above. Does that make sense?
> 
> I think it's an invariant that r0 - r5 are the caller saved args and
> that caller_saved[0] will always be BPF_REG_0. I'm having a hard time
> seeing a case where this would change in the future, but then again, I
> am also not a fortune teller so maybe I am wrong here :) I don't think
> it's a big deal though so I don't feel strongly about this

I agree that it seems very unlikely to change, but I don't see the harm in
leaving it in. Compile-time checks are essentially free, and are meant
precisely for cases like this: checking constant, build-time invariants. If
you feel strongly, I can remove it.

Thanks,
David

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 3/5] bpf: Add bpf_user_ringbuf_drain() helper
  2022-08-08 15:53 ` [PATCH 3/5] bpf: Add bpf_user_ringbuf_drain() helper David Vernet
  2022-08-08 21:55   ` kernel test robot
@ 2022-08-11 23:22   ` Andrii Nakryiko
  2022-08-12  0:01     ` David Vernet
  2022-08-12  0:46     ` David Vernet
  1 sibling, 2 replies; 28+ messages in thread
From: Andrii Nakryiko @ 2022-08-11 23:22 UTC (permalink / raw)
  To: David Vernet
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

On Mon, Aug 8, 2022 at 8:54 AM David Vernet <void@manifault.com> wrote:
>
> Now that we have a BPF_MAP_TYPE_USER_RINGBUF map type, we need to add a
> helper function that allows BPF programs to drain samples from the ring
> buffer, and invoke a callback for each. This patch adds a new
> bpf_user_ringbuf_drain() helper that provides this abstraction.
>
> In order to support this, we needed to also add a new PTR_TO_DYNPTR
> register type to reflect a dynptr that was allocated by a helper function
> and passed to a BPF program. The verifier currently only supports
> PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL and MEM_ALLOC.

This commit message is a bit too laconic. There are a lot of
implications to various parts of this patch; it would be great to
highlight the most important ones. Please consider elaborating a bit more.

>
> Signed-off-by: David Vernet <void@manifault.com>
> ---
>  include/linux/bpf.h            |   6 +-
>  include/uapi/linux/bpf.h       |   8 ++
>  kernel/bpf/helpers.c           |   2 +
>  kernel/bpf/ringbuf.c           | 157 +++++++++++++++++++++++++++++++--
>  kernel/bpf/verifier.c          |  57 +++++++++++-
>  tools/include/uapi/linux/bpf.h |   8 ++
>  6 files changed, 228 insertions(+), 10 deletions(-)
>
> diff --git a/include/linux/bpf.h b/include/linux/bpf.h
> index 20c26aed7896..4d0d5f2a3ac8 100644
> --- a/include/linux/bpf.h
> +++ b/include/linux/bpf.h
> @@ -401,7 +401,7 @@ enum bpf_type_flag {
>         /* DYNPTR points to memory local to the bpf program. */
>         DYNPTR_TYPE_LOCAL       = BIT(8 + BPF_BASE_TYPE_BITS),
>
> -       /* DYNPTR points to a ringbuf record. */
> +       /* DYNPTR points to a kernel-produced ringbuf record. */
>         DYNPTR_TYPE_RINGBUF     = BIT(9 + BPF_BASE_TYPE_BITS),
>
>         /* Size is known at compile time. */
> @@ -606,6 +606,7 @@ enum bpf_reg_type {
>         PTR_TO_MEM,              /* reg points to valid memory region */
>         PTR_TO_BUF,              /* reg points to a read/write buffer */
>         PTR_TO_FUNC,             /* reg points to a bpf program function */
> +       PTR_TO_DYNPTR,           /* reg points to a dynptr */
>         __BPF_REG_TYPE_MAX,
>
>         /* Extended reg_types. */
> @@ -2410,6 +2411,7 @@ extern const struct bpf_func_proto bpf_loop_proto;
>  extern const struct bpf_func_proto bpf_copy_from_user_task_proto;
>  extern const struct bpf_func_proto bpf_set_retval_proto;
>  extern const struct bpf_func_proto bpf_get_retval_proto;
> +extern const struct bpf_func_proto bpf_user_ringbuf_drain_proto;
>
>  const struct bpf_func_proto *tracing_prog_func_proto(
>    enum bpf_func_id func_id, const struct bpf_prog *prog);
> @@ -2554,7 +2556,7 @@ enum bpf_dynptr_type {
>         BPF_DYNPTR_TYPE_INVALID,
>         /* Points to memory that is local to the bpf program */
>         BPF_DYNPTR_TYPE_LOCAL,
> -       /* Underlying data is a ringbuf record */
> +       /* Underlying data is a kernel-produced ringbuf record */
>         BPF_DYNPTR_TYPE_RINGBUF,
>  };
>
> diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
> index a341f877b230..ca125648d7fd 100644
> --- a/include/uapi/linux/bpf.h
> +++ b/include/uapi/linux/bpf.h
> @@ -5332,6 +5332,13 @@ union bpf_attr {
>   *             **-EACCES** if the SYN cookie is not valid.
>   *
>   *             **-EPROTONOSUPPORT** if CONFIG_IPV6 is not builtin.
> + *
> + * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
> + *     Description
> + *             Drain samples from the specified user ringbuffer, and invoke the
> + *             provided callback for each such sample.

please specify what's the expected signature of callback_fn
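from the rest of this patch it looks like the expected signature is
something like this (my reading of set_user_ringbuf_callback_state() and
the selftests in patch 5, so please double-check before documenting it):

    long (*callback_fn)(struct bpf_dynptr *dynptr, void *ctx);

where returning non-zero from the callback stops draining early.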

> + *     Return
> + *             An error if a sample could not be drained.

Negative error, right? It might also be worth briefly describing the
situation(s) in which you won't be able to drain a sample.

Also please clarify if having no sample to drain is an error or not?

It's also important to specify that if no error (and -ENODATA
shouldn't be an error, actually) occurred then we get number of
consumed samples back.

>   */
>  #define __BPF_FUNC_MAPPER(FN)          \
>         FN(unspec),                     \
> @@ -5542,6 +5549,7 @@ union bpf_attr {
>         FN(tcp_raw_gen_syncookie_ipv6), \
>         FN(tcp_raw_check_syncookie_ipv4),       \
>         FN(tcp_raw_check_syncookie_ipv6),       \
> +       FN(user_ringbuf_drain),         \
>         /* */
>
>  /* integer value in 'imm' field of BPF_CALL instruction selects which helper
> diff --git a/kernel/bpf/helpers.c b/kernel/bpf/helpers.c
> index 1f961f9982d2..b8097876014c 100644
> --- a/kernel/bpf/helpers.c
> +++ b/kernel/bpf/helpers.c
> @@ -1647,6 +1647,8 @@ bpf_base_func_proto(enum bpf_func_id func_id)
>                 return &bpf_dynptr_write_proto;
>         case BPF_FUNC_dynptr_data:
>                 return &bpf_dynptr_data_proto;
> +       case BPF_FUNC_user_ringbuf_drain:
> +               return &bpf_user_ringbuf_drain_proto;
>         default:
>                 break;
>         }
> diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> index 29e2de42df15..fc589fc8eb7c 100644
> --- a/kernel/bpf/ringbuf.c
> +++ b/kernel/bpf/ringbuf.c
> @@ -291,16 +291,33 @@ static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
>  }
>
>  static __poll_t ringbuf_map_poll(struct bpf_map *map, struct file *filp,
> -                                struct poll_table_struct *pts)
> +                                struct poll_table_struct *pts,
> +                                bool kernel_producer)
>  {
>         struct bpf_ringbuf_map *rb_map;
> +       bool buffer_empty;
>
>         rb_map = container_of(map, struct bpf_ringbuf_map, map);
>         poll_wait(filp, &rb_map->rb->waitq, pts);
>
> -       if (ringbuf_avail_data_sz(rb_map->rb))
> -               return EPOLLIN | EPOLLRDNORM;
> -       return 0;
> +       buffer_empty = !ringbuf_avail_data_sz(rb_map->rb);
> +
> +       if (kernel_producer)
> +               return buffer_empty ? 0 : EPOLLIN | EPOLLRDNORM;
> +       else
> +               return buffer_empty ? EPOLLOUT | EPOLLWRNORM : 0;
> +}
> +
> +static __poll_t ringbuf_map_poll_kern(struct bpf_map *map, struct file *filp,
> +                                     struct poll_table_struct *pts)
> +{
> +       return ringbuf_map_poll(map, filp, pts, true);
> +}
> +
> +static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
> +                                     struct poll_table_struct *pts)
> +{
> +       return ringbuf_map_poll(map, filp, pts, false);
>  }

This is an even stronger case where I think we should keep two
implementations completely separate. Please keep existing
ringbuf_map_poll as is and just add a user variant as a separate
implementation
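
something like this, maybe (rough, untested sketch just to illustrate,
keeping the semantics from this patch; all names are from your patch):

static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
				      struct poll_table_struct *pts)
{
	struct bpf_ringbuf_map *rb_map;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	poll_wait(filp, &rb_map->rb->waitq, pts);

	/* a user ringbuf is "writable" from user-space's point of view only
	 * once the kernel has drained it, i.e. when there is no unconsumed
	 * data left
	 */
	if (ringbuf_avail_data_sz(rb_map->rb))
		return 0;
	return EPOLLOUT | EPOLLWRNORM;
}

and the existing kernel-producer ringbuf_map_poll() stays untouched.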

>
>  BTF_ID_LIST_SINGLE(ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
> @@ -309,7 +326,7 @@ const struct bpf_map_ops ringbuf_map_ops = {
>         .map_alloc = ringbuf_map_alloc,
>         .map_free = ringbuf_map_free,
>         .map_mmap = ringbuf_map_mmap_kern,
> -       .map_poll = ringbuf_map_poll,
> +       .map_poll = ringbuf_map_poll_kern,
>         .map_lookup_elem = ringbuf_map_lookup_elem,
>         .map_update_elem = ringbuf_map_update_elem,
>         .map_delete_elem = ringbuf_map_delete_elem,
> @@ -323,6 +340,7 @@ const struct bpf_map_ops user_ringbuf_map_ops = {
>         .map_alloc = ringbuf_map_alloc,
>         .map_free = ringbuf_map_free,
>         .map_mmap = ringbuf_map_mmap_user,
> +       .map_poll = ringbuf_map_poll_user,
>         .map_lookup_elem = ringbuf_map_lookup_elem,
>         .map_update_elem = ringbuf_map_update_elem,
>         .map_delete_elem = ringbuf_map_delete_elem,
> @@ -605,3 +623,132 @@ const struct bpf_func_proto bpf_ringbuf_discard_dynptr_proto = {
>         .arg1_type      = ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | OBJ_RELEASE,
>         .arg2_type      = ARG_ANYTHING,
>  };
> +
> +static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
> +                                  u32 *size)

"poll" part is quite confusing as it has nothing to do with epoll and
the other poll implementation. Maybe "peek"? It peeks into the next
sample without consuming it, which seems appropriate

nit: this declaration can also stay on single line
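
i.e. combining both nits, something like (just a sketch of the rename):

static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *size)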

> +{
> +       unsigned long cons_pos, prod_pos;
> +       u32 sample_len, total_len;
> +       u32 *hdr;
> +       int err;
> +       int busy = 0;

nit: combine matching types:

u32 sample_len, total_len, *hdr;
int err, busy = 0;

> +
> +       /* If another consumer is already consuming a sample, wait for them to
> +        * finish.
> +        */
> +       if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
> +               return -EBUSY;
> +
> +       /* Synchronizes with smp_store_release() in user-space. */
> +       prod_pos = smp_load_acquire(&rb->producer_pos);

I think we should enforce that prod_pos is a multiple of 8
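
e.g. something like this (sketch, following the error handling pattern
this function already uses):

	if (prod_pos % 8) {
		atomic_set(&rb->busy, 0);
		return -EINVAL;
	}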

> +       /* Synchronizes with smp_store_release() in
> +        * __bpf_user_ringbuf_sample_release().
> +        */
> +       cons_pos = smp_load_acquire(&rb->consumer_pos);
> +       if (cons_pos >= prod_pos) {
> +               atomic_set(&rb->busy, 0);
> +               return -ENODATA;
> +       }
> +
> +       hdr = (u32 *)((uintptr_t)rb->data + (cons_pos & rb->mask));
> +       sample_len = *hdr;

do we need smp_load_acquire() here? libbpf's ring_buffer
implementation uses load_acquire here
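
i.e. (sketch):

	sample_len = smp_load_acquire(hdr);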


> +
> +       /* Check that the sample can fit into a dynptr. */
> +       err = bpf_dynptr_check_size(sample_len);
> +       if (err) {
> +               atomic_set(&rb->busy, 0);
> +               return err;
> +       }
> +
> +       /* Check that the sample fits within the region advertised by the
> +        * consumer position.
> +        */
> +       total_len = sample_len + BPF_RINGBUF_HDR_SZ;

round up to closest multiple of 8? All the pointers into ringbuf data
area should be 8-byte aligned
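
e.g. (sketch, using the kernel's round_up() helper to keep the 8-byte
alignment invariant):

	total_len = round_up(sample_len + BPF_RINGBUF_HDR_SZ, 8);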

> +       if (total_len > prod_pos - cons_pos) {
> +               atomic_set(&rb->busy, 0);
> +               return -E2BIG;
> +       }
> +
> +       /* Check that the sample fits within the data region of the ring buffer.
> +        */
> +       if (total_len > rb->mask + 1) {
> +               atomic_set(&rb->busy, 0);
> +               return -E2BIG;
> +       }
> +
> +       /* consumer_pos is updated when the sample is released.
> +        */
> +

nit: unnecessary empty line

and please keep single-line comments as single-line, no */ on separate
line in such case

> +       *sample = (void *)((uintptr_t)rb->data +
> +                          ((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
> +       *size = sample_len;
> +
> +       return 0;
> +}
> +
> +static void
> +__bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
> +                                 u64 flags)

try to keep single lines if they are under 100 characters

> +{
> +
> +

empty lines, why?

> +       /* To release the ringbuffer, just increment the producer position to
> +        * signal that a new sample can be consumed. The busy bit is cleared by
> +        * userspace when posting a new sample to the ringbuffer.
> +        */
> +       smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
> +                         BPF_RINGBUF_HDR_SZ);
> +
> +       if (flags & BPF_RB_FORCE_WAKEUP || !(flags & BPF_RB_NO_WAKEUP))

please use () around bit operator expressions

> +               irq_work_queue(&rb->work);
> +
> +       atomic_set(&rb->busy, 0);

set busy before scheduling irq_work? why delaying?
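
i.e. just swapping the order (sketch, with the parenthesization nit from
above applied):

	atomic_set(&rb->busy, 0);

	if ((flags & BPF_RB_FORCE_WAKEUP) || !(flags & BPF_RB_NO_WAKEUP))
		irq_work_queue(&rb->work);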

> +}
> +
> +BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
> +          void *, callback_fn, void *, callback_ctx, u64, flags)
> +{
> +       struct bpf_ringbuf *rb;
> +       long num_samples = 0, ret = 0;
> +       int err;
> +       bpf_callback_t callback = (bpf_callback_t)callback_fn;
> +       u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP;
> +
> +       if (unlikely(flags & ~wakeup_flags))
> +               return -EINVAL;
> +
> +       /* The two wakeup flags are mutually exclusive. */
> +       if (unlikely((flags & wakeup_flags) == wakeup_flags))
> +               return -EINVAL;

we don't check this for existing ringbuf, so maybe let's keep it
consistent? FORCE_WAKEUP is just stronger than NO_WAKEUP

> +
> +       rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
> +       do {
> +               u32 size;
> +               void *sample;
> +
> +               err = __bpf_user_ringbuf_poll(rb, &sample, &size);
> +

nit: don't keep empty line between function call and error check

> +               if (!err) {

so -ENODATA is a special error and you should stop if you get that.
But for any other error we should propagate the error back, not just
silently consume it

maybe

err = ...
if (err) {
    if (err == -ENODATA)
      break;
    return err;
}

?

> +                       struct bpf_dynptr_kern dynptr;
> +
> +                       bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL,
> +                                       0, size);

we should try to avoid unnecessary re-initialization of dynptr, let's
initialize it once and then just update data and size field inside the
loop?


> +                       ret = callback((u64)&dynptr,
> +                                       (u64)(uintptr_t)callback_ctx, 0, 0, 0);
> +
> +                       __bpf_user_ringbuf_sample_release(rb, size, flags);
> +                       num_samples++;
> +               }
> +       } while (err == 0 && num_samples < 4096 && ret == 0);
> +

4096 is pretty arbitrary. Definitely worth noting it somewhere and it
seems somewhat low, tbh...
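
at the very least it would be nice to pull that 4096 out into a named
constant with a comment, e.g. (sketch, name is just a suggestion):

/* arbitrary cap on samples drained in one call, to bound runtime */
#define BPF_MAX_USER_RINGBUF_SAMPLES 4096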

ideally we'd cond_resched() from time to time, but that would require
BPF program to be sleepable, so we can't do that :(


> +       return num_samples;
> +}
> +
> +const struct bpf_func_proto bpf_user_ringbuf_drain_proto = {
> +       .func           = bpf_user_ringbuf_drain,
> +       .ret_type       = RET_INTEGER,
> +       .arg1_type      = ARG_CONST_MAP_PTR,
> +       .arg2_type      = ARG_PTR_TO_FUNC,
> +       .arg3_type      = ARG_PTR_TO_STACK_OR_NULL,
> +       .arg4_type      = ARG_ANYTHING,
> +};
> diff --git a/kernel/bpf/verifier.c b/kernel/bpf/verifier.c
> index 4a9562c62af0..99dfdc3ed187 100644
> --- a/kernel/bpf/verifier.c
> +++ b/kernel/bpf/verifier.c
> @@ -555,6 +555,7 @@ static const char *reg_type_str(struct bpf_verifier_env *env,
>                 [PTR_TO_BUF]            = "buf",
>                 [PTR_TO_FUNC]           = "func",
>                 [PTR_TO_MAP_KEY]        = "map_key",
> +               [PTR_TO_DYNPTR]         = "dynptr_ptr",
>         };
>
>         if (type & PTR_MAYBE_NULL) {
> @@ -5656,6 +5657,12 @@ static const struct bpf_reg_types stack_ptr_types = { .types = { PTR_TO_STACK }
>  static const struct bpf_reg_types const_str_ptr_types = { .types = { PTR_TO_MAP_VALUE } };
>  static const struct bpf_reg_types timer_types = { .types = { PTR_TO_MAP_VALUE } };
>  static const struct bpf_reg_types kptr_types = { .types = { PTR_TO_MAP_VALUE } };
> +static const struct bpf_reg_types dynptr_types = {
> +       .types = {
> +               PTR_TO_STACK,
> +               PTR_TO_DYNPTR | MEM_ALLOC | DYNPTR_TYPE_LOCAL,
> +       }
> +};
>
>  static const struct bpf_reg_types *compatible_reg_types[__BPF_ARG_TYPE_MAX] = {
>         [ARG_PTR_TO_MAP_KEY]            = &map_key_value_types,
> @@ -5682,7 +5689,7 @@ static const struct bpf_reg_types *compatible_reg_types[__BPF_ARG_TYPE_MAX] = {
>         [ARG_PTR_TO_CONST_STR]          = &const_str_ptr_types,
>         [ARG_PTR_TO_TIMER]              = &timer_types,
>         [ARG_PTR_TO_KPTR]               = &kptr_types,
> -       [ARG_PTR_TO_DYNPTR]             = &stack_ptr_types,
> +       [ARG_PTR_TO_DYNPTR]             = &dynptr_types,
>  };
>
>  static int check_reg_type(struct bpf_verifier_env *env, u32 regno,
> @@ -6025,6 +6032,13 @@ static int check_func_arg(struct bpf_verifier_env *env, u32 arg,
>                 err = check_mem_size_reg(env, reg, regno, true, meta);
>                 break;
>         case ARG_PTR_TO_DYNPTR:
> +               /* We only need to check for initialized / uninitialized helper
> +                * dynptr args if the dynptr is not MEM_ALLOC, as the assumption
> +                * is that if it is, that a helper function initialized the
> +                * dynptr on behalf of the BPF program.
> +                */
> +               if (reg->type & MEM_ALLOC)

Isn't PTR_TO_DYNPTR enough indication? Do we need the MEM_ALLOC modifier?
Normal dynptrs created and used inside a BPF program live on the stack and
are actually PTR_TO_STACK, so that should be enough of a distinction? Or
am I missing something?

> +                       break;
>                 if (arg_type & MEM_UNINIT) {
>                         if (!is_dynptr_reg_valid_uninit(env, reg)) {
>                                 verbose(env, "Dynptr has to be an uninitialized dynptr\n");
> @@ -6197,7 +6211,9 @@ static int check_map_func_compatibility(struct bpf_verifier_env *env,
>                         goto error;
>                 break;
>         case BPF_MAP_TYPE_USER_RINGBUF:
> -               goto error;
> +               if (func_id != BPF_FUNC_user_ringbuf_drain)
> +                       goto error;
> +               break;
>         case BPF_MAP_TYPE_STACK_TRACE:
>                 if (func_id != BPF_FUNC_get_stackid)
>                         goto error;
> @@ -6317,6 +6333,10 @@ static int check_map_func_compatibility(struct bpf_verifier_env *env,
>                 if (map->map_type != BPF_MAP_TYPE_RINGBUF)
>                         goto error;
>                 break;
> +       case BPF_FUNC_user_ringbuf_drain:
> +               if (map->map_type != BPF_MAP_TYPE_USER_RINGBUF)
> +                       goto error;
> +               break;
>         case BPF_FUNC_get_stackid:
>                 if (map->map_type != BPF_MAP_TYPE_STACK_TRACE)
>                         goto error;
> @@ -6901,6 +6921,29 @@ static int set_find_vma_callback_state(struct bpf_verifier_env *env,
>         return 0;
>  }
>
> +static int set_user_ringbuf_callback_state(struct bpf_verifier_env *env,
> +                                          struct bpf_func_state *caller,
> +                                          struct bpf_func_state *callee,
> +                                          int insn_idx)
> +{
> +       /* bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void
> +        *                        callback_ctx, u64 flags);
> +        * callback_fn(struct bpf_dynptr_t* dynptr, void *callback_ctx);
> +        */
> +       __mark_reg_not_init(env, &callee->regs[BPF_REG_0]);
> +       callee->regs[BPF_REG_1].type = PTR_TO_DYNPTR | DYNPTR_TYPE_LOCAL | MEM_ALLOC;
> +       __mark_reg_known_zero(&callee->regs[BPF_REG_1]);
> +       callee->regs[BPF_REG_2] = caller->regs[BPF_REG_3];
> +
> +       /* unused */
> +       __mark_reg_not_init(env, &callee->regs[BPF_REG_3]);
> +       __mark_reg_not_init(env, &callee->regs[BPF_REG_4]);
> +       __mark_reg_not_init(env, &callee->regs[BPF_REG_5]);
> +
> +       callee->in_callback_fn = true;
> +       return 0;
> +}
> +
>  static int prepare_func_exit(struct bpf_verifier_env *env, int *insn_idx)
>  {
>         struct bpf_verifier_state *state = env->cur_state;
> @@ -7345,6 +7388,10 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
>                         }
>                 }
>                 break;
> +       case BPF_FUNC_user_ringbuf_drain:
> +               err = __check_func_call(env, insn, insn_idx_p, meta.subprogno,
> +                                       set_user_ringbuf_callback_state);
> +               break;
>         }
>
>         if (err)
> @@ -7477,11 +7524,15 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
>                 /* Find the id of the dynptr we're acquiring a reference to */
>                 for (i = 0; i < MAX_BPF_FUNC_REG_ARGS; i++) {
>                         if (arg_type_is_dynptr(fn->arg_type[i])) {
> +                               struct bpf_reg_state *reg = &regs[BPF_REG_1 + i];
> +
>                                 if (dynptr_id) {
>                                         verbose(env, "verifier internal error: multiple dynptr args in func\n");
>                                         return -EFAULT;
>                                 }
> -                               dynptr_id = stack_slot_get_id(env, &regs[BPF_REG_1 + i]);
> +
> +                               if (!(reg->type & MEM_ALLOC))
> +                                       dynptr_id = stack_slot_get_id(env, reg);

this part has changed in bpf-next

>                         }
>                 }
>                 /* For release_reference() */
> diff --git a/tools/include/uapi/linux/bpf.h b/tools/include/uapi/linux/bpf.h
> index ce0ce713faf9..e5e4ab12177c 100644
> --- a/tools/include/uapi/linux/bpf.h
> +++ b/tools/include/uapi/linux/bpf.h
> @@ -5332,6 +5332,13 @@ union bpf_attr {
>   *             **-EACCES** if the SYN cookie is not valid.
>   *
>   *             **-EPROTONOSUPPORT** if CONFIG_IPV6 is not builtin.
> + *
> + * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
> + *     Description
> + *             Drain samples from the specified user ringbuffer, and invoke the
> + *             provided callback for each such sample.
> + *     Return
> + *             An error if a sample could not be drained.
>   */
>  #define __BPF_FUNC_MAPPER(FN)          \
>         FN(unspec),                     \
> @@ -5542,6 +5549,7 @@ union bpf_attr {
>         FN(tcp_raw_gen_syncookie_ipv6), \
>         FN(tcp_raw_check_syncookie_ipv4),       \
>         FN(tcp_raw_check_syncookie_ipv6),       \
> +       FN(bpf_user_ringbuf_drain),     \
>         /* */
>
>  /* integer value in 'imm' field of BPF_CALL instruction selects which helper
> --
> 2.30.2
>

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 2/5] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF map type
  2022-08-08 15:53 ` [PATCH 2/5] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF map type David Vernet
@ 2022-08-11 23:22   ` Andrii Nakryiko
  2022-08-12 16:21     ` David Vernet
  2022-08-11 23:29   ` Andrii Nakryiko
  1 sibling, 1 reply; 28+ messages in thread
From: Andrii Nakryiko @ 2022-08-11 23:22 UTC (permalink / raw)
  To: David Vernet
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

On Mon, Aug 8, 2022 at 8:54 AM David Vernet <void@manifault.com> wrote:
>
> We want to support a ringbuf map type where samples are published from
> user-space to BPF programs. BPF currently supports a kernel -> user-space
> circular ringbuffer via the BPF_MAP_TYPE_RINGBUF map type. We'll need to
> define a new map type for user-space -> kernel, as none of the helpers
> exported for BPF_MAP_TYPE_RINGBUF will apply to a user-space producer
> ringbuffer, and we'll want to add one or more helper functions that would
> not apply for a kernel-producer ringbuffer.
>
> This patch therefore adds a new BPF_MAP_TYPE_USER_RINGBUF map type
> definition. The map type is useless in its current form, as there is no way
> to access or use it for anything until we add more BPF helpers. A follow-on
> patch will therefore add a new helper function that allows BPF programs to
> run callbacks on samples that are published to the ringbuffer.
>
> Signed-off-by: David Vernet <void@manifault.com>
> ---
>  include/linux/bpf_types.h      |  1 +
>  include/uapi/linux/bpf.h       |  1 +
>  kernel/bpf/ringbuf.c           | 70 +++++++++++++++++++++++++++++-----
>  kernel/bpf/verifier.c          |  3 ++
>  tools/include/uapi/linux/bpf.h |  1 +
>  tools/lib/bpf/libbpf.c         |  1 +
>  6 files changed, 68 insertions(+), 9 deletions(-)
>

[...]

>
> -static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma)
> +static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma,
> +                           bool kernel_producer)
>  {
>         struct bpf_ringbuf_map *rb_map;
>
>         rb_map = container_of(map, struct bpf_ringbuf_map, map);
>
>         if (vma->vm_flags & VM_WRITE) {
> -               /* allow writable mapping for the consumer_pos only */
> -               if (vma->vm_pgoff != 0 || vma->vm_end - vma->vm_start != PAGE_SIZE)
> +               if (kernel_producer) {
> +                       /* allow writable mapping for the consumer_pos only */
> +                       if (vma->vm_pgoff != 0 || vma->vm_end - vma->vm_start != PAGE_SIZE)
> +                               return -EPERM;
> +               /* For user ringbufs, disallow writable mappings to the
> +                * consumer pointer, and allow writable mappings to both the
> +                * producer position, and the ring buffer data itself.
> +                */
> +               } else if (vma->vm_pgoff == 0)
>                         return -EPERM;

the asymmetrical use of {} in one if branch and not using them in
another is extremely confusing, please don't do that

the way you put big comment inside the wrong if branch also throws me
off, maybe move it before return -EPERM instead with proper
indentation?

sorry for nitpicks, but I've been stuck for a few minutes trying to
figure out what exactly is happening here :)


>         } else {
>                 vma->vm_flags &= ~VM_MAYWRITE;
> @@ -242,6 +271,16 @@ static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma)
>                                    vma->vm_pgoff + RINGBUF_PGOFF);
>  }
>
> +static int ringbuf_map_mmap_kern(struct bpf_map *map, struct vm_area_struct *vma)
> +{
> +       return ringbuf_map_mmap(map, vma, true);
> +}
> +
> +static int ringbuf_map_mmap_user(struct bpf_map *map, struct vm_area_struct *vma)
> +{
> +       return ringbuf_map_mmap(map, vma, false);
> +}

I wouldn't mind if you just have two separate implementations of
ringbuf_map_mmap for _kern and _user cases, tbh, probably would be
clearer as well

> +
>  static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
>  {
>         unsigned long cons_pos, prod_pos;
> @@ -269,7 +308,7 @@ const struct bpf_map_ops ringbuf_map_ops = {
>         .map_meta_equal = bpf_map_meta_equal,
>         .map_alloc = ringbuf_map_alloc,
>         .map_free = ringbuf_map_free,
> -       .map_mmap = ringbuf_map_mmap,
> +       .map_mmap = ringbuf_map_mmap_kern,
>         .map_poll = ringbuf_map_poll,
>         .map_lookup_elem = ringbuf_map_lookup_elem,
>         .map_update_elem = ringbuf_map_update_elem,
> @@ -278,6 +317,19 @@ const struct bpf_map_ops ringbuf_map_ops = {
>         .map_btf_id = &ringbuf_map_btf_ids[0],
>  };
>

[...]

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 2/5] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF map type
  2022-08-08 15:53 ` [PATCH 2/5] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF map type David Vernet
  2022-08-11 23:22   ` Andrii Nakryiko
@ 2022-08-11 23:29   ` Andrii Nakryiko
  2022-08-12 16:23     ` David Vernet
  1 sibling, 1 reply; 28+ messages in thread
From: Andrii Nakryiko @ 2022-08-11 23:29 UTC (permalink / raw)
  To: David Vernet
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

On Mon, Aug 8, 2022 at 8:54 AM David Vernet <void@manifault.com> wrote:
>
> We want to support a ringbuf map type where samples are published from
> user-space to BPF programs. BPF currently supports a kernel -> user-space
> circular ringbuffer via the BPF_MAP_TYPE_RINGBUF map type. We'll need to
> define a new map type for user-space -> kernel, as none of the helpers
> exported for BPF_MAP_TYPE_RINGBUF will apply to a user-space producer
> ringbuffer, and we'll want to add one or more helper functions that would
> not apply for a kernel-producer ringbuffer.
>
> This patch therefore adds a new BPF_MAP_TYPE_USER_RINGBUF map type
> definition. The map type is useless in its current form, as there is no way
> to access or use it for anything until we add more BPF helpers. A follow-on
> patch will therefore add a new helper function that allows BPF programs to
> run callbacks on samples that are published to the ringbuffer.
>
> Signed-off-by: David Vernet <void@manifault.com>
> ---
>  include/linux/bpf_types.h      |  1 +
>  include/uapi/linux/bpf.h       |  1 +
>  kernel/bpf/ringbuf.c           | 70 +++++++++++++++++++++++++++++-----
>  kernel/bpf/verifier.c          |  3 ++
>  tools/include/uapi/linux/bpf.h |  1 +
>  tools/lib/bpf/libbpf.c         |  1 +
>  6 files changed, 68 insertions(+), 9 deletions(-)
>
> diff --git a/include/linux/bpf_types.h b/include/linux/bpf_types.h
> index 2b9112b80171..2c6a4f2562a7 100644
> --- a/include/linux/bpf_types.h
> +++ b/include/linux/bpf_types.h
> @@ -126,6 +126,7 @@ BPF_MAP_TYPE(BPF_MAP_TYPE_STRUCT_OPS, bpf_struct_ops_map_ops)
>  #endif
>  BPF_MAP_TYPE(BPF_MAP_TYPE_RINGBUF, ringbuf_map_ops)
>  BPF_MAP_TYPE(BPF_MAP_TYPE_BLOOM_FILTER, bloom_filter_map_ops)
> +BPF_MAP_TYPE(BPF_MAP_TYPE_USER_RINGBUF, user_ringbuf_map_ops)
>
>  BPF_LINK_TYPE(BPF_LINK_TYPE_RAW_TRACEPOINT, raw_tracepoint)
>  BPF_LINK_TYPE(BPF_LINK_TYPE_TRACING, tracing)
> diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
> index 7bf9ba1329be..a341f877b230 100644
> --- a/include/uapi/linux/bpf.h
> +++ b/include/uapi/linux/bpf.h
> @@ -909,6 +909,7 @@ enum bpf_map_type {
>         BPF_MAP_TYPE_INODE_STORAGE,
>         BPF_MAP_TYPE_TASK_STORAGE,
>         BPF_MAP_TYPE_BLOOM_FILTER,
> +       BPF_MAP_TYPE_USER_RINGBUF,
>  };
>
>  /* Note that tracing related programs such as
> diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> index ded4faeca192..29e2de42df15 100644
> --- a/kernel/bpf/ringbuf.c
> +++ b/kernel/bpf/ringbuf.c
> @@ -38,12 +38,32 @@ struct bpf_ringbuf {
>         struct page **pages;
>         int nr_pages;
>         spinlock_t spinlock ____cacheline_aligned_in_smp;
> -       /* Consumer and producer counters are put into separate pages to allow
> -        * mapping consumer page as r/w, but restrict producer page to r/o.
> -        * This protects producer position from being modified by user-space
> -        * application and ruining in-kernel position tracking.
> +       /* Consumer and producer counters are put into separate pages to
> +        * allow each position to be mapped with different permissions.
> +        * This prevents a user-space application from modifying the
> +        * position and ruining in-kernel tracking. The permissions of the
> +        * pages depend on who is producing samples: user-space or the
> +        * kernel.
> +        *
> +        * Kernel-producer
> +        * ---------------
> +        * The producer position and data pages are mapped as r/o in
> +        * userspace. For this approach, bits in the header of samples are
> +        * used to signal to user-space, and to other producers, whether a
> +        * sample is currently being written.
> +        *
> +        * User-space producer
> +        * -------------------
> +        * Only the page containing the consumer position, and whether the
> +        * ringbuffer is currently being consumed via a 'busy' bit, are
> +        * mapped r/o in user-space. Sample headers may not be used to
> +        * communicate any information between kernel consumers, as a
> +        * user-space application could modify its contents at any time.
>          */
> -       unsigned long consumer_pos __aligned(PAGE_SIZE);
> +       struct {
> +               unsigned long consumer_pos;
> +               atomic_t busy;

one more thing, why does busy have to be exposed into user-space
mapped memory at all? Can't it be just a private variable in
bpf_ringbuf?

> +       } __aligned(PAGE_SIZE);
>         unsigned long producer_pos __aligned(PAGE_SIZE);
>         char data[] __aligned(PAGE_SIZE);
>  };

[...]

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 4/5] bpf: Add libbpf logic for user-space ring buffer
  2022-08-08 15:53 ` [PATCH 4/5] bpf: Add libbpf logic for user-space ring buffer David Vernet
@ 2022-08-11 23:39   ` Andrii Nakryiko
  2022-08-12 17:28     ` David Vernet
  0 siblings, 1 reply; 28+ messages in thread
From: Andrii Nakryiko @ 2022-08-11 23:39 UTC (permalink / raw)
  To: David Vernet
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

On Mon, Aug 8, 2022 at 8:54 AM David Vernet <void@manifault.com> wrote:
>
> Now that all of the logic is in place in the kernel to support user-space
> produced ringbuffers, we can add the user-space logic to libbpf.
>
> Signed-off-by: David Vernet <void@manifault.com>
> ---
>  kernel/bpf/ringbuf.c          |   7 +-
>  tools/lib/bpf/libbpf.c        |  10 +-
>  tools/lib/bpf/libbpf.h        |  19 +++
>  tools/lib/bpf/libbpf.map      |   6 +
>  tools/lib/bpf/libbpf_probes.c |   1 +
>  tools/lib/bpf/ringbuf.c       | 216 ++++++++++++++++++++++++++++++++++
>  6 files changed, 256 insertions(+), 3 deletions(-)
>
> diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> index fc589fc8eb7c..a10558e79ec8 100644
> --- a/kernel/bpf/ringbuf.c
> +++ b/kernel/bpf/ringbuf.c
> @@ -639,7 +639,9 @@ static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
>         if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
>                 return -EBUSY;
>
> -       /* Synchronizes with smp_store_release() in user-space. */
> +       /* Synchronizes with smp_store_release() in __ring_buffer_user__commit()
> +        * in user-space.
> +        */

let's not hard-code libbpf function names in kernel comments, it's
still user-space

>         prod_pos = smp_load_acquire(&rb->producer_pos);
>         /* Synchronizes with smp_store_release() in
>          * __bpf_user_ringbuf_sample_release().
> @@ -695,6 +697,9 @@ __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
>         /* To release the ringbuffer, just increment the producer position to
>          * signal that a new sample can be consumed. The busy bit is cleared by
>          * userspace when posting a new sample to the ringbuffer.
> +        *
> +        * Synchronizes with smp_load_acquire() in ring_buffer_user__reserve()
> +        * in user-space.
>          */

same

>         smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
>                           BPF_RINGBUF_HDR_SZ);
> diff --git a/tools/lib/bpf/libbpf.c b/tools/lib/bpf/libbpf.c
> index 9c1f2d09f44e..f7fe09dce643 100644
> --- a/tools/lib/bpf/libbpf.c
> +++ b/tools/lib/bpf/libbpf.c
> @@ -2367,6 +2367,12 @@ static size_t adjust_ringbuf_sz(size_t sz)
>         return sz;
>  }
>
> +static bool map_is_ringbuf(const struct bpf_map *map)
> +{
> +       return map->def.type == BPF_MAP_TYPE_RINGBUF ||
> +              map->def.type == BPF_MAP_TYPE_USER_RINGBUF;
> +}
> +
>  static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def)
>  {
>         map->def.type = def->map_type;
> @@ -2381,7 +2387,7 @@ static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def
>         map->btf_value_type_id = def->value_type_id;
>
>         /* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */
> -       if (map->def.type == BPF_MAP_TYPE_RINGBUF)
> +       if (map_is_ringbuf(map))
>                 map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries);
>
>         if (def->parts & MAP_DEF_MAP_TYPE)
> @@ -4363,7 +4369,7 @@ int bpf_map__set_max_entries(struct bpf_map *map, __u32 max_entries)
>         map->def.max_entries = max_entries;
>
>         /* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */
> -       if (map->def.type == BPF_MAP_TYPE_RINGBUF)
> +       if (map_is_ringbuf(map))
>                 map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries);
>
>         return 0;
> diff --git a/tools/lib/bpf/libbpf.h b/tools/lib/bpf/libbpf.h
> index 61493c4cddac..6d1d0539b08d 100644
> --- a/tools/lib/bpf/libbpf.h
> +++ b/tools/lib/bpf/libbpf.h
> @@ -1009,6 +1009,7 @@ LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook,
>
>  /* Ring buffer APIs */
>  struct ring_buffer;
> +struct ring_buffer_user;


I know that I'm the one asking to use ring_buffer_user name, but given
kernel is using USER_RINGBUF and user_ringbuf naming, let's be
consistent and use user_ring_buffer in libbpf as well. I was
contemplating uring_buffer, can't make up my mind if it's better or
not.

Also ring_buffer_user reads like "user of ring buffer", which adds to
confusion :)


>
>  typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size);
>
> @@ -1028,6 +1029,24 @@ LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);
>  LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb);
>  LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb);
>
> +struct ring_buffer_user_opts {
> +       size_t sz; /* size of this struct, for forward/backward compatibility */
> +};
> +
> +#define ring_buffer_user_opts__last_field sz
> +
> +LIBBPF_API struct ring_buffer_user *
> +ring_buffer_user__new(int map_fd, const struct ring_buffer_user_opts *opts);
> +LIBBPF_API void *ring_buffer_user__reserve(struct ring_buffer_user *rb,
> +                                          uint32_t size);
> +LIBBPF_API void *ring_buffer_user__poll(struct ring_buffer_user *rb,
> +                                       uint32_t size, int timeout_ms);
> +LIBBPF_API void ring_buffer_user__submit(struct ring_buffer_user *rb,
> +                                        void *sample);
> +LIBBPF_API void ring_buffer_user__discard(struct ring_buffer_user *rb,
> +                                         void *sample);
> +LIBBPF_API void ring_buffer_user__free(struct ring_buffer_user *rb);
> +
>  /* Perf buffer APIs */
>  struct perf_buffer;
>
> diff --git a/tools/lib/bpf/libbpf.map b/tools/lib/bpf/libbpf.map
> index 119e6e1ea7f1..8db11040df1b 100644
> --- a/tools/lib/bpf/libbpf.map
> +++ b/tools/lib/bpf/libbpf.map
> @@ -365,4 +365,10 @@ LIBBPF_1.0.0 {
>                 libbpf_bpf_map_type_str;
>                 libbpf_bpf_prog_type_str;
>                 perf_buffer__buffer;
> +               ring_buffer_user__discard;
> +               ring_buffer_user__free;
> +               ring_buffer_user__new;
> +               ring_buffer_user__poll;
> +               ring_buffer_user__reserve;
> +               ring_buffer_user__submit;
>  };
> diff --git a/tools/lib/bpf/libbpf_probes.c b/tools/lib/bpf/libbpf_probes.c
> index 6d495656f554..f3a8e8e74eb8 100644
> --- a/tools/lib/bpf/libbpf_probes.c
> +++ b/tools/lib/bpf/libbpf_probes.c
> @@ -231,6 +231,7 @@ static int probe_map_create(enum bpf_map_type map_type)
>                         return btf_fd;
>                 break;
>         case BPF_MAP_TYPE_RINGBUF:
> +       case BPF_MAP_TYPE_USER_RINGBUF:
>                 key_size = 0;
>                 value_size = 0;
>                 max_entries = 4096;
> diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
> index 8bc117bcc7bc..86e3c11d8486 100644
> --- a/tools/lib/bpf/ringbuf.c
> +++ b/tools/lib/bpf/ringbuf.c
> @@ -39,6 +39,17 @@ struct ring_buffer {
>         int ring_cnt;
>  };
>
> +struct ring_buffer_user {
> +       struct epoll_event event;
> +       unsigned long *consumer_pos;
> +       unsigned long *producer_pos;
> +       void *data;
> +       unsigned long mask;
> +       size_t page_size;
> +       int map_fd;
> +       int epoll_fd;
> +};
> +
>  static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r)
>  {
>         if (r->consumer_pos) {
> @@ -300,3 +311,208 @@ int ring_buffer__epoll_fd(const struct ring_buffer *rb)
>  {
>         return rb->epoll_fd;
>  }
> +
> +static void __user_ringbuf_unmap_ring(struct ring_buffer_user *rb)

libbpf generally doesn't use double underscore naming pattern, let's
not do that here as well

> +{
> +       if (rb->consumer_pos) {
> +               munmap(rb->consumer_pos, rb->page_size);
> +               rb->consumer_pos = NULL;
> +       }
> +       if (rb->producer_pos) {
> +               munmap(rb->producer_pos, rb->page_size + 2 * (rb->mask + 1));
> +               rb->producer_pos = NULL;
> +       }
> +}
> +
> +void ring_buffer_user__free(struct ring_buffer_user *rb)
> +{
> +       if (!rb)
> +               return;
> +
> +       __user_ringbuf_unmap_ring(rb);
> +
> +       if (rb->epoll_fd >= 0)
> +               close(rb->epoll_fd);
> +
> +       free(rb);
> +}
> +
> +static int __ring_buffer_user_map(struct ring_buffer_user *rb, int map_fd)
> +{
> +

please don't leave stray empty lines around the code


> +       struct bpf_map_info info;
> +       __u32 len = sizeof(info);
> +       void *tmp;
> +       struct epoll_event *rb_epoll;
> +       int err;
> +
> +       memset(&info, 0, sizeof(info));
> +
> +       err = bpf_obj_get_info_by_fd(map_fd, &info, &len);
> +       if (err) {
> +               err = -errno;
> +               pr_warn("user ringbuf: failed to get map info for fd=%d: %d\n",
> +                       map_fd, err);
> +               return libbpf_err(err);
> +       }
> +
> +       if (info.type != BPF_MAP_TYPE_USER_RINGBUF) {
> +               pr_warn("user ringbuf: map fd=%d is not BPF_MAP_TYPE_USER_RINGBUF\n",
> +                       map_fd);
> +               return libbpf_err(-EINVAL);
> +       }
> +
> +       rb->map_fd = map_fd;
> +       rb->mask = info.max_entries - 1;
> +
> +       /* Map read-only consumer page */
> +       tmp = mmap(NULL, rb->page_size, PROT_READ, MAP_SHARED, map_fd, 0);
> +       if (tmp == MAP_FAILED) {
> +               err = -errno;
> +               pr_warn("user ringbuf: failed to mmap consumer page for map fd=%d: %d\n",
> +                       map_fd, err);
> +               return libbpf_err(err);
> +       }
> +       rb->consumer_pos = tmp;
> +
> +       /* Map read-write the producer page and data pages. We map the data
> +        * region as twice the total size of the ringbuffer to allow the simple
> +        * reading and writing of samples that wrap around the end of the
> +        * buffer.  See the kernel implementation for details.
> +        */
> +       tmp = mmap(NULL, rb->page_size + 2 * info.max_entries,
> +                  PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, rb->page_size);
> +       if (tmp == MAP_FAILED) {
> +               err = -errno;
> +               pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %d\n",
> +                       map_fd, err);
> +               return libbpf_err(err);
> +       }
> +
> +       rb->producer_pos = tmp;
> +       rb->data = tmp + rb->page_size;
> +
> +       rb_epoll = &rb->event;
> +       rb_epoll->events = EPOLLOUT;
> +       if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, rb_epoll) < 0) {
> +               err = -errno;
> +               pr_warn("user ringbuf: failed to epoll add map fd=%d: %d\n",
> +                       map_fd, err);
> +               return libbpf_err(err);
> +       }
> +
> +       return 0;
> +}
> +
> +struct ring_buffer_user *
> +ring_buffer_user__new(int map_fd, const struct ring_buffer_user_opts *opts)
> +{
> +       struct ring_buffer_user *rb;
> +       int err;
> +
> +       if (!OPTS_VALID(opts, ring_buffer_opts))
> +               return errno = EINVAL, NULL;
> +
> +       rb = calloc(1, sizeof(*rb));
> +       if (!rb)
> +               return errno = ENOMEM, NULL;
> +
> +       rb->page_size = getpagesize();
> +
> +       rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
> +       if (rb->epoll_fd < 0) {
> +               err = -errno;
> +               pr_warn("user ringbuf: failed to create epoll instance: %d\n",
> +                       err);
> +               goto err_out;
> +       }
> +
> +       err = __ring_buffer_user_map(rb, map_fd);
> +       if (err)
> +               goto err_out;
> +
> +       return rb;
> +
> +err_out:
> +       ring_buffer_user__free(rb);
> +       return errno = -err, NULL;
> +}
> +
> +static void __ring_buffer_user__commit(struct ring_buffer_user *rb)
> +{
> +       uint32_t *hdr;
> +       uint32_t total_len;
> +       unsigned long prod_pos;
> +
> +       prod_pos = *rb->producer_pos;
> +       hdr = rb->data + (prod_pos & rb->mask);
> +
> +       total_len = *hdr + BPF_RINGBUF_HDR_SZ;

round up to multiple of 8

> +
> +       /* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_poll() in
> +        * the kernel.
> +        */
> +       smp_store_release(rb->producer_pos, prod_pos + total_len);
> +}
> +
> +/* Discard a previously reserved sample into the ring buffer. Because the user
> + * ringbuffer is assumed to be single producer, this can simply be a no-op, and
> + * the producer pointer is left in the same place as when it was reserved.
> + */
> +void ring_buffer_user__discard(struct ring_buffer_user *rb, void *sample)
> +{}

{
}

> +
> +/* Submit a previously reserved sample into the ring buffer.
> + */

nit: this is single-line comment


> +void ring_buffer_user__submit(struct ring_buffer_user *rb, void *sample)
> +{
> +       __ring_buffer_user__commit(rb);
> +}

this made me think that it's probably best to add kernel support for
busy bit anyways (just like for existing ringbuf), so that we can
eventually turn this into multi-producer on user-space side (all we
need is a lock, really). So let's anticipate that on kernel side from
the very beginning

> +
> +/* Reserve a pointer to a sample in the user ring buffer. This function is *not*
> + * thread safe, and the ring-buffer supports only a single producer.
> + */
> +void *ring_buffer_user__reserve(struct ring_buffer_user *rb, uint32_t size)
> +{
> +       uint32_t *hdr;
> +       /* 64-bit to avoid overflow in case of extreme application behavior */
> +       size_t avail_size, total_size, max_size;

size_t is not guaranteed to be 64-bit, neither is long

> +       unsigned long cons_pos, prod_pos;
> +
> +       /* Synchronizes with smp_store_release() in __bpf_user_ringbuf_poll() in
> +        * the kernel.
> +        */
> +       cons_pos = smp_load_acquire(rb->consumer_pos);
> +       /* Synchronizes with smp_store_release() in __ring_buffer_user__commit()
> +        */
> +       prod_pos = smp_load_acquire(rb->producer_pos);
> +
> +       max_size = rb->mask + 1;
> +       avail_size = max_size - (prod_pos - cons_pos);
> +       total_size = size + BPF_RINGBUF_HDR_SZ;

round_up(8)

> +
> +       if (total_size > max_size || avail_size < total_size)
> +               return NULL;

set errno as well?

> +
> +       hdr = rb->data + (prod_pos & rb->mask);
> +       *hdr = size;

I'd make sure that entire 8 bytes of header are zero-initialized, for
better future extensibility

> +
> +       /* Producer pos is updated when a sample is submitted. */
> +
> +       return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask);
> +}
> +
> +/* Poll for available space in the ringbuffer, and reserve a record when it
> + * becomes available.
> + */
> +void *ring_buffer_user__poll(struct ring_buffer_user *rb, uint32_t size,
> +                            int timeout_ms)
> +{
> +       int cnt;
> +
> +       cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms);
> +       if (cnt < 0)
> +               return NULL;
> +
> +       return ring_buffer_user__reserve(rb, size);

it's not clear how just doing epoll_wait() guarantees that we have >=
size of space available?.. Seems like some tests are missing?

> +}
> --
> 2.30.2
>

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 3/5] bpf: Add bpf_user_ringbuf_drain() helper
  2022-08-11 23:22   ` Andrii Nakryiko
@ 2022-08-12  0:01     ` David Vernet
  2022-08-12  0:46     ` David Vernet
  1 sibling, 0 replies; 28+ messages in thread
From: David Vernet @ 2022-08-12  0:01 UTC (permalink / raw)
  To: Andrii Nakryiko
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

On Thu, Aug 11, 2022 at 04:22:43PM -0700, Andrii Nakryiko wrote:
> On Mon, Aug 8, 2022 at 8:54 AM David Vernet <void@manifault.com> wrote:
> >
> > Now that we have a BPF_MAP_TYPE_USER_RINGBUF map type, we need to add a
> > helper function that allows BPF programs to drain samples from the ring
> > buffer, and invoke a callback for each. This patch adds a new
> > bpf_user_ringbuf_drain() helper that provides this abstraction.
> >
> > In order to support this, we needed to also add a new PTR_TO_DYNPTR
> > register type to reflect a dynptr that was allocated by a helper function
> > and passed to a BPF program. The verifier currently only supports
> > PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL and MEM_ALLOC.
> 
> This commit message is a bit too laconic. There is a lot of
> implications of various parts of this patch, it would be great to
> highlight most important ones. Please consider elaborating a bit more.

Not a problem, I'll do this in v3.

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 3/5] bpf: Add bpf_user_ringbuf_drain() helper
  2022-08-11 23:22   ` Andrii Nakryiko
  2022-08-12  0:01     ` David Vernet
@ 2022-08-12  0:46     ` David Vernet
  2022-08-16 18:57       ` Andrii Nakryiko
  1 sibling, 1 reply; 28+ messages in thread
From: David Vernet @ 2022-08-12  0:46 UTC (permalink / raw)
  To: Andrii Nakryiko
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

On Thu, Aug 11, 2022 at 04:22:43PM -0700, Andrii Nakryiko wrote:
> On Mon, Aug 8, 2022 at 8:54 AM David Vernet <void@manifault.com> wrote:
> >
> > Now that we have a BPF_MAP_TYPE_USER_RINGBUF map type, we need to add a
> > helper function that allows BPF programs to drain samples from the ring
> > buffer, and invoke a callback for each. This patch adds a new
> > bpf_user_ringbuf_drain() helper that provides this abstraction.
> >
> > In order to support this, we needed to also add a new PTR_TO_DYNPTR
> > register type to reflect a dynptr that was allocated by a helper function
> > and passed to a BPF program. The verifier currently only supports
> > PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL and MEM_ALLOC.
> 
> This commit message is a bit too laconic. There is a lot of
> implications of various parts of this patch, it would be great to
> highlight most important ones. Please consider elaborating a bit more.

Argh, I sent my last email too early and it only responded to this point.
I'll do this in v3, as mentioned in my other email.

> > diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
> > index a341f877b230..ca125648d7fd 100644
> > --- a/include/uapi/linux/bpf.h
> > +++ b/include/uapi/linux/bpf.h
> > @@ -5332,6 +5332,13 @@ union bpf_attr {
> >   *             **-EACCES** if the SYN cookie is not valid.
> >   *
> >   *             **-EPROTONOSUPPORT** if CONFIG_IPV6 is not builtin.
> > + *
> > + * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
> > + *     Description
> > + *             Drain samples from the specified user ringbuffer, and invoke the
> > + *             provided callback for each such sample.
> 
> please specify what's the expected signature of callback_fn

Will do. Unfortunately, we're inconsistent about doing this for other helper
functions, so it wasn't clear from context.
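Something along these lines for the doc comment (rough wording, to be
finalized in v3):

	 * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
	 *	Description
	 *		Drain samples from the specified user ring buffer, and
	 *		invoke the provided callback for each such sample:
	 *
	 *		long (*callback_fn)(struct bpf_dynptr *dynptr, void *ctx);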

> > + *     Return
> > + *             An error if a sample could not be drained.
> 
> Negative error, right? And might be worth it briefly describing what
> are the situation(s) when you won't be able to drain a sample?
> 
> Also please clarify if having no sample to drain is an error or not?
> 
> It's also important to specify that if no error (and -ENODATA
> shouldn't be an error, actually) occurred then we get number of
> consumed samples back.

Agreed, I'll add more data here in the next version.
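For reference, the semantics I'm aiming to document look roughly like this
from the BPF program side (map name, program name and attach point are
purely illustrative, not from the selftests in this series):

	#include <vmlinux.h>
	#include <bpf/bpf_helpers.h>

	char _license[] SEC("license") = "GPL";

	struct {
		__uint(type, BPF_MAP_TYPE_USER_RINGBUF);
		__uint(max_entries, 256 * 1024);
	} user_ringbuf SEC(".maps");

	static long handle_sample(struct bpf_dynptr *dynptr, void *ctx)
	{
		/* Read the sample via bpf_dynptr_read() / bpf_dynptr_data();
		 * returning nonzero stops draining early.
		 */
		return 0;
	}

	SEC("tp/syscalls/sys_enter_getpgid")
	int drain_samples(void *ctx)
	{
		long ret;

		/* ret >= 0: number of samples drained (0 if none were
		 * available); ret < 0: an error was hit while draining.
		 */
		ret = bpf_user_ringbuf_drain(&user_ringbuf, handle_sample, NULL, 0);
		return ret < 0;
	}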

[...]

> > +
> > +static __poll_t ringbuf_map_poll_kern(struct bpf_map *map, struct file *filp,
> > +                                     struct poll_table_struct *pts)
> > +{
> > +       return ringbuf_map_poll(map, filp, pts, true);
> > +}
> > +
> > +static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
> > +                                     struct poll_table_struct *pts)
> > +{
> > +       return ringbuf_map_poll(map, filp, pts, false);
> >  }
> 
> This is an even stronger case where I think we should keep two
> implementations completely separate. Please keep existing
> ringbuf_map_poll as is and just add a user variant as a separate
> implementation

Agreed, I'll split both this and the mmaps into separate functions.

> >  BTF_ID_LIST_SINGLE(ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
> > @@ -309,7 +326,7 @@ const struct bpf_map_ops ringbuf_map_ops = {
> >         .map_alloc = ringbuf_map_alloc,
> >         .map_free = ringbuf_map_free,
> >         .map_mmap = ringbuf_map_mmap_kern,
> > -       .map_poll = ringbuf_map_poll,
> > +       .map_poll = ringbuf_map_poll_kern,
> >         .map_lookup_elem = ringbuf_map_lookup_elem,
> >         .map_update_elem = ringbuf_map_update_elem,
> >         .map_delete_elem = ringbuf_map_delete_elem,
> > @@ -323,6 +340,7 @@ const struct bpf_map_ops user_ringbuf_map_ops = {
> >         .map_alloc = ringbuf_map_alloc,
> >         .map_free = ringbuf_map_free,
> >         .map_mmap = ringbuf_map_mmap_user,
> > +       .map_poll = ringbuf_map_poll_user,
> >         .map_lookup_elem = ringbuf_map_lookup_elem,
> >         .map_update_elem = ringbuf_map_update_elem,
> >         .map_delete_elem = ringbuf_map_delete_elem,
> > @@ -605,3 +623,132 @@ const struct bpf_func_proto bpf_ringbuf_discard_dynptr_proto = {
> >         .arg1_type      = ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | OBJ_RELEASE,
> >         .arg2_type      = ARG_ANYTHING,
> >  };
> > +
> > +static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
> > +                                  u32 *size)
> 
> "poll" part is quite confusing as it has nothing to do with epoll and
> the other poll implementation. Maybe "peek"? It peek into next sample
> without consuming it, seems appropriate
> 
> nit: this declaration can also stay on single line

Yeah, I agree that "poll" is confusing. I think "peek" is a good option. I
was also considering "consume", but I don't think that makes sense given
that we're not actually done consuming the sample until we release it.

> > +{
> > +       unsigned long cons_pos, prod_pos;
> > +       u32 sample_len, total_len;
> > +       u32 *hdr;
> > +       int err;
> > +       int busy = 0;
> 
> nit: combine matching types:
> 
> u32 sample_len, total_len, *hdr;
> int err, busy = 0;

Ack.

> > +
> > +       /* If another consumer is already consuming a sample, wait for them to
> > +        * finish.
> > +        */
> > +       if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
> > +               return -EBUSY;
> > +
> > +       /* Synchronizes with smp_store_release() in user-space. */
> > +       prod_pos = smp_load_acquire(&rb->producer_pos);
> 
> I think we should enforce that prod_pos is a multiple of 8

Agreed, I'll add a check and selftest for this.
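e.g. something like this at the top of the peek path (sketch), plus a
selftest that maps the producer page and writes a misaligned position:

	/* prod_pos must always be a multiple of 8, since both user-space
	 * and the kernel only ever advance it by 8-byte-aligned amounts.
	 */
	if (prod_pos % 8) {
		atomic_set(&rb->busy, 0);
		return -EINVAL;
	}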

> > +       /* Synchronizes with smp_store_release() in
> > +        * __bpf_user_ringbuf_sample_release().
> > +        */
> > +       cons_pos = smp_load_acquire(&rb->consumer_pos);
> > +       if (cons_pos >= prod_pos) {
> > +               atomic_set(&rb->busy, 0);
> > +               return -ENODATA;
> > +       }
> > +
> > +       hdr = (u32 *)((uintptr_t)rb->data + (cons_pos & rb->mask));
> > +       sample_len = *hdr;
> 
> do we need smp_load_acquire() here? libbpf's ring_buffer
> implementation uses load_acquire here

I thought about this when I was first adding the logic, but I can't
convince myself that it's necessary and wasn't able to figure out why we
did a load acquire on the len in libbpf. The kernel doesn't do a store
release on the header, so I'm not sure what the load acquire in libbpf
actually accomplishes. I could certainly be missing something, but I
_think_ the important thing is that we have load-acquire / store-release
pairs for the consumer and producer positions.
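To spell out the pairing I'm relying on (sketch only, not proposed comment
wording): the store-release on producer_pos publishes every plain store
that precedes it, including the header, so the plain header load on the
kernel side is ordered by the matching load-acquire:

	/* user-space, reserve + commit (sketch): */
	*hdr = size;					/* plain store */
	/* ... application fills the reserved sample with plain stores ... */
	smp_store_release(rb->producer_pos, prod_pos + total_len);

	/* kernel, peek (sketch): */
	prod_pos = smp_load_acquire(&rb->producer_pos);	/* pairs with the release */
	sample_len = *hdr;				/* ordered after the acquire */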

> > +       /* Check that the sample can fit into a dynptr. */
> > +       err = bpf_dynptr_check_size(sample_len);
> > +       if (err) {
> > +               atomic_set(&rb->busy, 0);
> > +               return err;
> > +       }
> > +
> > +       /* Check that the sample fits within the region advertised by the
> > +        * consumer position.
> > +        */
> > +       total_len = sample_len + BPF_RINGBUF_HDR_SZ;
> 
> round up to closest multiple of 8? All the pointers into ringbuf data
> area should be 8-byte aligned

Will do.

> > +       if (total_len > prod_pos - cons_pos) {
> > +               atomic_set(&rb->busy, 0);
> > +               return -E2BIG;
> > +       }
> > +
> > +       /* Check that the sample fits within the data region of the ring buffer.
> > +        */
> > +       if (total_len > rb->mask + 1) {
> > +               atomic_set(&rb->busy, 0);
> > +               return -E2BIG;
> > +       }
> > +
> > +       /* consumer_pos is updated when the sample is released.
> > +        */
> > +
> 
> nit: unnecessary empty line
> 
> and please keep single-line comments as single-line, no */ on separate
> line in such case

Will do.

> > +       *sample = (void *)((uintptr_t)rb->data +
> > +                          ((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
> > +       *size = sample_len;
> > +
> > +       return 0;
> > +}
> > +
> > +static void
> > +__bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
> > +                                 u64 flags)
> 
> try to keep single lines if they are under 100 characters

Ack, this seems to really differ by subsystem. I'll follow this norm for
BPF.

> > +{
> > +
> > +
> 
> empty lines, why?

Apologies, thanks for catching this.

> > +       /* To release the ringbuffer, just increment the producer position to
> > +        * signal that a new sample can be consumed. The busy bit is cleared by
> > +        * userspace when posting a new sample to the ringbuffer.
> > +        */
> > +       smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
> > +                         BPF_RINGBUF_HDR_SZ);
> > +
> > +       if (flags & BPF_RB_FORCE_WAKEUP || !(flags & BPF_RB_NO_WAKEUP))
> 
> please use () around bit operator expressions

Ack.

> > +               irq_work_queue(&rb->work);
> > +
> > +       atomic_set(&rb->busy, 0);
> 
> set busy before scheduling irq_work? why delaying?

Yeah, I thought about this. I don't think there's any problem with clearing
busy before we call irq_work_queue(). I elected to do it this way to err
on the side of simpler logic until we observe contention, but yeah, let me
just do the more performant thing here.
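i.e. roughly (sketch, with the parentheses from above folded in):

	smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
			  BPF_RINGBUF_HDR_SZ);
	/* Clear busy before kicking user-space, so a subsequent drain call
	 * doesn't have to wait for the irq_work to be queued.
	 */
	atomic_set(&rb->busy, 0);

	if ((flags & BPF_RB_FORCE_WAKEUP) || !(flags & BPF_RB_NO_WAKEUP))
		irq_work_queue(&rb->work);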

> > +}
> > +
> > +BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
> > +          void *, callback_fn, void *, callback_ctx, u64, flags)
> > +{
> > +       struct bpf_ringbuf *rb;
> > +       long num_samples = 0, ret = 0;
> > +       int err;
> > +       bpf_callback_t callback = (bpf_callback_t)callback_fn;
> > +       u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP;
> > +
> > +       if (unlikely(flags & ~wakeup_flags))
> > +               return -EINVAL;
> > +
> > +       /* The two wakeup flags are mutually exclusive. */
> > +       if (unlikely((flags & wakeup_flags) == wakeup_flags))
> > +               return -EINVAL;
> 
> we don't check this for existing ringbuf, so maybe let's keep it
> consistent? FORCE_WAKEUP is just stronger than NO_WAKEUP

Ack.

> > +
> > +       rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
> > +       do {
> > +               u32 size;
> > +               void *sample;
> > +
> > +               err = __bpf_user_ringbuf_poll(rb, &sample, &size);
> > +
> 
> nit: don't keep empty line between function call and error check

Ack.

> > +               if (!err) {
> 
> so -ENODATA is a special error and you should stop if you get that.
> But for any other error we should propagate error back, not just
> silently consuming it
> 
> maybe
> 
> err = ...
> if (err) {
>     if (err == -ENODATA)
>       break;
>     return err;
> }
> 
> ?

Agreed, I'll fix this.

> > +                       struct bpf_dynptr_kern dynptr;
> > +
> > +                       bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL,
> > +                                       0, size);
> 
> we should try to avoid unnecessary re-initialization of dynptr, let's
> initialize it once and then just update data and size field inside the
> loop?

Hmm ok, let me give that a try. 

> > +                       ret = callback((u64)&dynptr,
> > +                                       (u64)(uintptr_t)callback_ctx, 0, 0, 0);
> > +
> > +                       __bpf_user_ringbuf_sample_release(rb, size, flags);
> > +                       num_samples++;
> > +               }
> > +       } while (err == 0 && num_samples < 4096 && ret == 0);
> > +
> 
> 4096 is pretty arbitrary. Definitely worth noting it somewhere and it
> seems somewhat low, tbh...
> 
> ideally we'd cond_resched() from time to time, but that would require
> BPF program to be sleepable, so we can't do that :(

Yeah, I knew this would come up in discussion. I would love to do
cond_resched() here, but as you said, I don't think it's an option :-/ And
given the fact that we're calling back into the BPF program, we have to be
cognizant of things taking a while and clogging up the CPU. What do you
think is a more reasonable number than 4096?

[...]

> >         case ARG_PTR_TO_DYNPTR:
> > +               /* We only need to check for initialized / uninitialized helper
> > +                * dynptr args if the dynptr is not MEM_ALLOC, as the assumption
> > +                * is that if it is, that a helper function initialized the
> > +                * dynptr on behalf of the BPF program.
> > +                */
> > +               if (reg->type & MEM_ALLOC)
> 
> Isn't PTR_TO_DYNPTR enough indication? Do we need MEM_ALLOC modifier?
> Normal dynptr created and used inside BPF program on the stack are
> actually PTR_TO_STACK, so that should be enough distinction? Or am I
> missing something?

I think this would also work in the current state of the codebase, but IIUC
it relies on PTR_TO_STACK being the only way that a BPF program could ever
allocate a dynptr. I was trying to guard against the case of a helper being
added in the future that e.g. returned a dynamically allocated dynptr that
the caller would eventually need to release in another helper call.
MEM_ALLOC seems like the correct modifier to more generally denote that the
dynptr was externally allocated.  If you think this is overkill I'm totally
fine with removing MEM_ALLOC. We can always add it down the road if we add
a new helper that requires it.

[...]

> > @@ -7477,11 +7524,15 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
> >                 /* Find the id of the dynptr we're acquiring a reference to */
> >                 for (i = 0; i < MAX_BPF_FUNC_REG_ARGS; i++) {
> >                         if (arg_type_is_dynptr(fn->arg_type[i])) {
> > +                               struct bpf_reg_state *reg = &regs[BPF_REG_1 + i];
> > +
> >                                 if (dynptr_id) {
> >                                         verbose(env, "verifier internal error: multiple dynptr args in func\n");
> >                                         return -EFAULT;
> >                                 }
> > -                               dynptr_id = stack_slot_get_id(env, &regs[BPF_REG_1 + i]);
> > +
> > +                               if (!(reg->type & MEM_ALLOC))
> > +                                       dynptr_id = stack_slot_get_id(env, reg);
> 
> this part has changed in bpf-next

Yeah, this is all rewired in the new version I sent out in v2 (and will
send out in v3 when I apply your other suggestions).

[...]

Thanks for the thorough review!

- David

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 2/5] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF map type
  2022-08-11 23:22   ` Andrii Nakryiko
@ 2022-08-12 16:21     ` David Vernet
  0 siblings, 0 replies; 28+ messages in thread
From: David Vernet @ 2022-08-12 16:21 UTC (permalink / raw)
  To: Andrii Nakryiko
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

On Thu, Aug 11, 2022 at 04:22:50PM -0700, Andrii Nakryiko wrote:
> On Mon, Aug 8, 2022 at 8:54 AM David Vernet <void@manifault.com> wrote:
> >
> > We want to support a ringbuf map type where samples are published from
> > user-space to BPF programs. BPF currently supports a kernel -> user-space
> > circular ringbuffer via the BPF_MAP_TYPE_RINGBUF map type. We'll need to
> > define a new map type for user-space -> kernel, as none of the helpers
> > exported for BPF_MAP_TYPE_RINGBUF will apply to a user-space producer
> > ringbuffer, and we'll want to add one or more helper functions that would
> > not apply for a kernel-producer ringbuffer.
> >
> > This patch therefore adds a new BPF_MAP_TYPE_USER_RINGBUF map type
> > definition. The map type is useless in its current form, as there is no way
> > to access or use it for anything until we add more BPF helpers. A follow-on
> > patch will therefore add a new helper function that allows BPF programs to
> > run callbacks on samples that are published to the ringbuffer.
> >
> > Signed-off-by: David Vernet <void@manifault.com>
> > ---
> >  include/linux/bpf_types.h      |  1 +
> >  include/uapi/linux/bpf.h       |  1 +
> >  kernel/bpf/ringbuf.c           | 70 +++++++++++++++++++++++++++++-----
> >  kernel/bpf/verifier.c          |  3 ++
> >  tools/include/uapi/linux/bpf.h |  1 +
> >  tools/lib/bpf/libbpf.c         |  1 +
> >  6 files changed, 68 insertions(+), 9 deletions(-)
> >
> 
> [...]
> 
> >
> > -static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma)
> > +static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma,
> > +                           bool kernel_producer)
> >  {
> >         struct bpf_ringbuf_map *rb_map;
> >
> >         rb_map = container_of(map, struct bpf_ringbuf_map, map);
> >
> >         if (vma->vm_flags & VM_WRITE) {
> > -               /* allow writable mapping for the consumer_pos only */
> > -               if (vma->vm_pgoff != 0 || vma->vm_end - vma->vm_start != PAGE_SIZE)
> > +               if (kernel_producer) {
> > +                       /* allow writable mapping for the consumer_pos only */
> > +                       if (vma->vm_pgoff != 0 || vma->vm_end - vma->vm_start != PAGE_SIZE)
> > +                               return -EPERM;
> > +               /* For user ringbufs, disallow writable mappings to the
> > +                * consumer pointer, and allow writable mappings to both the
> > +                * producer position, and the ring buffer data itself.
> > +                */
> > +               } else if (vma->vm_pgoff == 0)
> >                         return -EPERM;
> 
> the asymmetrical use of {} in one if branch and not using them in
> another is extremely confusing, please don't do that

Ah, sorry, I misread the style guide and thought this was stipulated there,
but I now see that it explicitly states that you should use braces in both
branches if only one of them is a single statement. I'll fix this (by splitting
these into their own implementations, as mentioned below).

> the way you put big comment inside the wrong if branch also throws me
> off, maybe move it before return -EPERM instead with proper
> indentation?

Yeah, fair enough.

> sorry for nitpicks, but I've been stuck for a few minutes trying to
> figure out what exactly is happening here :)

Not a problem at all, sorry for the less-than-readable code.

> >         } else {
> >                 vma->vm_flags &= ~VM_MAYWRITE;
> > @@ -242,6 +271,16 @@ static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma)
> >                                    vma->vm_pgoff + RINGBUF_PGOFF);
> >  }
> >
> > +static int ringbuf_map_mmap_kern(struct bpf_map *map, struct vm_area_struct *vma)
> > +{
> > +       return ringbuf_map_mmap(map, vma, true);
> > +}
> > +
> > +static int ringbuf_map_mmap_user(struct bpf_map *map, struct vm_area_struct *vma)
> > +{
> > +       return ringbuf_map_mmap(map, vma, false);
> > +}
> 
> I wouldn't mind if you just have two separate implementations of
> ringbuf_map_mmap for _kern and _user cases, tbh, probably would be
> clearer as well

Yeah, I can do this. I was trying to avoid any copy-pasta at all cost, but
I think it's doing more harm than good. I'll just split them into totally
separate implementations.
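For the user-producer variant, I'm picturing something like this (untested
sketch; the remap tail is identical to the existing kernel-producer path):

	static int ringbuf_map_mmap_user(struct bpf_map *map, struct vm_area_struct *vma)
	{
		struct bpf_ringbuf_map *rb_map;

		rb_map = container_of(map, struct bpf_ringbuf_map, map);

		if (vma->vm_flags & VM_WRITE) {
			/* The consumer page stays r/o for user-producer
			 * ringbufs; the producer page and data pages may be
			 * mapped r/w.
			 */
			if (vma->vm_pgoff == 0)
				return -EPERM;
		} else {
			vma->vm_flags &= ~VM_MAYWRITE;
		}

		return remap_vmalloc_range(vma, rb_map->rb,
					   vma->vm_pgoff + RINGBUF_PGOFF);
	}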

> > +
> >  static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
> >  {
> >         unsigned long cons_pos, prod_pos;
> > @@ -269,7 +308,7 @@ const struct bpf_map_ops ringbuf_map_ops = {
> >         .map_meta_equal = bpf_map_meta_equal,
> >         .map_alloc = ringbuf_map_alloc,
> >         .map_free = ringbuf_map_free,
> > -       .map_mmap = ringbuf_map_mmap,
> > +       .map_mmap = ringbuf_map_mmap_kern,
> >         .map_poll = ringbuf_map_poll,
> >         .map_lookup_elem = ringbuf_map_lookup_elem,
> >         .map_update_elem = ringbuf_map_update_elem,
> > @@ -278,6 +317,19 @@ const struct bpf_map_ops ringbuf_map_ops = {
> >         .map_btf_id = &ringbuf_map_btf_ids[0],
> >  };
> >
> 
> [...]

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 2/5] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF map type
  2022-08-11 23:29   ` Andrii Nakryiko
@ 2022-08-12 16:23     ` David Vernet
  2022-08-16 18:43       ` Andrii Nakryiko
  0 siblings, 1 reply; 28+ messages in thread
From: David Vernet @ 2022-08-12 16:23 UTC (permalink / raw)
  To: Andrii Nakryiko
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

On Thu, Aug 11, 2022 at 04:29:02PM -0700, Andrii Nakryiko wrote:

[...]

> > -       /* Consumer and producer counters are put into separate pages to allow
> > -        * mapping consumer page as r/w, but restrict producer page to r/o.
> > -        * This protects producer position from being modified by user-space
> > -        * application and ruining in-kernel position tracking.
> > +       /* Consumer and producer counters are put into separate pages to
> > +        * allow each position to be mapped with different permissions.
> > +        * This prevents a user-space application from modifying the
> > +        * position and ruining in-kernel tracking. The permissions of the
> > +        * pages depend on who is producing samples: user-space or the
> > +        * kernel.
> > +        *
> > +        * Kernel-producer
> > +        * ---------------
> > +        * The producer position and data pages are mapped as r/o in
> > +        * userspace. For this approach, bits in the header of samples are
> > +        * used to signal to user-space, and to other producers, whether a
> > +        * sample is currently being written.
> > +        *
> > +        * User-space producer
> > +        * -------------------
> > +        * Only the page containing the consumer position, and whether the
> > +        * ringbuffer is currently being consumed via a 'busy' bit, are
> > +        * mapped r/o in user-space. Sample headers may not be used to
> > +        * communicate any information between kernel consumers, as a
> > +        * user-space application could modify its contents at any time.
> >          */
> > -       unsigned long consumer_pos __aligned(PAGE_SIZE);
> > +       struct {
> > +               unsigned long consumer_pos;
> > +               atomic_t busy;
> 
> one more thing, why does busy have to be exposed into user-space
> mapped memory at all? Can't it be just a private variable in
> bpf_ringbuf?

It could be moved elsewhere in the struct. I put it here to avoid
increasing the size of struct bpf_ringbuf unnecessarily, as we had all of
this extra space on the consumer_pos page. Specifically, I was trying to
avoid taxing kernel-producer ringbuffers. If you'd prefer, I can just put
it elsewhere in the struct.
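If you'd prefer it fully off the mapped pages, something like this (sketch;
only the fields quoted above are shown, the rest elided):

	struct bpf_ringbuf {
		...
		spinlock_t spinlock ____cacheline_aligned_in_smp;
		/* Kernel-private busy flag for user-producer ringbufs; never
		 * exposed through mmap().
		 */
		atomic_t busy ____cacheline_aligned_in_smp;
		unsigned long consumer_pos __aligned(PAGE_SIZE);
		unsigned long producer_pos __aligned(PAGE_SIZE);
		char data[] __aligned(PAGE_SIZE);
	};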

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 4/5] bpf: Add libbpf logic for user-space ring buffer
  2022-08-11 23:39   ` Andrii Nakryiko
@ 2022-08-12 17:28     ` David Vernet
  2022-08-16 19:09       ` Andrii Nakryiko
  0 siblings, 1 reply; 28+ messages in thread
From: David Vernet @ 2022-08-12 17:28 UTC (permalink / raw)
  To: Andrii Nakryiko
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

On Thu, Aug 11, 2022 at 04:39:57PM -0700, Andrii Nakryiko wrote:

[...]

> > diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> > index fc589fc8eb7c..a10558e79ec8 100644
> > --- a/kernel/bpf/ringbuf.c
> > +++ b/kernel/bpf/ringbuf.c
> > @@ -639,7 +639,9 @@ static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
> >         if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
> >                 return -EBUSY;
> >
> > -       /* Synchronizes with smp_store_release() in user-space. */
> > +       /* Synchronizes with smp_store_release() in __ring_buffer_user__commit()
> > +        * in user-space.
> > +        */
> 
> let's not hard-code libbpf function names in kernel comments, it's
> still user-space

Fair enough.

> >         prod_pos = smp_load_acquire(&rb->producer_pos);
> >         /* Synchronizes with smp_store_release() in
> >          * __bpf_user_ringbuf_sample_release().
> > @@ -695,6 +697,9 @@ __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
> >         /* To release the ringbuffer, just increment the producer position to
> >          * signal that a new sample can be consumed. The busy bit is cleared by
> >          * userspace when posting a new sample to the ringbuffer.
> > +        *
> > +        * Synchronizes with smp_load_acquire() in ring_buffer_user__reserve()
> > +        * in user-space.
> >          */
> 
> same
> 
> >         smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
> >                           BPF_RINGBUF_HDR_SZ);
> > diff --git a/tools/lib/bpf/libbpf.c b/tools/lib/bpf/libbpf.c
> > index 9c1f2d09f44e..f7fe09dce643 100644
> > --- a/tools/lib/bpf/libbpf.c
> > +++ b/tools/lib/bpf/libbpf.c
> > @@ -2367,6 +2367,12 @@ static size_t adjust_ringbuf_sz(size_t sz)
> >         return sz;
> >  }
> >
> > +static bool map_is_ringbuf(const struct bpf_map *map)
> > +{
> > +       return map->def.type == BPF_MAP_TYPE_RINGBUF ||
> > +              map->def.type == BPF_MAP_TYPE_USER_RINGBUF;
> > +}
> > +
> >  static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def)
> >  {
> >         map->def.type = def->map_type;
> > @@ -2381,7 +2387,7 @@ static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def
> >         map->btf_value_type_id = def->value_type_id;
> >
> >         /* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */
> > -       if (map->def.type == BPF_MAP_TYPE_RINGBUF)
> > +       if (map_is_ringbuf(map))
> >                 map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries);
> >
> >         if (def->parts & MAP_DEF_MAP_TYPE)
> > @@ -4363,7 +4369,7 @@ int bpf_map__set_max_entries(struct bpf_map *map, __u32 max_entries)
> >         map->def.max_entries = max_entries;
> >
> >         /* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */
> > -       if (map->def.type == BPF_MAP_TYPE_RINGBUF)
> > +       if (map_is_ringbuf(map))
> >                 map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries);
> >
> >         return 0;
> > diff --git a/tools/lib/bpf/libbpf.h b/tools/lib/bpf/libbpf.h
> > index 61493c4cddac..6d1d0539b08d 100644
> > --- a/tools/lib/bpf/libbpf.h
> > +++ b/tools/lib/bpf/libbpf.h
> > @@ -1009,6 +1009,7 @@ LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook,
> >
> >  /* Ring buffer APIs */
> >  struct ring_buffer;
> > +struct ring_buffer_user;
> 
> 
> I know that I'm the one asking to use ring_buffer_user name, but given
> kernel is using USER_RINGBUF and user_ringbuf naming, let's be
> consistent and use user_ring_buffer in libbpf as well. I was
> contemplating uring_buffer, can't make up my mind if it's better or
> not.

Not a problem, I'll revert it back to user_ring_buffer.

> Also ring_buffer_user reads like "user of ring buffer", which adds to
> confusion :)

[...]

> > +static void __user_ringbuf_unmap_ring(struct ring_buffer_user *rb)
> 
> libbpf generally doesn't use double underscore naming pattern, let's
> not do that here as well

Ack.

> > +{
> > +       if (rb->consumer_pos) {
> > +               munmap(rb->consumer_pos, rb->page_size);
> > +               rb->consumer_pos = NULL;
> > +       }
> > +       if (rb->producer_pos) {
> > +               munmap(rb->producer_pos, rb->page_size + 2 * (rb->mask + 1));
> > +               rb->producer_pos = NULL;
> > +       }
> > +}
> > +
> > +void ring_buffer_user__free(struct ring_buffer_user *rb)
> > +{
> > +       if (!rb)
> > +               return;
> > +
> > +       __user_ringbuf_unmap_ring(rb);
> > +
> > +       if (rb->epoll_fd >= 0)
> > +               close(rb->epoll_fd);
> > +
> > +       free(rb);
> > +}
> > +
> > +static int __ring_buffer_user_map(struct ring_buffer_user *rb, int map_fd)
> > +{
> > +
> 
> please don't leave stray empty lines around the code

Sorry, not sure how I missed that.

> > +static void __ring_buffer_user__commit(struct ring_buffer_user *rb)
> > +{
> > +       uint32_t *hdr;
> > +       uint32_t total_len;
> > +       unsigned long prod_pos;
> > +
> > +       prod_pos = *rb->producer_pos;
> > +       hdr = rb->data + (prod_pos & rb->mask);
> > +
> > +       total_len = *hdr + BPF_RINGBUF_HDR_SZ;
> 
> round up to multiple of 8

Will do, and I'll also validate this in the kernel.

> > +
> > +       /* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_poll() in
> > +        * the kernel.
> > +        */
> > +       smp_store_release(rb->producer_pos, prod_pos + total_len);
> > +}
> > +
> > +/* Discard a previously reserved sample into the ring buffer. Because the user
> > + * ringbuffer is assumed to be single producer, this can simply be a no-op, and
> > + * the producer pointer is left in the same place as when it was reserved.
> > + */
> > +void ring_buffer_user__discard(struct ring_buffer_user *rb, void *sample)
> > +{}
> 
> {
> }

Ack.

> > +
> > +/* Submit a previously reserved sample into the ring buffer.
> > + */
> 
> nit: this is single-line comment

Ack.

> > +void ring_buffer_user__submit(struct ring_buffer_user *rb, void *sample)
> > +{
> > +       __ring_buffer_user__commit(rb);
> > +}
> 
> this made me think that it's probably best to add kernel support for
> busy bit anyways (just like for existing ringbuf), so that we can
> eventually turn this into multi-producer on user-space side (all we
> need is a lock, really). So let's anticipate that on kernel side from
> the very beginning

Hmm, yeah, fair enough. We have the extra space in the sample header to OR the
busy bit, and we already have a 2-stage reserve -> commit workflow, so we might
as well. I'll go ahead and add this, and then hopefully someday we can just add
a lock, as you mentioned.

> > +/* Reserve a pointer to a sample in the user ring buffer. This function is *not*
> > + * thread safe, and the ring-buffer supports only a single producer.
> > + */
> > +void *ring_buffer_user__reserve(struct ring_buffer_user *rb, uint32_t size)
> > +{
> > +       uint32_t *hdr;
> > +       /* 64-bit to avoid overflow in case of extreme application behavior */
> > +       size_t avail_size, total_size, max_size;
> 
> size_t is not guaranteed to be 64-bit, neither is long

Sorry, you're right. I'll just use explicit bit-width types.

> > +       unsigned long cons_pos, prod_pos;
> > +
> > +       /* Synchronizes with smp_store_release() in __bpf_user_ringbuf_poll() in
> > +        * the kernel.
> > +        */
> > +       cons_pos = smp_load_acquire(rb->consumer_pos);
> > +       /* Synchronizes with smp_store_release() in __ring_buffer_user__commit()
> > +        */
> > +       prod_pos = smp_load_acquire(rb->producer_pos);
> > +
> > +       max_size = rb->mask + 1;
> > +       avail_size = max_size - (prod_pos - cons_pos);
> > +       total_size = size + BPF_RINGBUF_HDR_SZ;
> 
> round_up(8)

Ack.

> > +
> > +       if (total_size > max_size || avail_size < total_size)
> > +               return NULL;
> 
> set errno as well?

Will do.

> > +
> > +       hdr = rb->data + (prod_pos & rb->mask);
> > +       *hdr = size;
> 
> I'd make sure that entire 8 bytes of header are zero-initialized, for
> better future extensibility

Will do.

> > +
> > +       /* Producer pos is updated when a sample is submitted. */
> > +
> > +       return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask);
> > +}
> > +
> > +/* Poll for available space in the ringbuffer, and reserve a record when it
> > + * becomes available.
> > + */
> > +void *ring_buffer_user__poll(struct ring_buffer_user *rb, uint32_t size,
> > +                            int timeout_ms)
> > +{
> > +       int cnt;
> > +
> > +       cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms);
> > +       if (cnt < 0)
> > +               return NULL;
> > +
> > +       return ring_buffer_user__reserve(rb, size);
> 
> it's not clear how just doing epoll_wait() guarantees that we have >=
> size of space available?.. Seems like some tests are missing?

Right now, the kernel only kicks the polling writer once it's drained all
of the samples from the ring buffer. So at this point, if there's not
enough space in the buffer, there would be nothing we could do regardless.
This seemed like reasonable, simple behavior for the initial
implementation. I can make it a bit more intelligent if you'd like, and
return EPOLLWRNORM as soon as there is any space in the buffer, and have
libbpf potentially make multiple calls to epoll_wait() until enough space
has become available.

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 2/5] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF map type
  2022-08-12 16:23     ` David Vernet
@ 2022-08-16 18:43       ` Andrii Nakryiko
  0 siblings, 0 replies; 28+ messages in thread
From: Andrii Nakryiko @ 2022-08-16 18:43 UTC (permalink / raw)
  To: David Vernet
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

On Fri, Aug 12, 2022 at 9:23 AM David Vernet <void@manifault.com> wrote:
>
> On Thu, Aug 11, 2022 at 04:29:02PM -0700, Andrii Nakryiko wrote:
>
> [...]
>
> > > -       /* Consumer and producer counters are put into separate pages to allow
> > > -        * mapping consumer page as r/w, but restrict producer page to r/o.
> > > -        * This protects producer position from being modified by user-space
> > > -        * application and ruining in-kernel position tracking.
> > > +       /* Consumer and producer counters are put into separate pages to
> > > +        * allow each position to be mapped with different permissions.
> > > +        * This prevents a user-space application from modifying the
> > > +        * position and ruining in-kernel tracking. The permissions of the
> > > +        * pages depend on who is producing samples: user-space or the
> > > +        * kernel.
> > > +        *
> > > +        * Kernel-producer
> > > +        * ---------------
> > > +        * The producer position and data pages are mapped as r/o in
> > > +        * userspace. For this approach, bits in the header of samples are
> > > +        * used to signal to user-space, and to other producers, whether a
> > > +        * sample is currently being written.
> > > +        *
> > > +        * User-space producer
> > > +        * -------------------
> > > +        * Only the page containing the consumer position, and whether the
> > > +        * ringbuffer is currently being consumed via a 'busy' bit, are
> > > +        * mapped r/o in user-space. Sample headers may not be used to
> > > +        * communicate any information between kernel consumers, as a
> > > +        * user-space application could modify its contents at any time.
> > >          */
> > > -       unsigned long consumer_pos __aligned(PAGE_SIZE);
> > > +       struct {
> > > +               unsigned long consumer_pos;
> > > +               atomic_t busy;
> >
> > one more thing, why does busy have to be exposed into user-space
> > mapped memory at all? Can't it be just a private variable in
> > bpf_ringbuf?
>
> It could be moved elsewhere in the struct. I put it here to avoid
> increasing the size of struct bpf_ringbuf unnecessarily, as we had all of
> this extra space on the consumer_pos page. Specifically, I was trying to
> avoid taxing kernel-producer ringbuffers. If you'd prefer, I can just put
> it elsewhere in the struct.

Yes, let's move. An 8 byte increase is not a problem, while exposing
internals in a user-visible memory page is, at the very least, unclean.
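
I.e. keep it as a kernel-private field, roughly like this (trimmed-down
illustration, not the real struct; only the fields relevant here are
shown):

	struct bpf_ringbuf_sketch {
		wait_queue_head_t waitq;
		atomic_t busy;		/* kernel-internal, never mmap'd */
		unsigned long mask;
		unsigned long consumer_pos __aligned(PAGE_SIZE);
		unsigned long producer_pos __aligned(PAGE_SIZE);
		char data[] __aligned(PAGE_SIZE);
	};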

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 3/5] bpf: Add bpf_user_ringbuf_drain() helper
  2022-08-12  0:46     ` David Vernet
@ 2022-08-16 18:57       ` Andrii Nakryiko
  2022-08-16 22:14         ` David Vernet
  0 siblings, 1 reply; 28+ messages in thread
From: Andrii Nakryiko @ 2022-08-16 18:57 UTC (permalink / raw)
  To: David Vernet
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

On Thu, Aug 11, 2022 at 5:46 PM David Vernet <void@manifault.com> wrote:
>
> On Thu, Aug 11, 2022 at 04:22:43PM -0700, Andrii Nakryiko wrote:
> > On Mon, Aug 8, 2022 at 8:54 AM David Vernet <void@manifault.com> wrote:
> > >
> > > Now that we have a BPF_MAP_TYPE_USER_RINGBUF map type, we need to add a
> > > helper function that allows BPF programs to drain samples from the ring
> > > buffer, and invoke a callback for each. This patch adds a new
> > > bpf_user_ringbuf_drain() helper that provides this abstraction.
> > >
> > > In order to support this, we needed to also add a new PTR_TO_DYNPTR
> > > register type to reflect a dynptr that was allocated by a helper function
> > > and passed to a BPF program. The verifier currently only supports
> > > PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL and MEM_ALLOC.
> >
> > This commit message is a bit too laconic. There are a lot of
> > implications in various parts of this patch; it would be great to
> > highlight the most important ones. Please consider elaborating a bit more.
>
Argh, I sent my last email too early, and it only responded to this
point. I'll do this in v3, as mentioned in my other email.
>

no worries

> > > diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
> > > index a341f877b230..ca125648d7fd 100644
> > > --- a/include/uapi/linux/bpf.h
> > > +++ b/include/uapi/linux/bpf.h
> > > @@ -5332,6 +5332,13 @@ union bpf_attr {
> > >   *             **-EACCES** if the SYN cookie is not valid.
> > >   *
> > >   *             **-EPROTONOSUPPORT** if CONFIG_IPV6 is not builtin.
> > > + *
> > > + * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
> > > + *     Description
> > > + *             Drain samples from the specified user ringbuffer, and invoke the
> > > + *             provided callback for each such sample.
> >
> > please specify what's the expected signature of callback_fn
>
> Will do, unfortunately we're inconsistent in doing this in other helper
> functions, so it wasn't clear from context.

That means we missed it for other helpers. The idea was to always
specify expected signature in UAPI comment, ideally we fix all the
missing cases.
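
Something along these lines, e.g. (just a sketch of the UAPI comment; the
exact wording and callback signature should match whatever the final
helper does):

 * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
 *	Description
 *		Drain samples from the specified user ringbuffer, invoking
 *		**callback_fn** for each one. The callback is expected to
 *		have the following signature:
 *
 *		long (*callback_fn)(struct bpf_dynptr *dynptr, void *ctx);
 *	Return
 *		The number of drained samples on success, or a negative error.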

>
> > > + *     Return
> > > + *             An error if a sample could not be drained.
> >
> > Negative error, right? And might be worth it briefly describing what
> > are the situation(s) when you won't be able to drain a sample?
> >
> > Also please clarify if having no sample to drain is an error or not?
> >
> > It's also important to specify that if no error (and -ENODATA
> > shouldn't be an error, actually) occurred then we get number of
> > consumed samples back.
>
> Agreed, I'll add more data here in the next version.
>
> [...]
>
> > > +
> > > +static __poll_t ringbuf_map_poll_kern(struct bpf_map *map, struct file *filp,
> > > +                                     struct poll_table_struct *pts)
> > > +{
> > > +       return ringbuf_map_poll(map, filp, pts, true);
> > > +}
> > > +
> > > +static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
> > > +                                     struct poll_table_struct *pts)
> > > +{
> > > +       return ringbuf_map_poll(map, filp, pts, false);
> > >  }
> >
> > This is an even stronger case where I think we should keep two
> > implementations completely separate. Please keep existing
> > ringbuf_map_poll as is and just add a user variant as a separate
> > implementation
>
> Agreed, I'll split both this and the mmap callbacks into separate
> functions.
>
> > >  BTF_ID_LIST_SINGLE(ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
> > > @@ -309,7 +326,7 @@ const struct bpf_map_ops ringbuf_map_ops = {
> > >         .map_alloc = ringbuf_map_alloc,
> > >         .map_free = ringbuf_map_free,
> > >         .map_mmap = ringbuf_map_mmap_kern,
> > > -       .map_poll = ringbuf_map_poll,
> > > +       .map_poll = ringbuf_map_poll_kern,
> > >         .map_lookup_elem = ringbuf_map_lookup_elem,
> > >         .map_update_elem = ringbuf_map_update_elem,
> > >         .map_delete_elem = ringbuf_map_delete_elem,
> > > @@ -323,6 +340,7 @@ const struct bpf_map_ops user_ringbuf_map_ops = {
> > >         .map_alloc = ringbuf_map_alloc,
> > >         .map_free = ringbuf_map_free,
> > >         .map_mmap = ringbuf_map_mmap_user,
> > > +       .map_poll = ringbuf_map_poll_user,
> > >         .map_lookup_elem = ringbuf_map_lookup_elem,
> > >         .map_update_elem = ringbuf_map_update_elem,
> > >         .map_delete_elem = ringbuf_map_delete_elem,
> > > @@ -605,3 +623,132 @@ const struct bpf_func_proto bpf_ringbuf_discard_dynptr_proto = {
> > >         .arg1_type      = ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | OBJ_RELEASE,
> > >         .arg2_type      = ARG_ANYTHING,
> > >  };
> > > +
> > > +static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
> > > +                                  u32 *size)
> >
> > "poll" part is quite confusing as it has nothing to do with epoll and
> > the other poll implementation. Maybe "peek"? It peeks into the next sample
> > without consuming it, which seems appropriate
> >
> > nit: this declaration can also stay on single line
>
> Yeah, I agree that "poll" is confusing. I think "peek" is a good option. I
> was also considering "consume", but I don't think that makes sense given
> that we're not actually done consuming the sample until we release it.

Exactly, consume is very bad as we don't "consume" it in the sense of
making that space available for reuse.

>
> > > +{
> > > +       unsigned long cons_pos, prod_pos;
> > > +       u32 sample_len, total_len;
> > > +       u32 *hdr;
> > > +       int err;
> > > +       int busy = 0;
> >
> > nit: combine matching types:
> >
> > u32 sample_len, total_len, *hdr;
> > int err, busy = 0;
>
> Ack.
>
> > > +
> > > +       /* If another consumer is already consuming a sample, wait for them to
> > > +        * finish.
> > > +        */
> > > +       if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
> > > +               return -EBUSY;
> > > +
> > > +       /* Synchronizes with smp_store_release() in user-space. */
> > > +       prod_pos = smp_load_acquire(&rb->producer_pos);
> >
> > I think we should enforce that prod_pos is a multiple of 8
>
> Agreed, I'll add a check and selftest for this.

Yep, consider also adding a few tests where user-space intentionally
breaks the contract, to make sure that the kernel stays intact (if you
already did that, apologies, I haven't looked at the selftests much).
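
For the alignment check specifically, I'd expect something like this
fragment inside the kernel-side peek/drain path (sketch; exact error code
up to you), which a misaligned-producer_pos selftest would then exercise:

	/* Don't trust a producer position that user-space could have
	 * scribbled on: samples must stay 8-byte aligned. */
	if (prod_pos % 8) {
		atomic_set(&rb->busy, 0);
		return -EINVAL;
	}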

>
> > > +       /* Synchronizes with smp_store_release() in
> > > +        * __bpf_user_ringbuf_sample_release().
> > > +        */
> > > +       cons_pos = smp_load_acquire(&rb->consumer_pos);
> > > +       if (cons_pos >= prod_pos) {
> > > +               atomic_set(&rb->busy, 0);
> > > +               return -ENODATA;
> > > +       }
> > > +
> > > +       hdr = (u32 *)((uintptr_t)rb->data + (cons_pos & rb->mask));
> > > +       sample_len = *hdr;
> >
> > do we need smp_load_acquire() here? libbpf's ring_buffer
> > implementation uses load_acquire here
>
> I thought about this when I was first adding the logic, but I can't
> convince myself that it's necessary and wasn't able to figure out why we
> did a load acquire on the len in libbpf. The kernel doesn't do a store
> release on the header, so I'm not sure what the load acquire in libbpf
> actually accomplishes. I could certainly be missing something, but I
> _think_ the important thing is that we have load-acquire / store-release
> pairs for the consumer and producer positions.

kernel does xchg on len on the kernel side, which is stronger than
smp_store_release (I think it was Paul's suggestion instead of doing
explicit memory barrier, but my memories are hazy for exact reasons).

Right now this might not be necessary, but if we add support for busy
bit in a sample header, it will be closer to what BPF ringbuf is doing
right now, with producer position being a reservation pointer, but
sample itself won't be "readable" until sample header is updated and
its busy bit is unset.
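
For reference, the existing kernel-producer commit is roughly this
(simplified sketch from memory, not the verbatim code):

	static void ringbuf_commit_sketch(void *sample, bool discard)
	{
		struct bpf_ringbuf_hdr *hdr = sample - BPF_RINGBUF_HDR_SZ;
		u32 new_len = hdr->len & ~BPF_RINGBUF_BUSY_BIT;

		if (discard)
			new_len |= BPF_RINGBUF_DISCARD_BIT;

		/* Full barrier; clearing the busy bit is what publishes the
		 * sample to the consumer. */
		xchg(&hdr->len, new_len);
	}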

>
> > > +       /* Check that the sample can fit into a dynptr. */
> > > +       err = bpf_dynptr_check_size(sample_len);
> > > +       if (err) {
> > > +               atomic_set(&rb->busy, 0);
> > > +               return err;
> > > +       }
> > > +
> > > +       /* Check that the sample fits within the region advertised by the
> > > +        * consumer position.
> > > +        */
> > > +       total_len = sample_len + BPF_RINGBUF_HDR_SZ;
> >
> > round up to closest multiple of 8? All the pointers into ringbuf data
> > area should be 8-byte aligned
>
> Will do.
>
> > > +       if (total_len > prod_pos - cons_pos) {
> > > +               atomic_set(&rb->busy, 0);
> > > +               return -E2BIG;
> > > +       }
> > > +
> > > +       /* Check that the sample fits within the data region of the ring buffer.
> > > +        */
> > > +       if (total_len > rb->mask + 1) {
> > > +               atomic_set(&rb->busy, 0);
> > > +               return -E2BIG;
> > > +       }
> > > +
> > > +       /* consumer_pos is updated when the sample is released.
> > > +        */
> > > +
> >
> > nit: unnecessary empty line
> >
> > and please keep single-line comments as single-line, no */ on separate
> > line in such case
>
> Will do.
>
> > > +       *sample = (void *)((uintptr_t)rb->data +
> > > +                          ((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
> > > +       *size = sample_len;
> > > +
> > > +       return 0;
> > > +}
> > > +
> > > +static void
> > > +__bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
> > > +                                 u64 flags)
> >
> > try to keep single lines if they are under 100 characters
>
> Ack, this seems to really differ by subsystem. I'll follow this norm for
> BPF.

It's a relatively recent relaxation (like a year old or something) in
kernel coding style. Single-line usually has much better readability,
so I prefer that while staying within reasonable limits.

>
> > > +{
> > > +
> > > +
> >
> > empty lines, why?
>
> Apologies, thanks for catching this.
>
> > > +       /* To release the ringbuffer, just increment the producer position to
> > > +        * signal that a new sample can be consumed. The busy bit is cleared by
> > > +        * userspace when posting a new sample to the ringbuffer.
> > > +        */
> > > +       smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
> > > +                         BPF_RINGBUF_HDR_SZ);
> > > +
> > > +       if (flags & BPF_RB_FORCE_WAKEUP || !(flags & BPF_RB_NO_WAKEUP))
> >
> > please use () around bit operator expressions
>
> Ack.
>
> > > +               irq_work_queue(&rb->work);
> > > +
> > > +       atomic_set(&rb->busy, 0);
> >
> > set busy before scheduling irq_work? why delaying?
>
> Yeah, I thought about this. I don't think there's any problem with clearing
> busy before we schedule the irq_work_queue(). I elected to do this to err
> on the side of simpler logic until we observed contention, but yeah, let me
> just do the more performant thing here.

busy is like a global lock, so freeing it ASAP is good, so yeah,
unless there are some bad implications, let's do it early
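
I.e. something like (sketch of the release path with the order flipped):

	/* Drop the "lock" first, then kick the wakeup work. */
	atomic_set(&rb->busy, 0);

	if ((flags & BPF_RB_FORCE_WAKEUP) || !(flags & BPF_RB_NO_WAKEUP))
		irq_work_queue(&rb->work);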

>
> > > +}
> > > +
> > > +BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
> > > +          void *, callback_fn, void *, callback_ctx, u64, flags)
> > > +{
> > > +       struct bpf_ringbuf *rb;
> > > +       long num_samples = 0, ret = 0;
> > > +       int err;
> > > +       bpf_callback_t callback = (bpf_callback_t)callback_fn;
> > > +       u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP;
> > > +
> > > +       if (unlikely(flags & ~wakeup_flags))
> > > +               return -EINVAL;
> > > +
> > > +       /* The two wakeup flags are mutually exclusive. */
> > > +       if (unlikely((flags & wakeup_flags) == wakeup_flags))
> > > +               return -EINVAL;
> >
> > we don't check this for existing ringbuf, so maybe let's keep it
> > consistent? FORCE_WAKEUP is just stronger than NO_WAKEUP
>
> Ack.
>
> > > +
> > > +       rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
> > > +       do {
> > > +               u32 size;
> > > +               void *sample;
> > > +
> > > +               err = __bpf_user_ringbuf_poll(rb, &sample, &size);
> > > +
> >
> > nit: don't keep empty line between function call and error check
>
> Ack.
>
> > > +               if (!err) {
> >
> > so -ENODATA is a special error and you should stop if you get that.
> > But for any other error we should propagate error back, not just
> > silently consuming it
> >
> > maybe
> >
> > err = ...
> > if (err) {
> >     if (err == -ENODATA)
> >       break;
> >     return err;
> > }
> >
> > ?
>
> Agreed, I'll fix this.
>
> > > +                       struct bpf_dynptr_kern dynptr;
> > > +
> > > +                       bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL,
> > > +                                       0, size);
> >
> > we should try to avoid unnecessary re-initialization of dynptr, let's
> > initialize it once and then just update data and size field inside the
> > loop?
>
> Hmm ok, let me give that a try.
>

discussed this offline, it might not be worth the hassle given dynptr
init is just setting 4 fields

> > > +                       ret = callback((u64)&dynptr,
> > > +                                       (u64)(uintptr_t)callback_ctx, 0, 0, 0);
> > > +
> > > +                       __bpf_user_ringbuf_sample_release(rb, size, flags);
> > > +                       num_samples++;
> > > +               }
> > > +       } while (err == 0 && num_samples < 4096 && ret == 0);
> > > +
> >
> > 4096 is pretty arbitrary. Definitely worth noting it somewhere and it
> > seems somewhat low, tbh...
> >
> > ideally we'd cond_resched() from time to time, but that would require
> > BPF program to be sleepable, so we can't do that :(
>
> Yeah, I knew this would come up in discussion. I would love to do
> cond_resched() here, but as you said, I don't think it's an option :-/ And
> given the fact that we're calling back into the BPF program, we have to be
> cognizant of things taking a while and clogging up the CPU. What do you
> think is a more reasonable number than 4096?

I don't know, tbh, but 4096 seems pretty low. For bpf_loop() we allow
up to 2mln iterations. I'd bump it up to 64-128K range, probably. But
also please move it into some internal #define'd constant, not some
integer literal buried in the code.
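
E.g. (the name and exact value here are illustrative), with the drain
loop condition then comparing num_samples against it:

	/* Upper bound on samples drained by a single call to
	 * bpf_user_ringbuf_drain(). */
	#define BPF_MAX_USER_RINGBUF_SAMPLES (128 * 1024)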

>
> [...]
>
> > >         case ARG_PTR_TO_DYNPTR:
> > > +               /* We only need to check for initialized / uninitialized helper
> > > +                * dynptr args if the dynptr is not MEM_ALLOC, as the assumption
> > > +                * is that if it is, that a helper function initialized the
> > > +                * dynptr on behalf of the BPF program.
> > > +                */
> > > +               if (reg->type & MEM_ALLOC)
> >
> > Isn't PTR_TO_DYNPTR enough indication? Do we need MEM_ALLOC modifier?
> > Normal dynptr created and used inside BPF program on the stack are
> > actually PTR_TO_STACK, so that should be enough distinction? Or am I
> > missing something?
>
> I think this would also work in the current state of the codebase, but IIUC
> it relies on PTR_TO_STACK being the only way that a BPF program could ever
> allocate a dynptr. I was trying to guard against the case of a helper being
> added in the future that e.g. returned a dynamically allocated dynptr that
> the caller would eventually need to release in another helper call.
> MEM_ALLOC seems like the correct modifier to more generally denote that the
> dynptr was externally allocated.  If you think this is overkill I'm totally
> fine with removing MEM_ALLOC. We can always add it down the road if we add
> a new helper that requires it.
>

Hm.. I don't see a huge need for more flags for this, so I'd keep it
simple for now and if in the future we do have such a use case, we'll
address it at that time?


> [...]
>
> > > @@ -7477,11 +7524,15 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
> > >                 /* Find the id of the dynptr we're acquiring a reference to */
> > >                 for (i = 0; i < MAX_BPF_FUNC_REG_ARGS; i++) {
> > >                         if (arg_type_is_dynptr(fn->arg_type[i])) {
> > > +                               struct bpf_reg_state *reg = &regs[BPF_REG_1 + i];
> > > +
> > >                                 if (dynptr_id) {
> > >                                         verbose(env, "verifier internal error: multiple dynptr args in func\n");
> > >                                         return -EFAULT;
> > >                                 }
> > > -                               dynptr_id = stack_slot_get_id(env, &regs[BPF_REG_1 + i]);
> > > +
> > > +                               if (!(reg->type & MEM_ALLOC))
> > > +                                       dynptr_id = stack_slot_get_id(env, reg);
> >
> > this part has changed in bpf-next
>
> Yeah, this is all rewired in the new version I sent out in v2 (and will
> send out in v3 when I apply your other suggestions).

Cool, thanks. It was a pretty tight race between my comments and your v2 :)

>
> [...]
>
> Thanks for the thorough review!
>
> - David

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 4/5] bpf: Add libbpf logic for user-space ring buffer
  2022-08-12 17:28     ` David Vernet
@ 2022-08-16 19:09       ` Andrii Nakryiko
  2022-08-17 14:02         ` David Vernet
  0 siblings, 1 reply; 28+ messages in thread
From: Andrii Nakryiko @ 2022-08-16 19:09 UTC (permalink / raw)
  To: David Vernet
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

On Fri, Aug 12, 2022 at 10:28 AM David Vernet <void@manifault.com> wrote:
>
> On Thu, Aug 11, 2022 at 04:39:57PM -0700, Andrii Nakryiko wrote:
>
> [...]
>
> > > diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> > > index fc589fc8eb7c..a10558e79ec8 100644
> > > --- a/kernel/bpf/ringbuf.c
> > > +++ b/kernel/bpf/ringbuf.c
> > > @@ -639,7 +639,9 @@ static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
> > >         if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
> > >                 return -EBUSY;
> > >

[...]

> > > +void ring_buffer_user__submit(struct ring_buffer_user *rb, void *sample)
> > > +{
> > > +       __ring_buffer_user__commit(rb);
> > > +}
> >
> > this made me think that it's probably best to add kernel support for
> > busy bit anyways (just like for existing ringbuf), so that we can
> > eventually turn this into multi-producer on user-space side (all we
> > need is a lock, really). So let's anticipate that on kernel side from
> > the very beginning
>
> Hmm, yeah, fair enough. We have the extra space in the sample header to OR the
> busy bit, and we already have a 2-stage reserve -> commit workflow, so we might
> as well. I'll go ahead and add this, and then hopefully someday we can just add
> a lock, as you mentioned.

Right. We can probably also just document that reserve() step is the
only one that needs serialization among multiple producers (and
currently is required to be taken care of by the user app), while commit
(submit/discard) operation is thread-safe and needs no
synchronization.

The only reason we don't add it to libbpf right now is because we are
unsure about taking explicit dependency on pthread library. But I also
just found [0], so I don't know, maybe we should use that? I wonder if
it's supported by musl and other less full-featured libc
implementations, though.

  [0] https://www.gnu.org/software/libc/manual/html_node/ISO-C-Mutexes.html
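
If we did go that route, only the reserve step would need the lock,
e.g. (sketch; assumes a C11 <threads.h> mutex initialized with
mtx_init(), and uses user_ring_buffer__reserve() as a stand-in name for
whatever the reserve API from this series ends up being called):

	#include <stdint.h>
	#include <threads.h>

	struct user_ring_buffer;
	void *user_ring_buffer__reserve(struct user_ring_buffer *rb, uint32_t size);

	struct mp_producer {
		struct user_ring_buffer *rb;
		mtx_t lock;	/* initialized with mtx_init(&lock, mtx_plain) */
	};

	/* Serialize only the reserve step; submit/discard stay lock-free. */
	static void *mp_reserve(struct mp_producer *p, uint32_t size)
	{
		void *sample;

		mtx_lock(&p->lock);
		sample = user_ring_buffer__reserve(p->rb, size);
		mtx_unlock(&p->lock);

		return sample;
	}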

>
> > > +/* Reserve a pointer to a sample in the user ring buffer. This function is *not*
> > > + * thread safe, and the ring-buffer supports only a single producer.
> > > + */
> > > +void *ring_buffer_user__reserve(struct ring_buffer_user *rb, uint32_t size)
> > > +{
> > > +       uint32_t *hdr;
> > > +       /* 64-bit to avoid overflow in case of extreme application behavior */
> > > +       size_t avail_size, total_size, max_size;
> >
> > size_t is not guaranteed to be 64-bit, neither is long

[...]

> > > +/* Poll for available space in the ringbuffer, and reserve a record when it
> > > + * becomes available.
> > > + */
> > > +void *ring_buffer_user__poll(struct ring_buffer_user *rb, uint32_t size,
> > > +                            int timeout_ms)
> > > +{
> > > +       int cnt;
> > > +
> > > +       cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms);
> > > +       if (cnt < 0)
> > > +               return NULL;
> > > +
> > > +       return ring_buffer_user__reserve(rb, size);
> >
> > it's not clear how just doing epoll_wait() guarantees that we have >=
> > size of space available?.. Seems like some tests are missing?
>
> Right now, the kernel only kicks the polling writer once it's drained all
> of the samples from the ring buffer. So at this point, if there's not
> enough space in the buffer, there would be nothing we could do regardless.
> This seemed like reasonable, simple behavior for the initial
> implementation. I can make it a bit more intelligent if you'd like, and
> return EPOLLWRNORM as soon as there is any space in the buffer, and have
> libbpf potentially make multiple calls to epoll_wait() until enough space
> has become available.

So this "drain all samples" notion is not great: you can end drain
prematurely and thus not really drain all the data in ringbuf. With
multiple producers there could also be always more data coming in in
parallel. Plus, when in the future we'll have BPF program associated
with such ringbuf on the kernel side, we won't have a notion of
draining queue, we'll be just submitting record and letting kernel
handle it eventually.

So I think yeah, you'd have to send a notification when at least one
sample gets consumed. The problem is that it's potentially going to be a
performance hit if you do this notification for each consumed sample.
BPF ringbuf gets around that somewhat by using a heuristic to avoid the
notification if we see that the consumer is still behind the kernel when
the kernel submits a new sample.

I don't know if we can do anything clever here for waiting for some
space to be available...  Any thoughts?

As for making libbpf loop until enough space is available... I guess
that would be the only reasonable implementation, right? I wonder if
calling it "user_ring_buffer__reserve_blocking()" would be a better
name than just "poll", though?

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 3/5] bpf: Add bpf_user_ringbuf_drain() helper
  2022-08-16 18:57       ` Andrii Nakryiko
@ 2022-08-16 22:14         ` David Vernet
  2022-08-16 22:59           ` Andrii Nakryiko
  0 siblings, 1 reply; 28+ messages in thread
From: David Vernet @ 2022-08-16 22:14 UTC (permalink / raw)
  To: Andrii Nakryiko
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

On Tue, Aug 16, 2022 at 11:57:10AM -0700, Andrii Nakryiko wrote:

[...]

> > > > diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
> > > > index a341f877b230..ca125648d7fd 100644
> > > > --- a/include/uapi/linux/bpf.h
> > > > +++ b/include/uapi/linux/bpf.h
> > > > @@ -5332,6 +5332,13 @@ union bpf_attr {
> > > >   *             **-EACCES** if the SYN cookie is not valid.
> > > >   *
> > > >   *             **-EPROTONOSUPPORT** if CONFIG_IPV6 is not builtin.
> > > > + *
> > > > + * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
> > > > + *     Description
> > > > + *             Drain samples from the specified user ringbuffer, and invoke the
> > > > + *             provided callback for each such sample.
> > >
> > > please specify what's the expected signature of callback_fn
> >
> > Will do, unfortunately we're inconsistent in doing this in other helper
> > functions, so it wasn't clear from context.
> 
> That means we missed it for other helpers. The idea was to always
> specify expected signature in UAPI comment, ideally we fix all the
> missing cases.

Agreed -- I'll take care of that as a follow-on patch-set.

> > Agreed, I'll add a check and selftest for this.
> 
> Yep, consider also adding few tests where user-space intentionally
> breaks the contract to make sure that kernel stays intact (if you
> already did that, apologies, I haven't looked at selftests much).

The only negative tests I currently have from user-space are verifying
that mapping permissions are correctly enforced. Happy to add some more
that tests boundaries for other parts of the API -- I agree that's both
useful and prudent.

> > > > +       /* Synchronizes with smp_store_release() in
> > > > +        * __bpf_user_ringbuf_sample_release().
> > > > +        */
> > > > +       cons_pos = smp_load_acquire(&rb->consumer_pos);
> > > > +       if (cons_pos >= prod_pos) {
> > > > +               atomic_set(&rb->busy, 0);
> > > > +               return -ENODATA;
> > > > +       }
> > > > +
> > > > +       hdr = (u32 *)((uintptr_t)rb->data + (cons_pos & rb->mask));
> > > > +       sample_len = *hdr;
> > >
> > > do we need smp_load_acquire() here? libbpf's ring_buffer
> > > implementation uses load_acquire here
> >
> > I thought about this when I was first adding the logic, but I can't
> > convince myself that it's necessary and wasn't able to figure out why we
> > did a load acquire on the len in libbpf. The kernel doesn't do a store
> > release on the header, so I'm not sure what the load acquire in libbpf
> > actually accomplishes. I could certainly be missing something, but I
> > _think_ the important thing is that we have load-acquire / store-release
> > pairs for the consumer and producer positions.
> 
> kernel does xchg on len on the kernel side, which is stronger than
> smp_store_release (I think it was Paul's suggestion instead of doing
> explicit memory barrier, but my memories are hazy for exact reasons).

Hmmm, yeah, for the kernel-producer ringbuffer, I believe the explicit
memory barrier is unnecessary because:

o The smp_store_release() on producer_pos provides ordering w.r.t.
  producer_pos, and the write to hdr->len which includes the busy-bit
  should therefore be visible in libbpf, which does an
  smp_load_acquire().
o The xchg() when the sample is committed provides full barriers before
  and after, so the consumer is guaranteed to read the written contents of
  the sample IFF it also sees the BPF_RINGBUF_BUSY_BIT as unset.

I can't see any scenario in which a barrier would add synchronization not
already provided, though this stuff is tricky so I may have missed
something.

> Right now this might not be necessary, but if we add support for busy
> bit in a sample header, it will be closer to what BPF ringbuf is doing
> right now, with producer position being a reservation pointer, but
> sample itself won't be "readable" until sample header is updated and
> its busy bit is unset.

Regarding this patch, after thinking about this more I _think_ I do need
an xchg() (or equivalent operation with full barrier semantics) in
userspace when updating the producer_pos when committing the sample.
Which, after applying your suggestion (which I agree with) of supporting
BPF_RINGBUF_BUSY_BIT and BPF_RINGBUF_DISCARD_BIT from the get-go, would
similarly be an xchg() when setting the header, paired with an
smp_load_acquire() when reading the header in the kernel.

> > Yeah, I thought about this. I don't think there's any problem with clearing
> > busy before we schedule the irq_work_queue(). I elected to do this to err
> > on the side of simpler logic until we observed contention, but yeah, let me
> > just do the more performant thing here.
> 
> busy is like a global lock, so freeing it ASAP is good, so yeah,
> unless there are some bad implications, let's do it early

Ack.

[...]

> > > > +                       ret = callback((u64)&dynptr,
> > > > +                                       (u64)(uintptr_t)callback_ctx, 0, 0, 0);
> > > > +
> > > > +                       __bpf_user_ringbuf_sample_release(rb, size, flags);
> > > > +                       num_samples++;
> > > > +               }
> > > > +       } while (err == 0 && num_samples < 4096 && ret == 0);
> > > > +
> > >
> > > 4096 is pretty arbitrary. Definitely worth noting it somewhere and it
> > > seems somewhat low, tbh...
> > >
> > > ideally we'd cond_resched() from time to time, but that would require
> > > BPF program to be sleepable, so we can't do that :(
> >
> > Yeah, I knew this would come up in discussion. I would love to do
> > cond_resched() here, but as you said, I don't think it's an option :-/ And
> > given the fact that we're calling back into the BPF program, we have to be
> > cognizant of things taking a while and clogging up the CPU. What do you
> > think is a more reasonable number than 4096?
> 
> I don't know, tbh, but 4096 seems pretty low. For bpf_loop() we allow
> up to 2mln iterations. I'd bump it up to 64-128K range, probably. But
> also please move it into some internal #define'd constant, not some
> integer literal buried in the code.

Sounds good to me. Maybe at some point we can make this configurable, or
something. If bpf_loop() allows a hard-coded number of iterations, I think
it's more forgivable to do the same here. I'll bump it up to 128k and move
it into a constant so it's not a magic number.

> >
> > [...]
> >
> > > >         case ARG_PTR_TO_DYNPTR:
> > > > +               /* We only need to check for initialized / uninitialized helper
> > > > +                * dynptr args if the dynptr is not MEM_ALLOC, as the assumption
> > > > +                * is that if it is, that a helper function initialized the
> > > > +                * dynptr on behalf of the BPF program.
> > > > +                */
> > > > +               if (reg->type & MEM_ALLOC)
> > >
> > > Isn't PTR_TO_DYNPTR enough indication? Do we need MEM_ALLOC modifier?
> > > Normal dynptr created and used inside BPF program on the stack are
> > > actually PTR_TO_STACK, so that should be enough distinction? Or am I
> > > missing something?
> >
> > I think this would also work in the current state of the codebase, but IIUC
> > it relies on PTR_TO_STACK being the only way that a BPF program could ever
> > allocate a dynptr. I was trying to guard against the case of a helper being
> > added in the future that e.g. returned a dynamically allocated dynptr that
> > the caller would eventually need to release in another helper call.
> > MEM_ALLOC seems like the correct modifier to more generally denote that the
> > dynptr was externally allocated.  If you think this is overkill I'm totally
> > fine with removing MEM_ALLOC. We can always add it down the road if we add
> > a new helper that requires it.
> >
> 
> Hm.. I don't see a huge need for more flags for this, so I'd keep it
> simple for now and if in the future we do have such a use case, we'll
> address it at that time?

Sounds good to me.

Thanks,
David

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 3/5] bpf: Add bpf_user_ringbuf_drain() helper
  2022-08-16 22:14         ` David Vernet
@ 2022-08-16 22:59           ` Andrii Nakryiko
  2022-08-17 20:24             ` David Vernet
  0 siblings, 1 reply; 28+ messages in thread
From: Andrii Nakryiko @ 2022-08-16 22:59 UTC (permalink / raw)
  To: David Vernet
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

On Tue, Aug 16, 2022 at 3:14 PM David Vernet <void@manifault.com> wrote:
>
> On Tue, Aug 16, 2022 at 11:57:10AM -0700, Andrii Nakryiko wrote:
>
> [...]
>
> > > > > diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
> > > > > index a341f877b230..ca125648d7fd 100644
> > > > > --- a/include/uapi/linux/bpf.h
> > > > > +++ b/include/uapi/linux/bpf.h
> > > > > @@ -5332,6 +5332,13 @@ union bpf_attr {
> > > > >   *             **-EACCES** if the SYN cookie is not valid.
> > > > >   *
> > > > >   *             **-EPROTONOSUPPORT** if CONFIG_IPV6 is not builtin.
> > > > > + *
> > > > > + * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
> > > > > + *     Description
> > > > > + *             Drain samples from the specified user ringbuffer, and invoke the
> > > > > + *             provided callback for each such sample.
> > > >
> > > > please specify what's the expected signature of callback_fn
> > >
> > > Will do, unfortunately we're inconsistent in doing this in other helper
> > > functions, so it wasn't clear from context.
> >
> > That means we missed it for other helpers. The idea was to always
> > specify expected signature in UAPI comment, ideally we fix all the
> > missing cases.
>
> Agreed -- I'll take care of that as a follow-on patch-set.
>
> > > Agreed, I'll add a check and selftest for this.
> >
> > Yep, consider also adding few tests where user-space intentionally
> > breaks the contract to make sure that kernel stays intact (if you
> > already did that, apologies, I haven't looked at selftests much).
>
> The only negative tests I currently have from user-space are verifying
> that mapping permissions are correctly enforced. Happy to add some more
> that tests boundaries for other parts of the API -- I agree that's both
> useful and prudent.
>
> > > > > +       /* Synchronizes with smp_store_release() in
> > > > > +        * __bpf_user_ringbuf_sample_release().
> > > > > +        */
> > > > > +       cons_pos = smp_load_acquire(&rb->consumer_pos);
> > > > > +       if (cons_pos >= prod_pos) {
> > > > > +               atomic_set(&rb->busy, 0);
> > > > > +               return -ENODATA;
> > > > > +       }
> > > > > +
> > > > > +       hdr = (u32 *)((uintptr_t)rb->data + (cons_pos & rb->mask));
> > > > > +       sample_len = *hdr;
> > > >
> > > > do we need smp_load_acquire() here? libbpf's ring_buffer
> > > > implementation uses load_acquire here
> > >
> > > I thought about this when I was first adding the logic, but I can't
> > > convince myself that it's necessary and wasn't able to figure out why we
> > > did a load acquire on the len in libbpf. The kernel doesn't do a store
> > > release on the header, so I'm not sure what the load acquire in libbpf
> > > actually accomplishes. I could certainly be missing something, but I
> > > _think_ the important thing is that we have load-acquire / store-release
> > > pairs for the consumer and producer positions.
> >
> > kernel does xchg on len on the kernel side, which is stronger than
> > smp_store_release (I think it was Paul's suggestion instead of doing
> > explicit memory barrier, but my memories are hazy for exact reasons).
>
> Hmmm, yeah, for the kernel-producer ringbuffer, I believe the explicit
> memory barrier is unnecessary because:
>
> o The smp_store_release() on producer_pos provides ordering w.r.t.
>   producer_pos, and the write to hdr->len which includes the busy-bit
>   should therefore be visible in libbpf, which does an
>   smp_load_acquire().
> o The xchg() when the sample is committed provides full barriers before
>   and after, so the consumer is guaranteed to read the written contents of
>   the sample IFF it also sees the BPF_RINGBUF_BUSY_BIT as unset.
>
> I can't see any scenario in which a barrier would add synchronization not
> already provided, though this stuff is tricky so I may have missed
> something.
>
> > Right now this might not be necessary, but if we add support for busy
> > bit in a sample header, it will be closer to what BPF ringbuf is doing
> > right now, with producer position being a reservation pointer, but
> > sample itself won't be "readable" until sample header is updated and
> > its busy bit is unset.
>
> Regarding this patch, after thinking about this more I _think_ I do need
> an xchg() (or equivalent operation with full barrier semantics) in
> userspace when updating the producer_pos when committing the sample.
> Which, after applying your suggestion (which I agree with) of supporting
> BPF_RINGBUF_BUSY_BIT and BPF_RINGBUF_DISCARD_BIT from the get-go, would
> similarly be an xchg() when setting the header, paired with an
> smp_load_acquire() when reading the header in the kernel.


right, I think __atomic_store_n() can be used in libbpf for this with
seq_cst ordering


>
> > > Yeah, I thought about this. I don't think there's any problem with clearing
> > > busy before we schedule the irq_work_queue(). I elected to do this to err
> > > on the side of simpler logic until we observed contention, but yeah, let me
> > > just do the more performant thing here.
> >
> > busy is like a global lock, so freeing it ASAP is good, so yeah,
> > unless there are some bad implications, let's do it early
>
> Ack.
>
> [...]
>
> > > > > +                       ret = callback((u64)&dynptr,
> > > > > +                                       (u64)(uintptr_t)callback_ctx, 0, 0, 0);
> > > > > +
> > > > > +                       __bpf_user_ringbuf_sample_release(rb, size, flags);
> > > > > +                       num_samples++;
> > > > > +               }
> > > > > +       } while (err == 0 && num_samples < 4096 && ret == 0);
> > > > > +
> > > >
> > > > 4096 is pretty arbitrary. Definitely worth noting it somewhere and it
> > > > seems somewhat low, tbh...
> > > >
> > > > ideally we'd cond_resched() from time to time, but that would require
> > > > BPF program to be sleepable, so we can't do that :(
> > >
> > > Yeah, I knew this would come up in discussion. I would love to do
> > > cond_resched() here, but as you said, I don't think it's an option :-/ And
> > > given the fact that we're calling back into the BPF program, we have to be
> > > cognizant of things taking a while and clogging up the CPU. What do you
> > > think is a more reasonable number than 4096?
> >
> > I don't know, tbh, but 4096 seems pretty low. For bpf_loop() we allow
> > up to 2mln iterations. I'd bump it up to 64-128K range, probably. But
> > also please move it into some internal #define'd constant, not some
> > integer literal buried in the code.
>
> Sounds good to me. Maybe at some point we can make this configurable, or
> something. If bpf_loop() allows a hard-coded number of iterations, I think
> it's more forgivable to do the same here. I'll bump it up to 128k and move
> it into a constant so it's not a magic number.
>
> > >
> > > [...]
> > >
> > > > >         case ARG_PTR_TO_DYNPTR:
> > > > > +               /* We only need to check for initialized / uninitialized helper
> > > > > +                * dynptr args if the dynptr is not MEM_ALLOC, as the assumption
> > > > > +                * is that if it is, that a helper function initialized the
> > > > > +                * dynptr on behalf of the BPF program.
> > > > > +                */
> > > > > +               if (reg->type & MEM_ALLOC)
> > > >
> > > > Isn't PTR_TO_DYNPTR enough indication? Do we need MEM_ALLOC modifier?
> > > > Normal dynptr created and used inside BPF program on the stack are
> > > > actually PTR_TO_STACK, so that should be enough distinction? Or am I
> > > > missing something?
> > >
> > > I think this would also work in the current state of the codebase, but IIUC
> > > it relies on PTR_TO_STACK being the only way that a BPF program could ever
> > > allocate a dynptr. I was trying to guard against the case of a helper being
> > > added in the future that e.g. returned a dynamically allocated dynptr that
> > > the caller would eventually need to release in another helper call.
> > > MEM_ALLOC seems like the correct modifier to more generally denote that the
> > > dynptr was externally allocated.  If you think this is overkill I'm totally
> > > fine with removing MEM_ALLOC. We can always add it down the road if we add
> > > a new helper that requires it.
> > >
> >
> > Hm.. I don't see a huge need for more flags for this, so I'd keep it
> > simple for now and if in the future we do have such a use case, we'll
> > address it at that time?
>
> Sounds good to me.
>
> Thanks,
> David

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 4/5] bpf: Add libbpf logic for user-space ring buffer
  2022-08-16 19:09       ` Andrii Nakryiko
@ 2022-08-17 14:02         ` David Vernet
  2022-08-18  3:05           ` David Vernet
  0 siblings, 1 reply; 28+ messages in thread
From: David Vernet @ 2022-08-17 14:02 UTC (permalink / raw)
  To: Andrii Nakryiko
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

On Tue, Aug 16, 2022 at 12:09:53PM -0700, Andrii Nakryiko wrote:
> > > > +void ring_buffer_user__submit(struct ring_buffer_user *rb, void *sample)
> > > > +{
> > > > +       __ring_buffer_user__commit(rb);
> > > > +}
> > >
> > > this made me think that it's probably best to add kernel support for
> > > busy bit anyways (just like for existing ringbuf), so that we can
> > > eventually turn this into multi-producer on user-space side (all we
> > > need is a lock, really). So let's anticipate that on kernel side from
> > > the very beginning
> >
> > Hmm, yeah, fair enough. We have the extra space in the sample header to OR the
> > busy bit, and we already have a 2-stage reserve -> commit workflow, so we might
> > as well. I'll go ahead and add this, and then hopefully someday we can just add
> > a lock, as you mentioned.
> 
> Right. We can probably also just document that reserve() step is the
> only one that needs serialization among multiple producers (and
> currently is required to be taken care of by the user app), while commit
> (submit/discard) operation is thread-safe and needs no
> synchronization.

Sounds good.

> The only reason we don't add it to libbpf right now is because we are
> unsure about taking explicit dependency on pthread library. But I also
> just found [0], so I don't know, maybe we should use that? I wonder if
> it's supported by musl and other less full-featured libc
> implementations, though.
> 
>   [0] https://www.gnu.org/software/libc/manual/html_node/ISO-C-Mutexes.html

IMHO, and others may disagree, if it's in the C standard it seems like it
should be fair game to add to libbpf? Also FWIW, it looks like musl does
support it.  See mtx_*.c in [0].

[0] https://git.musl-libc.org/cgit/musl/tree/src/thread

That being said, I would like to try and keep this decision outside the
scope of user-ringbuf, if possible. Would you be OK with this landing
as-is (modulo further discussion, revisions, etc., of course), and then
we can update this implementation to be multi-producer if and when we've
added something like mtx_t support in a follow-on patch-set?

[...]

> > > > +/* Poll for available space in the ringbuffer, and reserve a record when it
> > > > + * becomes available.
> > > > + */
> > > > +void *ring_buffer_user__poll(struct ring_buffer_user *rb, uint32_t size,
> > > > +                            int timeout_ms)
> > > > +{
> > > > +       int cnt;
> > > > +
> > > > +       cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms);
> > > > +       if (cnt < 0)
> > > > +               return NULL;
> > > > +
> > > > +       return ring_buffer_user__reserve(rb, size);
> > >
> > > it's not clear how just doing epoll_wait() guarantees that we have >=
> > > size of space available?.. Seems like some tests are missing?
> >
> > Right now, the kernel only kicks the polling writer once it's drained all
> > of the samples from the ring buffer. So at this point, if there's not
> > enough space in the buffer, there would be nothing we could do regardless.
> > This seemed like reasonable, simple behavior for the initial
> > implementation. I can make it a bit more intelligent if you'd like, and
> > return EPOLLWRNORM as soon as there is any space in the buffer, and have
> > libbpf potentially make multiple calls to epoll_wait() until enough space
> > has become available.
> 
> So this "drain all samples" notion is not great: you can end drain
> prematurely and thus not really drain all the data in ringbuf.With
> multiple producers there could also be always more data coming in in
> parallel. Plus, when in the future we'll have BPF program associated
> with such ringbuf on the kernel side, we won't have a notion of
> draining queue, we'll be just submitting record and letting kernel
> handle it eventually.

I don't disagree with any of your points. I think what we'll have to
decide on is a trade-off between performance and usability. As you pointed
out, if we only kick user-space once the ringbuffer is empty, that imposes
the requirement on the kernel that it will always drain the ringbuffer.
That might not even be possible though if we have multiple producers
posting data in parallel.

More on this below, but the TL;DR is that I agree with you, and I think
having a model where we kick user-space whenever a sample is consumed from
the buffer is a lot easier to reason about, and probably our only option if
our plan is to make the ringbuffer MPMC. I'll make this change in v3.

> So I think yeah, you'd have to send a notification when at least one
> sample gets consumed. The problem is that it's potentially going to be a
> performance hit if you do this notification for each consumed sample.
> BPF ringbuf gets around that somewhat by using a heuristic to avoid the
> notification if we see that the consumer is still behind the kernel when
> the kernel submits a new sample.

Something perhaps worth pointing out here is that this heuristic works
because the kernel-producer ringbuffer is MPSC. If it were MPMC, we'd
potentially have the same problem you pointed out above where you'd never
wake up an epoll-waiter because other consumers would drain the buffer, and
by the time the kernel got around to posting another sample, it could observe
that consumer_pos == producer_pos, and either wouldn't wake up anyone on
the waitq, or wouldn't return any events from ringbuf_map_poll(). If our
intention is to make user-space ringbuffers MPMC, it becomes more difficult
to use these nice heuristics.

> I don't know if we can do anything clever here for waiting for some
> space to be available...  Any thoughts?

Hmmm, yeah, nothing clever is coming to mind. The problem is that we can't
make assumptions about why user-space would be epoll-waiting on the
ringbuffer, because it's a producer, and the user-space producer is
free to post variably sized samples.

For example, I was initially considering whether we could do a heuristic
where we notify the producer only if the buffer was previously full /
producer_pos was ringbuf size away from consumer_pos when we drained a
sample, but that doesn't work at all because there could be space in the
ringbuffer, but user-space is epoll-waiting for *more* space to become
available for some large sample that it wants to publish.

I think the only options we have are:

1. Send a notification (i.e. schedule bpf_ringbuf_notify() using
   irq_work_queue(), and then return EPOLLOUT | EPOLLWRNORM if
   the ringbuffer is not full), every time a sample is drained.

2. Keep the behavior in v1, wherein we have a contract that the kernel will
   always eventually drain the ringbuffer, and will kick user-space when the
   buffer is empty. I think a requirement here would also be that the
   ringbuffer would be SPMC, or we decide that it's acceptable for some
   producers to sleep indefinitely if other producers keep reading samples,
   and that the caller just needs to be aware of this as a possibility.

My two cents are that we go with (1), and then consider our options later
if we need to optimize.
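
For (1), roughly what I'm picturing on the kernel side is the following
(sketch; the poll callback shape mirrors the patch, but the "not full"
check is illustrative):

	static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
					      struct poll_table_struct *pts)
	{
		struct bpf_ringbuf *rb = container_of(map, struct bpf_ringbuf_map, map)->rb;

		poll_wait(filp, &rb->waitq, pts);

		/* Writable whenever there is any free space left; the drain
		 * helper schedules rb->work after releasing each sample. */
		if (smp_load_acquire(&rb->producer_pos) -
		    smp_load_acquire(&rb->consumer_pos) < rb->mask + 1)
			return EPOLLOUT | EPOLLWRNORM;

		return 0;
	}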

> As for making libbpf loop until enough space is available... I guess
> that would be the only reasonable implementation, right? I wonder if
> calling it "user_ring_buffer__reserve_blocking()" would be a better
> name than just "poll", though?

I went with user_ring_buffer__poll() to match ring_buffer__poll(), the
complementary user-space consumer function for kernel-producer
ringbuffers. I personally prefer
user_ring_buffer__reserve_blocking() because the fact that it's doing an
epoll-wait is entirely an implementation detail. I'll go ahead and make
that change in v3.
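
I.e. roughly the following on the libbpf side (sketch; assumes the
epoll_fd/event fields from this patch, doesn't bother re-arming the
timeout across iterations, and relies on reserve() setting errno on
failure):

	void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
						 uint32_t size, int timeout_ms)
	{
		void *sample;
		int cnt;

		do {
			sample = user_ring_buffer__reserve(rb, size);
			if (sample)
				return sample;

			/* Wait for the kernel to consume something and
			 * notify us that space was freed up. */
			cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms);
		} while (cnt > 0);

		if (cnt == 0)
			errno = ETIMEDOUT;
		return NULL;
	}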

Thanks,
David

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 3/5] bpf: Add bpf_user_ringbuf_drain() helper
  2022-08-16 22:59           ` Andrii Nakryiko
@ 2022-08-17 20:24             ` David Vernet
  2022-08-17 21:03               ` David Vernet
  0 siblings, 1 reply; 28+ messages in thread
From: David Vernet @ 2022-08-17 20:24 UTC (permalink / raw)
  To: Andrii Nakryiko
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

On Tue, Aug 16, 2022 at 03:59:54PM -0700, Andrii Nakryiko wrote:

[...]

> > > > > > +       /* Synchronizes with smp_store_release() in
> > > > > > +        * __bpf_user_ringbuf_sample_release().
> > > > > > +        */
> > > > > > +       cons_pos = smp_load_acquire(&rb->consumer_pos);
> > > > > > +       if (cons_pos >= prod_pos) {
> > > > > > +               atomic_set(&rb->busy, 0);
> > > > > > +               return -ENODATA;
> > > > > > +       }
> > > > > > +
> > > > > > +       hdr = (u32 *)((uintptr_t)rb->data + (cons_pos & rb->mask));
> > > > > > +       sample_len = *hdr;
> > > > >
> > > > > do we need smp_load_acquire() here? libbpf's ring_buffer
> > > > > implementation uses load_acquire here
> > > >
> > > > I thought about this when I was first adding the logic, but I can't
> > > > convince myself that it's necessary and wasn't able to figure out why we
> > > > did a load acquire on the len in libbpf. The kernel doesn't do a store
> > > > release on the header, so I'm not sure what the load acquire in libbpf
> > > > actually accomplishes. I could certainly be missing something, but I
> > > > _think_ the important thing is that we have load-acquire / store-release
> > > > pairs for the consumer and producer positions.
> > >
> > > kernel does xchg on len on the kernel side, which is stronger than
> > > smp_store_release (I think it was Paul's suggestion instead of doing
> > > explicit memory barrier, but my memories are hazy for exact reasons).
> >
> > Hmmm, yeah, for the kernel-producer ringbuffer, I believe the explicit
> > memory barrier is unnecessary because:
> >
> > o The smp_store_release() on producer_pos provides ordering w.r.t.
> >   producer_pos, and the write to hdr->len which includes the busy-bit
> >   should therefore be visible in libbpf, which does an
> >   smp_load_acquire().
> > o The xchg() when the sample is committed provides full barriers before
> >   and after, so the consumer is guaranteed to read the written contents of
> >   the sample IFF it also sees the BPF_RINGBUF_BUSY_BIT as unset.
> >
> > I can't see any scenario in which a barrier would add synchronization not
> > already provided, though this stuff is tricky so I may have missed
> > something.
> >
> > > Right now this might not be necessary, but if we add support for busy
> > > bit in a sample header, it will be closer to what BPF ringbuf is doing
> > > right now, with producer position being a reservation pointer, but
> > > sample itself won't be "readable" until sample header is updated and
> > > its busy bit is unset.
> >
> > Regarding this patch, after thinking about this more I _think_ I do need
> > an xchg() (or equivalent operation with full barrier semantics) in
> > userspace when updating the producer_pos when committing the sample.
> > Which, after applying your suggestion (which I agree with) of supporting
> > BPF_RINGBUF_BUSY_BIT and BPF_RINGBUF_DISCARD_BIT from the get-go, would
> > similarly be an xchg() when setting the header, paired with an
> > smp_load_acquire() when reading the header in the kernel.
> 
> 
> right, I think __atomic_store_n() can be used in libbpf for this with
> seq_cst ordering

__atomic_store_n(__ATOMIC_SEQ_CST) will do the correct thing on x86, but it
is not guaranteed to provide a full acq/rel barrier according to the C
standard. __atomic_store_n(__ATOMIC_SEQ_CST) means "store-release, and also
participates in the sequentially-consistent global ordering".

I believe we actually need an __atomic_store_n(__ATOMIC_ACQ_REL) here. I
don't _think_ __ATOMIC_SEQ_CST is necessary because we're not reliant on
any type of global, SC ordering. We just need to ensure that the sample is
only visible after the busy bit has been removed (or discard bit added) to
the header.
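
For reference, on the kernel side the read of the header would then become
something like this (a sketch extending the hunk quoted above; the
busy/discard bit handling is the new part, and error-path details such as
dropping rb->busy are elided):

        u32 hdr_len;

        hdr = (u32 *)((uintptr_t)rb->data + (cons_pos & rb->mask));

        /* Pairs with the producer's atomic exchange on the header when the
         * sample is committed.
         */
        hdr_len = smp_load_acquire(hdr);
        if (hdr_len & BPF_RINGBUF_BUSY_BIT)
                return -EBUSY; /* reserved but not yet committed */

        /* A set discard bit means the sample was committed and then
         * dropped: its space is still consumed, but the callback isn't
         * invoked on it.
         */
        sample_len = hdr_len & ~BPF_RINGBUF_DISCARD_BIT;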

Thanks,
David

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 3/5] bpf: Add bpf_user_ringbuf_drain() helper
  2022-08-17 20:24             ` David Vernet
@ 2022-08-17 21:03               ` David Vernet
  0 siblings, 0 replies; 28+ messages in thread
From: David Vernet @ 2022-08-17 21:03 UTC (permalink / raw)
  To: Andrii Nakryiko
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

On Wed, Aug 17, 2022 at 03:24:40PM -0500, David Vernet wrote:

> [...]
> > right, I think __atomic_store_n() can be used in libbpf for this with
> > seq_cst ordering
> 
> __atomic_store_n(__ATOMIC_SEQ_CST) will do the correct thing on x86, but it
> is not guaranteed to provide a full acq/rel barrier according to the C
> standard. __atomic_store_n(__ATOMIC_SEQ_CST) means "store-release, and also
> participates in the sequentially-consistent global ordering".
> 
> I believe we actually need an __atomic_store_n(__ATOMIC_ACQ_REL) here. I

Sorry, I meant __atomic_exchange_n() rather than __atomic_store_n().
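
In other words, the user-space commit path would be along these lines
(sketch; the function name is illustrative, and it assumes the sample
header mirrors the kernel-producer ringbuf layout, i.e. the length in the
low bits with BPF_RINGBUF_BUSY_BIT / BPF_RINGBUF_DISCARD_BIT on top):

static void ring_buffer_user__commit(struct ring_buffer_user *rb,
                                     void *sample, bool discard)
{
        uint32_t new_len;
        uint32_t *hdr = (uint32_t *)((uintptr_t)sample - BPF_RINGBUF_HDR_SZ);

        new_len = *hdr & ~BPF_RINGBUF_BUSY_BIT;
        if (discard)
                new_len |= BPF_RINGBUF_DISCARD_BIT;

        /* Release semantics (and then some) on the header update: all
         * payload writes into the sample must be visible to the kernel
         * before it can observe the busy bit as cleared. Pairs with the
         * smp_load_acquire() on the header in the kernel's drain path.
         */
        __atomic_exchange_n(hdr, new_len, __ATOMIC_ACQ_REL);
}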

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 4/5] bpf: Add libbpf logic for user-space ring buffer
  2022-08-17 14:02         ` David Vernet
@ 2022-08-18  3:05           ` David Vernet
  0 siblings, 0 replies; 28+ messages in thread
From: David Vernet @ 2022-08-18  3:05 UTC (permalink / raw)
  To: Andrii Nakryiko
  Cc: bpf, ast, daniel, andrii, john.fastabend, martin.lau, song, yhs,
	kpsingh, sdf, haoluo, jolsa, tj, joannelkoong, linux-kernel,
	Kernel-team

On Wed, Aug 17, 2022 at 09:02:14AM -0500, David Vernet wrote:

[...]

> > > > > +/* Poll for available space in the ringbuffer, and reserve a record when it
> > > > > + * becomes available.
> > > > > + */
> > > > > +void *ring_buffer_user__poll(struct ring_buffer_user *rb, uint32_t size,
> > > > > +                            int timeout_ms)
> > > > > +{
> > > > > +       int cnt;
> > > > > +
> > > > > +       cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms);
> > > > > +       if (cnt < 0)
> > > > > +               return NULL;
> > > > > +
> > > > > +       return ring_buffer_user__reserve(rb, size);
> > > >
> > > > it's not clear how just doing epoll_wait() guarantees that we have >=
> > > > size of space available?.. Seems like some tests are missing?
> > >
> > > Right now, the kernel only kicks the polling writer once it's drained all
> > > of the samples from the ring buffer. So at this point, if there's not
> > > enough size in the buffer, there would be nothing we could do regardless.
> > > This seemed like reasonable, simple behavior for the initial
> > > implementation. I can make it a bit more intelligent if you'd like, and
> > > return EPOLLWRNORM as soon as there is any space in the buffer, and have
> > > libbpf potentially make multiple calls to epoll_wait() until enough space
> > > has become available.
> > 
> > So this "drain all samples" notion is not great: you can end the drain
> > prematurely and thus not really drain all the data in the ringbuf. With
> > multiple producers there could also always be more data coming in in
> > parallel. Plus, when in the future we'll have a BPF program associated
> > with such ringbuf on the kernel side, we won't have a notion of
> > draining queue, we'll be just submitting record and letting kernel
> > handle it eventually.
> 
> I don't disagree with any of your points. I think what we'll have to
> decide on is a trade-off between performance and usability. As you pointed
> out, if we only kick user-space once the ringbuffer is empty, that imposes
> the requirement on the kernel that it will always drain the ringbuffer.
> That might not even be possible though if we have multiple producers
> posting data in parallel.
> 
> More on this below, but the TL;DR is that I agree with you, and I think
> having a model where we kick user-space whenever a sample is consumed from
> the buffer is a lot easier to reason about, and probably our only option if
> our plan is to make the ringbuffer MPMC. I'll make this change in v3.
> 
> > So I think yeah, you'd have to send a notification when at least one
> > sample gets consumed. The problem is that it's going to be a
> > performance hit, potentially, if you are going to do this notification
> > for each consumed sample. BPF ringbuf gets around that somewhat by
> > using a heuristic to avoid a notification if we see that the consumer
> > is still behind the kernel when the kernel submits a new sample.
> 
> Something perhaps worth pointing out here is that this heuristic works
> because the kernel-producer ringbuffer is MPSC. If it were MPMC, we'd
> potentially have the same problem you pointed out above where you'd never
> wake up an epoll-waiter because other consumers would drain the buffer, and
> by the time the kernel got around to posting another sample, could observe
> that consumer_pos == producer_pos, and either wouldn't wake up anyone on
> the waitq, or wouldn't return any events from ringbuf_map_poll(). If our
> intention is to make user-space ringbuffers MPMC, it becomes more difficult
> to use these nice heuristics.
> 
> > I don't know if we can do anything clever here for waiting for some
> > space to be available...  Any thoughts?
> 
> Hmmm, yeah, nothing clever is coming to mind. The problem is that we can't
> make assumptions about why user-space would be epoll-waiting on the
> ringbuffer because it's a producer, and the user-space producer is
> free to post variably sized samples.
> 
> For example, I was initially considering whether we could do a heuristic
> where we notify the producer only if the buffer was previously full /
> producer_pos was ringbuf size away from consumer_pos when we drained a
> sample, but that doesn't work at all because there could be space in the
> ringbuffer, but user-space is epoll-waiting for *more* space to become
> available for some large sample that it wants to publish.
> 
> I think the only options we have are:
> 
> 1. Send a notification (i.e. schedule bpf_ringbuf_notify() using
>    irq_work_queue(), and then return EPOLLOUT | EPOLLWRNORM if
>    the ringbuffer is not full), every time a sample is drained.
> 
> 2. Keep the behavior in v1, wherein we have a contract that the kernel will
>    always eventually drain the ringbuffer, and will kick user-space when the
>    buffer is empty. I think a requirement here would also be that the
>    ringbuffer would be SPMC, or we decide that it's acceptable for some
>    producers to sleep indefinitely if other producers keep posting samples,
>    and that the caller just needs to be aware of this as a possibility.
> 
> My two cents are that we go with (1), and then consider our options later
> if we need to optimize.

Unfortunately this causes the system to hang because of how many times
we're invoking irq_work_queue() in rapid succession.

So, I'm not sure that notifying the user-space producer after _every_
sample is going to work. Another option that occurred to me, however, is to
augment the initial approach taken in v1. Rather than only sending a
notification when the ringbuffer is empty, we could instead guarantee that
we'll always send a notification when bpf_ringbuf_drain() is called and at
least one message was consumed. This is slightly stronger than our earlier
guarantee because we may send a notification even if the ringbuffer is only
partially drained, and it avoids some of the associated pitfalls that you
pointed out (e.g. that a BPF program is now responsible for always draining
the ringbuffer in order to ensure that an epoll-waiting user-space producer
will always be woken up).

The down-side of this is that it still may not be aggressive enough in
waking up user-space producers who could have had data to post as soon as
space was available in the ringbuffer. It won't potentially cause them to
block indefinitely as the first approach did, but they'll still have to
wait for the end of bpf_ringbuf_drain() to be woken up, when they could
potentially have been woken up sooner to start publishing samples. It's
hard to say whether this will be too limiting for the feature to be usable,
but in my opinion it's more useful to have this functionality (and make its
behavior very clearly described in comments and the public documentation
for the APIs) in an imperfect form than to not have it at all.

Additionally, a plus-side to this approach is that it gives us some
flexibility to send more event notifications if we want to. For example, we
could add a heuristic where we always send a notification if the buffer was
previously full before a sample was consumed (in contrast to always
invoking irq_work_queue() _whenever_ a sample is consumed), and then always
send a notification at the end of bpf_ringbuf_drain() to catch any
producers who were waiting for more space than was freed up by the time of
the initial heuristic notification. I think this should address the likely
common use-case of samples being statically sized (so posting an event as
soon as the ringbuffer is no longer full should almost always be
sufficient).
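
To illustrate the shape of that notification logic (the function names here
are hypothetical, and rb->work is assumed to be the usual wakeup irq_work),
something like:

/* Called from the drain loop right after a sample's space has been
 * released back to the producer; was_full records whether the ringbuf was
 * completely full before this sample was consumed.
 */
static void user_ringbuf_post_consume(struct bpf_ringbuf *rb, bool was_full)
{
        /* Kick producers right away if we just transitioned out of the
         * full state, so anyone waiting on a small reservation isn't stuck
         * until the end of the drain.
         */
        if (was_full)
                irq_work_queue(&rb->work);
}

/* Called once at the end of the drain: a single notification if anything
 * was consumed, which is what catches producers waiting for a larger chunk
 * of space.
 */
static void user_ringbuf_post_drain(struct bpf_ringbuf *rb, bool consumed)
{
        if (consumed)
                irq_work_queue(&rb->work);
}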

We discussed this offline, so I'll go ahead and implement it for the next
version. One thing we didn't discuss was the example heuristic described
above, but I'm going to include that because it seems like the
complementary behavior to what we do for kernel-producer ringbuffers. If
folks aren't a fan, we can remove it in favor of only sending a single
notification in bpf_ringbuf_drain().

Thanks,
David

^ permalink raw reply	[flat|nested] 28+ messages in thread

end of thread, other threads:[~2022-08-18  3:06 UTC | newest]

Thread overview: 28+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2022-08-08 15:53 [PATCH 1/5] bpf: Clear callee saved regs after updating REG0 David Vernet
2022-08-08 15:53 ` [PATCH 2/5] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF map type David Vernet
2022-08-11 23:22   ` Andrii Nakryiko
2022-08-12 16:21     ` David Vernet
2022-08-11 23:29   ` Andrii Nakryiko
2022-08-12 16:23     ` David Vernet
2022-08-16 18:43       ` Andrii Nakryiko
2022-08-08 15:53 ` [PATCH 3/5] bpf: Add bpf_user_ringbuf_drain() helper David Vernet
2022-08-08 21:55   ` kernel test robot
2022-08-11 23:22   ` Andrii Nakryiko
2022-08-12  0:01     ` David Vernet
2022-08-12  0:46     ` David Vernet
2022-08-16 18:57       ` Andrii Nakryiko
2022-08-16 22:14         ` David Vernet
2022-08-16 22:59           ` Andrii Nakryiko
2022-08-17 20:24             ` David Vernet
2022-08-17 21:03               ` David Vernet
2022-08-08 15:53 ` [PATCH 4/5] bpf: Add libbpf logic for user-space ring buffer David Vernet
2022-08-11 23:39   ` Andrii Nakryiko
2022-08-12 17:28     ` David Vernet
2022-08-16 19:09       ` Andrii Nakryiko
2022-08-17 14:02         ` David Vernet
2022-08-18  3:05           ` David Vernet
2022-08-08 15:53 ` [PATCH 5/5] selftests/bpf: Add selftests validating the user ringbuf David Vernet
2022-08-08 18:14 ` [PATCH 1/5] bpf: Clear callee saved regs after updating REG0 Joanne Koong
2022-08-08 18:50   ` David Vernet
2022-08-08 23:32     ` Joanne Koong
2022-08-09 12:47       ` David Vernet
