RCU Archive on lore.kernel.org
 help / color / Atom feed
* [PATCH v2 1/2] rcu/tree: Add basic support for kfree_rcu batching
@ 2019-08-11  2:49 Joel Fernandes (Google)
  2019-08-11  2:49 ` [PATCH v2 2/2] rcuperf: Add kfree_rcu performance Tests Joel Fernandes (Google)
  2019-08-12 10:29 ` [PATCH v2 1/2] rcu/tree: Add basic support for kfree_rcu batching Byungchul Park
  0 siblings, 2 replies; 4+ messages in thread
From: Joel Fernandes (Google) @ 2019-08-11  2:49 UTC (permalink / raw)
  To: linux-kernel
  Cc: Joel Fernandes (Google),
	Rao Shoaib, max.byungchul.park, byungchul.park, kernel-team,
	kernel-team, Andrew Morton, Davidlohr Bueso, Josh Triplett,
	Lai Jiangshan, linux-doc, Mathieu Desnoyers, Paul E. McKenney,
	rcu, Steven Rostedt

Recently a discussion about stability and performance of a system
involving a high rate of kfree_rcu() calls surfaced on the list [1]
which led to another discussion how to prepare for this situation.

This patch adds basic batching support for kfree_rcu(). It is "basic"
because we do none of the slab management, dynamic allocation, code
moving or any of the other things, some of which previous attempts did
[2]. These fancier improvements can be follow-up patches and there are
several ideas being discussed in those regards.

Torture tests follow in the next patch and show improvements of around
400% in reduction of number of  grace periods on a 16 CPU system. More
details and test data are in that patch.

This is an effort to start simple, and build up from there. In the
future, an extension to use kfree_bulk and possibly per-slab batching
could be done to further improve performance due to cache-locality and
slab-specific bulk free optimizations.

There is an implication with rcu_barrier() with this patch. Since the
kfree_rcu() calls can be batched, and may not be handed yet to the RCU
machinery in fact, the monitor may not have even run yet to do the
queue_rcu_work(), there seems no easy way of implementing rcu_barrier()
to wait for those kfree_rcu()s that are already made. So this means a
kfree_rcu() followed by an rcu_barrier() does not imply that memory will
be freed once rcu_barrier() returns.

Another implication is higher active memory usage (although not
run-away..) until the kfree_rcu() flooding ends, in comparison to
without batching. More details about this are in the second patch which
adds an rcuperf test.

Finally, in the future we should get rid of kfree_rcu special casing
within RCU such as in rcu_do_batch and switch everything to just
batching. Currently we don't do that since timer subsystem is not yet up
and we cannot schedule the kfree_rcu() monitor as the timer subsystem's
lock are not initialized. That would also mean getting rid of
kfree_call_rcu_nobatch() entirely.

[1] http://lore.kernel.org/lkml/20190723035725-mutt-send-email-mst@kernel.org
[2] https://lkml.org/lkml/2017/12/19/824

Cc: Rao Shoaib <rao.shoaib@oracle.com>
Cc: max.byungchul.park@gmail.com
Cc: byungchul.park@lge.com
Cc: kernel-team@android.com
Cc: kernel-team@lge.com
Co-developed-by: Byungchul Park <byungchul.park@lge.com>
Signed-off-by: Byungchul Park <byungchul.park@lge.com>
Signed-off-by: Joel Fernandes (Google) <joel@joelfernandes.org>

---
RFC v1-> PATCH v2: Removed limits on the ->head list, just let it grow.
                   Dropped KFREE_MAX_JIFFIES to HZ/50 from HZ/20 to reduce OOM occurrence.
                   Removed sleeps in rcuperf test, just using cond_resched()in loop.
                   Better code comments ;)

 .../admin-guide/kernel-parameters.txt         |  17 ++
 include/linux/rcutiny.h                       |   5 +
 include/linux/rcutree.h                       |   1 +
 kernel/rcu/tree.c                             | 213 +++++++++++++++++-
 4 files changed, 230 insertions(+), 6 deletions(-)

diff --git a/Documentation/admin-guide/kernel-parameters.txt b/Documentation/admin-guide/kernel-parameters.txt
index 7ccd158b3894..a9156ca5de24 100644
--- a/Documentation/admin-guide/kernel-parameters.txt
+++ b/Documentation/admin-guide/kernel-parameters.txt
@@ -3895,6 +3895,23 @@
 			test until boot completes in order to avoid
 			interference.
 
+	rcuperf.kfree_rcu_test= [KNL]
+			Set to measure performance of kfree_rcu() flooding.
+
+	rcuperf.kfree_nthreads= [KNL]
+			The number of threads running loops of kfree_rcu().
+
+	rcuperf.kfree_alloc_num= [KNL]
+			Number of allocations and frees done in an iteration.
+
+	rcuperf.kfree_loops= [KNL]
+			Number of loops doing rcuperf.kfree_alloc_num number
+			of allocations and frees.
+
+	rcuperf.kfree_no_batch= [KNL]
+			Use the non-batching (slower) version of kfree_rcu.
+			This is useful for comparing with the batched version.
+
 	rcuperf.nreaders= [KNL]
 			Set number of RCU readers.  The value -1 selects
 			N, where N is the number of CPUs.  A value
diff --git a/include/linux/rcutiny.h b/include/linux/rcutiny.h
index 8e727f57d814..383f2481750f 100644
--- a/include/linux/rcutiny.h
+++ b/include/linux/rcutiny.h
@@ -39,6 +39,11 @@ static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
 	call_rcu(head, func);
 }
 
+static inline void kfree_call_rcu_nobatch(struct rcu_head *head, rcu_callback_t func)
+{
+	call_rcu(head, func);
+}
+
 void rcu_qs(void);
 
 static inline void rcu_softirq_qs(void)
diff --git a/include/linux/rcutree.h b/include/linux/rcutree.h
index 735601ac27d3..7e38b39ec634 100644
--- a/include/linux/rcutree.h
+++ b/include/linux/rcutree.h
@@ -34,6 +34,7 @@ static inline void rcu_virt_note_context_switch(int cpu)
 
 void synchronize_rcu_expedited(void);
 void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func);
+void kfree_call_rcu_nobatch(struct rcu_head *head, rcu_callback_t func);
 
 void rcu_barrier(void);
 bool rcu_eqs_special_set(int cpu);
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index a14e5fbbea46..102a5f606d78 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2593,17 +2593,204 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
 }
 EXPORT_SYMBOL_GPL(call_rcu);
 
+
+/* Maximum number of jiffies to wait before draining a batch. */
+#define KFREE_DRAIN_JIFFIES (HZ / 50)
+
 /*
- * Queue an RCU callback for lazy invocation after a grace period.
- * This will likely be later named something like "call_rcu_lazy()",
- * but this change will require some way of tagging the lazy RCU
- * callbacks in the list of pending callbacks. Until then, this
- * function may only be called from __kfree_rcu().
+ * Maximum number of kfree(s) to batch, if this limit is hit then the batch of
+ * kfree(s) is queued for freeing after a grace period, right away.
  */
-void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+struct kfree_rcu_cpu {
+	/* The rcu_work node for queuing work with queue_rcu_work(). The work
+	 * is done after a grace period.
+	 */
+	struct rcu_work rcu_work;
+
+	/* The list of objects being queued in a batch but are not yet
+	 * scheduled to be freed.
+	 */
+	struct rcu_head *head;
+
+	/* The list of objects that have now left ->head and are queued for
+	 * freeing after a grace period.
+	 */
+	struct rcu_head *head_free;
+
+	/* Protect concurrent access to this structure. */
+	spinlock_t lock;
+
+	/* The delayed work that flushes ->head to ->head_free incase ->head
+	 * did not reach a length of KFREE_MAX_BATCH within KFREE_DRAIN_JIFFIES.
+	 * In case flushing cannot be done if RCU is busy, then ->head just
+	 * continues to grow beyond KFREE_MAX_BATCH and we retry flushing later.
+	 */
+	struct delayed_work monitor_work;
+	bool monitor_todo;	/* Is a delayed work pending execution? */
+};
+
+static DEFINE_PER_CPU(struct kfree_rcu_cpu, krc);
+
+/*
+ * This function is invoked in workqueue context after a grace period.
+ * It frees all the objects queued on ->head_free.
+ */
+static void kfree_rcu_work(struct work_struct *work)
+{
+	unsigned long flags;
+	struct rcu_head *head, *next;
+	struct kfree_rcu_cpu *krcp = container_of(to_rcu_work(work),
+					struct kfree_rcu_cpu, rcu_work);
+
+	spin_lock_irqsave(&krcp->lock, flags);
+	head = krcp->head_free;
+	krcp->head_free = NULL;
+	spin_unlock_irqrestore(&krcp->lock, flags);
+
+	/*
+	 * The head is detached and not referenced from anywhere, so lockless
+	 * access is Ok.
+	 */
+	for (; head; head = next) {
+		next = head->next;
+		head->next = NULL;
+		/* Could be possible to optimize with kfree_bulk in future */
+		__rcu_reclaim(rcu_state.name, head);
+		cond_resched_tasks_rcu_qs();
+	}
+}
+
+/*
+ * Schedule the kfree batch RCU work to run in workqueue context after a GP.
+ *
+ * This function is invoked when the ->head batch has reached size
+ * KFREE_MAX_BATCH or when kfree_rcu_monitor() sees that the
+ * KFREE_DRAIN_JIFFIES timeout has been reached.
+ *
+ * It can also be called from IRQ context if ->head fills up completely.
+ * (i.e. ->head reaches KFREE_MAX_BATCH under heavy kfree_rcu() load
+ *  and that last kfree_rcu() in the ->head batch happened from IRQ context).
+ */
+static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
+{
+	lockdep_assert_held(&krcp->lock);
+
+	/* If a previous RCU batch work is already in progress, we cannot queue
+	 * another one, just refuse the optimization and it will be retried
+	 * again in KFREE_DRAIN_JIFFIES time.
+	 */
+	if (krcp->head_free)
+		return false;
+
+	krcp->head_free = krcp->head;
+	krcp->head = NULL;
+	INIT_RCU_WORK(&krcp->rcu_work, kfree_rcu_work);
+	queue_rcu_work(system_wq, &krcp->rcu_work);
+
+	return true;
+}
+
+static inline void kfree_rcu_drain_unlock(struct kfree_rcu_cpu *krcp,
+				   unsigned long flags)
+{
+	/* Flush ->head to ->head_free, all objects on ->head_free will be
+	 * kfree'd after a grace period.
+	 */
+	if (queue_kfree_rcu_work(krcp)) {
+		/* Success! Our job is done here. */
+		spin_unlock_irqrestore(&krcp->lock, flags);
+		return;
+	}
+
+	/* Previous batch did not get free yet, let us try again soon. */
+	if (krcp->monitor_todo == false) {
+		schedule_delayed_work_on(smp_processor_id(),
+				&krcp->monitor_work,  KFREE_DRAIN_JIFFIES);
+		krcp->monitor_todo = true;
+	}
+	spin_unlock_irqrestore(&krcp->lock, flags);
+}
+
+/*
+ * This function is invoked after the KFREE_DRAIN_JIFFIES timeout has elapsed,
+ * so it drains the specified kfree_rcu_cpu structure's ->head list.
+ */
+static void kfree_rcu_monitor(struct work_struct *work)
+{
+	bool todo;
+	unsigned long flags;
+	struct kfree_rcu_cpu *krcp = container_of(work, struct kfree_rcu_cpu,
+						 monitor_work.work);
+
+	spin_lock_irqsave(&krcp->lock, flags);
+	todo = krcp->monitor_todo;
+	krcp->monitor_todo = false;
+	if (todo)
+		kfree_rcu_drain_unlock(krcp, flags);
+	else
+		spin_unlock_irqrestore(&krcp->lock, flags);
+}
+
+/*
+ * This version of kfree_call_rcu does not do batching of kfree_rcu() requests.
+ * Used only by rcuperf torture test for comparison with kfree_rcu_batch().
+ */
+void kfree_call_rcu_nobatch(struct rcu_head *head, rcu_callback_t func)
 {
 	__call_rcu(head, func, -1, 1);
 }
+EXPORT_SYMBOL_GPL(kfree_call_rcu_nobatch);
+
+/*
+ * Queue a request for lazy invocation of kfree() after a grace period.
+ *
+ * Each kfree_call_rcu() request is added to a batch. Once the batch reaches a
+ * threshold of KFREE_MAX_BATCH, all the objects in the batch will be kfree'd
+ * in workqueue context. This allows us to:
+ *
+ * 1. Batch requests together to reduce the number of grace periods during
+ * heavy kfree_rcu() load.
+ *
+ * 2. In the future, makes it possible to use kfree_bulk() on a large number of
+ * kfree_rcu() requests thus reducing the per-object overhead of kfree().
+ *
+ * If the batch does not reach a threshold of KFREE_MAX_BATCH within
+ * KFREE_DRAIN_JIFFIES, then kfree_rcu_monitor() queues them for freeing in
+ * workqueue context after a grace period.
+ */
+void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+{
+	unsigned long flags;
+	struct kfree_rcu_cpu *krcp;
+	bool monitor_todo;
+
+	/* kfree_call_rcu() batching requires timers to be up. If the scheduler
+	 * is not yet up, just skip batching and do non-batched kfree_call_rcu().
+	 */
+	if (rcu_scheduler_active != RCU_SCHEDULER_RUNNING)
+		return kfree_call_rcu_nobatch(head, func);
+
+	local_irq_save(flags);
+	krcp = this_cpu_ptr(&krc);
+
+	spin_lock(&krcp->lock);
+
+	/* Queue the kfree but don't yet schedule the batch. */
+	head->func = func;
+	head->next = krcp->head;
+	krcp->head = head;
+
+	/* Schedule monitor for timely drain after KFREE_DRAIN_JIFFIES. */
+	monitor_todo = krcp->monitor_todo;
+	krcp->monitor_todo = true;
+	spin_unlock(&krcp->lock);
+
+	if (!monitor_todo) {
+		schedule_delayed_work_on(smp_processor_id(),
+				&krcp->monitor_work,  KFREE_DRAIN_JIFFIES);
+	}
+	local_irq_restore(flags);
+}
 EXPORT_SYMBOL_GPL(kfree_call_rcu);
 
 /*
@@ -3455,10 +3642,24 @@ static void __init rcu_dump_rcu_node_tree(void)
 struct workqueue_struct *rcu_gp_wq;
 struct workqueue_struct *rcu_par_gp_wq;
 
+static void __init kfree_rcu_batch_init(void)
+{
+	int cpu;
+
+	for_each_possible_cpu(cpu) {
+		struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
+
+		spin_lock_init(&krcp->lock);
+		INIT_DELAYED_WORK(&krcp->monitor_work, kfree_rcu_monitor);
+	}
+}
+
 void __init rcu_init(void)
 {
 	int cpu;
 
+	kfree_rcu_batch_init();
+
 	rcu_early_boot_tests();
 
 	rcu_bootup_announce();
-- 
2.23.0.rc1.153.gdeed80330f-goog

^ permalink raw reply	[flat|nested] 4+ messages in thread

* [PATCH v2 2/2] rcuperf: Add kfree_rcu performance Tests
  2019-08-11  2:49 [PATCH v2 1/2] rcu/tree: Add basic support for kfree_rcu batching Joel Fernandes (Google)
@ 2019-08-11  2:49 ` Joel Fernandes (Google)
  2019-08-12 10:29 ` [PATCH v2 1/2] rcu/tree: Add basic support for kfree_rcu batching Byungchul Park
  1 sibling, 0 replies; 4+ messages in thread
From: Joel Fernandes (Google) @ 2019-08-11  2:49 UTC (permalink / raw)
  To: linux-kernel
  Cc: Joel Fernandes (Google),
	Andrew Morton, byungchul.park, Davidlohr Bueso, Josh Triplett,
	kernel-team, kernel-team, Lai Jiangshan, linux-doc,
	Mathieu Desnoyers, max.byungchul.park, Paul E. McKenney,
	Rao Shoaib, rcu, Steven Rostedt

This test runs kfree_rcu in a loop to measure performance of the new
kfree_rcu batching functionality.

The following table shows results when booting with arguments:
rcuperf.kfree_loops=200000 rcuperf.kfree_alloc_num=1000 rcuperf.kfree_rcu_test=1

In addition, rcuperf.kfree_no_batch is used to toggle the batching of
kfree_rcu()s for a test run.

rcuperf.kfree_no_batch	GPs	time (seconds)
 0 (default)		1732	15.9
 1			9133 	14.5

Note that the results are the same for the case:
1. Patch is not applied and rcuperf.kfree_no_batch=0
2. Patch is applied     and rcuperf.kfree_no_batch=1

On a 16 CPU system with the above boot parameters, we see that the total
number of grace periods that elapse during the test drops from 9133 when
not batching to 1732 when batching (a ~427% improvement). The
kfree_rcu() flood itself slows down a bit when batching, though, as
shown. This is likely due to rcuperf threads contending with the
additional worker threads that are now running both before (the monitor)
and after (the work done to kfree()) the grace period.

Note that the active memory consumption during the kfree_rcu() flood
does increase to around 300-400MB due to the batching (from around 50MB
without batching). However, this memory consumption is relatively
constant and is just an effect of the buffering. In other words, the
system is able to keep up with the kfree_rcu() load.

Also, when running the test, please disable CONFIG_DEBUG_PREEMPT and
CONFIG_PROVE_RCU for realistic comparisons with/without batching.

Signed-off-by: Joel Fernandes (Google) <joel@joelfernandes.org>
---
 kernel/rcu/rcuperf.c | 189 +++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 181 insertions(+), 8 deletions(-)

diff --git a/kernel/rcu/rcuperf.c b/kernel/rcu/rcuperf.c
index 7a6890b23c5f..70d6ac19cbff 100644
--- a/kernel/rcu/rcuperf.c
+++ b/kernel/rcu/rcuperf.c
@@ -86,6 +86,7 @@ torture_param(bool, shutdown, RCUPERF_SHUTDOWN,
 	      "Shutdown at end of performance tests.");
 torture_param(int, verbose, 1, "Enable verbose debugging printk()s");
 torture_param(int, writer_holdoff, 0, "Holdoff (us) between GPs, zero to disable");
+torture_param(int, kfree_rcu_test, 0, "Do we run a kfree_rcu perf test?");
 
 static char *perf_type = "rcu";
 module_param(perf_type, charp, 0444);
@@ -105,8 +106,8 @@ static atomic_t n_rcu_perf_writer_finished;
 static wait_queue_head_t shutdown_wq;
 static u64 t_rcu_perf_writer_started;
 static u64 t_rcu_perf_writer_finished;
-static unsigned long b_rcu_perf_writer_started;
-static unsigned long b_rcu_perf_writer_finished;
+static unsigned long b_rcu_gp_test_started;
+static unsigned long b_rcu_gp_test_finished;
 static DEFINE_PER_CPU(atomic_t, n_async_inflight);
 
 static int rcu_perf_writer_state;
@@ -379,10 +380,10 @@ rcu_perf_writer(void *arg)
 	if (atomic_inc_return(&n_rcu_perf_writer_started) >= nrealwriters) {
 		t_rcu_perf_writer_started = t;
 		if (gp_exp) {
-			b_rcu_perf_writer_started =
+			b_rcu_gp_test_started =
 				cur_ops->exp_completed() / 2;
 		} else {
-			b_rcu_perf_writer_started = cur_ops->get_gp_seq();
+			b_rcu_gp_test_started = cur_ops->get_gp_seq();
 		}
 	}
 
@@ -435,10 +436,10 @@ rcu_perf_writer(void *arg)
 				PERFOUT_STRING("Test complete");
 				t_rcu_perf_writer_finished = t;
 				if (gp_exp) {
-					b_rcu_perf_writer_finished =
+					b_rcu_gp_test_finished =
 						cur_ops->exp_completed() / 2;
 				} else {
-					b_rcu_perf_writer_finished =
+					b_rcu_gp_test_finished =
 						cur_ops->get_gp_seq();
 				}
 				if (shutdown) {
@@ -523,8 +524,8 @@ rcu_perf_cleanup(void)
 			 t_rcu_perf_writer_finished -
 			 t_rcu_perf_writer_started,
 			 ngps,
-			 rcuperf_seq_diff(b_rcu_perf_writer_finished,
-					  b_rcu_perf_writer_started));
+			 rcuperf_seq_diff(b_rcu_gp_test_finished,
+					  b_rcu_gp_test_started));
 		for (i = 0; i < nrealwriters; i++) {
 			if (!writer_durations)
 				break;
@@ -592,6 +593,175 @@ rcu_perf_shutdown(void *arg)
 	return -EINVAL;
 }
 
+/*
+ * kfree_rcu performance tests: Start a kfree_rcu loop on all CPUs for number
+ * of iterations and measure total time and number of GP for all iterations to complete.
+ */
+
+torture_param(int, kfree_nthreads, -1, "Number of threads running loops of kfree_rcu().");
+torture_param(int, kfree_alloc_num, 8000, "Number of allocations and frees done in an iteration.");
+torture_param(int, kfree_loops, 10, "Number of loops doing kfree_alloc_num allocations and frees.");
+torture_param(int, kfree_no_batch, 0, "Use the non-batching (slower) version of kfree_rcu.");
+
+static struct task_struct **kfree_reader_tasks;
+static int kfree_nrealthreads;
+static atomic_t n_kfree_perf_thread_started;
+static atomic_t n_kfree_perf_thread_ended;
+
+struct kfree_obj {
+	char kfree_obj[8];
+	struct rcu_head rh;
+};
+
+static int
+kfree_perf_thread(void *arg)
+{
+	int i, loop = 0;
+	long me = (long)arg;
+	struct kfree_obj **alloc_ptrs;
+	u64 start_time, end_time;
+
+	VERBOSE_PERFOUT_STRING("kfree_perf_thread task started");
+	set_cpus_allowed_ptr(current, cpumask_of(me % nr_cpu_ids));
+	set_user_nice(current, MAX_NICE);
+
+	alloc_ptrs = (struct kfree_obj **)kmalloc(sizeof(struct kfree_obj *) * kfree_alloc_num,
+						  GFP_KERNEL);
+	if (!alloc_ptrs)
+		return -ENOMEM;
+
+	start_time = ktime_get_mono_fast_ns();
+
+	if (atomic_inc_return(&n_kfree_perf_thread_started) >= kfree_nrealthreads) {
+		if (gp_exp)
+			b_rcu_gp_test_started = cur_ops->exp_completed() / 2;
+		else
+			b_rcu_gp_test_started = cur_ops->get_gp_seq();
+	}
+
+	do {
+		for (i = 0; i < kfree_alloc_num; i++) {
+			alloc_ptrs[i] = kmalloc(sizeof(struct kfree_obj), GFP_KERNEL);
+			if (!alloc_ptrs[i])
+				return -ENOMEM;
+		}
+
+		for (i = 0; i < kfree_alloc_num; i++) {
+			if (!kfree_no_batch) {
+				kfree_rcu(alloc_ptrs[i], rh);
+			} else {
+				rcu_callback_t cb;
+
+				cb = (rcu_callback_t)(unsigned long)offsetof(struct kfree_obj, rh);
+				kfree_call_rcu_nobatch(&(alloc_ptrs[i]->rh), cb);
+			}
+		}
+
+		cond_resched();
+	} while (!torture_must_stop() && ++loop < kfree_loops);
+
+	if (atomic_inc_return(&n_kfree_perf_thread_ended) >= kfree_nrealthreads) {
+		end_time = ktime_get_mono_fast_ns();
+
+		if (gp_exp)
+			b_rcu_gp_test_finished = cur_ops->exp_completed() / 2;
+		else
+			b_rcu_gp_test_finished = cur_ops->get_gp_seq();
+
+		pr_alert("Total time taken by all kfree'ers: %llu ns, loops: %d, batches: %ld\n",
+		       (unsigned long long)(end_time - start_time), kfree_loops,
+		       rcuperf_seq_diff(b_rcu_gp_test_finished, b_rcu_gp_test_started));
+		if (shutdown) {
+			smp_mb(); /* Assign before wake. */
+			wake_up(&shutdown_wq);
+		}
+	}
+
+	kfree(alloc_ptrs);
+	torture_kthread_stopping("kfree_perf_thread");
+	return 0;
+}
+
+static void
+kfree_perf_cleanup(void)
+{
+	int i;
+
+	if (torture_cleanup_begin())
+		return;
+
+	if (kfree_reader_tasks) {
+		for (i = 0; i < kfree_nrealthreads; i++)
+			torture_stop_kthread(kfree_perf_thread,
+					     kfree_reader_tasks[i]);
+		kfree(kfree_reader_tasks);
+	}
+
+	torture_cleanup_end();
+}
+
+/*
+ * shutdown kthread.  Just waits to be awakened, then shuts down system.
+ */
+static int
+kfree_perf_shutdown(void *arg)
+{
+	do {
+		wait_event(shutdown_wq,
+			   atomic_read(&n_kfree_perf_thread_ended) >=
+			   kfree_nrealthreads);
+	} while (atomic_read(&n_kfree_perf_thread_ended) < kfree_nrealthreads);
+
+	smp_mb(); /* Wake before output. */
+
+	kfree_perf_cleanup();
+	kernel_power_off();
+	return -EINVAL;
+}
+
+static int __init
+kfree_perf_init(void)
+{
+	long i;
+	int firsterr = 0;
+
+	kfree_nrealthreads = compute_real(kfree_nthreads);
+	/* Start up the kthreads. */
+	if (shutdown) {
+		init_waitqueue_head(&shutdown_wq);
+		firsterr = torture_create_kthread(kfree_perf_shutdown, NULL,
+						  shutdown_task);
+		if (firsterr)
+			goto unwind;
+		schedule_timeout_uninterruptible(1);
+	}
+
+	kfree_reader_tasks = kcalloc(kfree_nrealthreads, sizeof(kfree_reader_tasks[0]),
+			       GFP_KERNEL);
+	if (kfree_reader_tasks == NULL) {
+		firsterr = -ENOMEM;
+		goto unwind;
+	}
+
+	for (i = 0; i < kfree_nrealthreads; i++) {
+		firsterr = torture_create_kthread(kfree_perf_thread, (void *)i,
+						  kfree_reader_tasks[i]);
+		if (firsterr)
+			goto unwind;
+	}
+
+	while (atomic_read(&n_kfree_perf_thread_started) < kfree_nrealthreads)
+		schedule_timeout_uninterruptible(1);
+
+	torture_init_end();
+	return 0;
+
+unwind:
+	torture_init_end();
+	kfree_perf_cleanup();
+	return firsterr;
+}
+
 static int __init
 rcu_perf_init(void)
 {
@@ -624,6 +794,9 @@ rcu_perf_init(void)
 	if (cur_ops->init)
 		cur_ops->init();
 
+	if (kfree_rcu_test)
+		return kfree_perf_init();
+
 	nrealwriters = compute_real(nwriters);
 	nrealreaders = compute_real(nreaders);
 	atomic_set(&n_rcu_perf_reader_started, 0);
-- 
2.23.0.rc1.153.gdeed80330f-goog


^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [PATCH v2 1/2] rcu/tree: Add basic support for kfree_rcu batching
  2019-08-11  2:49 [PATCH v2 1/2] rcu/tree: Add basic support for kfree_rcu batching Joel Fernandes (Google)
  2019-08-11  2:49 ` [PATCH v2 2/2] rcuperf: Add kfree_rcu performance Tests Joel Fernandes (Google)
@ 2019-08-12 10:29 ` Byungchul Park
  2019-08-12 13:08   ` Joel Fernandes
  1 sibling, 1 reply; 4+ messages in thread
From: Byungchul Park @ 2019-08-12 10:29 UTC (permalink / raw)
  To: Joel Fernandes (Google)
  Cc: linux-kernel, Rao Shoaib, max.byungchul.park, kernel-team,
	kernel-team, Andrew Morton, Davidlohr Bueso, Josh Triplett,
	Lai Jiangshan, linux-doc, Mathieu Desnoyers, Paul E. McKenney,
	rcu, Steven Rostedt

On Sat, Aug 10, 2019 at 10:49:55PM -0400, Joel Fernandes (Google) wrote:
> Recently a discussion about stability and performance of a system
> involving a high rate of kfree_rcu() calls surfaced on the list [1]
> which led to another discussion how to prepare for this situation.
> 
> This patch adds basic batching support for kfree_rcu(). It is "basic"
> because we do none of the slab management, dynamic allocation, code
> moving or any of the other things, some of which previous attempts did
> [2]. These fancier improvements can be follow-up patches and there are
> several ideas being discussed in those regards.
> 
> Torture tests follow in the next patch and show improvements of around
> 400% in reduction of number of  grace periods on a 16 CPU system. More
> details and test data are in that patch.
> 
> This is an effort to start simple, and build up from there. In the
> future, an extension to use kfree_bulk and possibly per-slab batching
> could be done to further improve performance due to cache-locality and
> slab-specific bulk free optimizations.
> 
> There is an implication with rcu_barrier() with this patch. Since the
> kfree_rcu() calls can be batched, and may not be handed yet to the RCU
> machinery in fact, the monitor may not have even run yet to do the
> queue_rcu_work(), there seems no easy way of implementing rcu_barrier()
> to wait for those kfree_rcu()s that are already made. So this means a
> kfree_rcu() followed by an rcu_barrier() does not imply that memory will
> be freed once rcu_barrier() returns.
> 
> Another implication is higher active memory usage (although not
> run-away..) until the kfree_rcu() flooding ends, in comparison to
> without batching. More details about this are in the second patch which
> adds an rcuperf test.
> 
> Finally, in the future we should get rid of kfree_rcu special casing
> within RCU such as in rcu_do_batch and switch everything to just
> batching. Currently we don't do that since timer subsystem is not yet up
> and we cannot schedule the kfree_rcu() monitor as the timer subsystem's
> lock are not initialized. That would also mean getting rid of
> kfree_call_rcu_nobatch() entirely.
> 
> [1] http://lore.kernel.org/lkml/20190723035725-mutt-send-email-mst@kernel.org
> [2] https://lkml.org/lkml/2017/12/19/824
> 
> Cc: Rao Shoaib <rao.shoaib@oracle.com>
> Cc: max.byungchul.park@gmail.com
> Cc: byungchul.park@lge.com
> Cc: kernel-team@android.com
> Cc: kernel-team@lge.com
> Co-developed-by: Byungchul Park <byungchul.park@lge.com>
> Signed-off-by: Byungchul Park <byungchul.park@lge.com>
> Signed-off-by: Joel Fernandes (Google) <joel@joelfernandes.org>
> 
> ---
> RFC v1-> PATCH v2: Removed limits on the ->head list, just let it grow.
>                    Dropped KFREE_MAX_JIFFIES to HZ/50 from HZ/20 to reduce OOM occurrence.
>                    Removed sleeps in rcuperf test, just using cond_resched()in loop.
>                    Better code comments ;)
> 
>  .../admin-guide/kernel-parameters.txt         |  17 ++
>  include/linux/rcutiny.h                       |   5 +
>  include/linux/rcutree.h                       |   1 +
>  kernel/rcu/tree.c                             | 213 +++++++++++++++++-
>  4 files changed, 230 insertions(+), 6 deletions(-)
> 
> diff --git a/Documentation/admin-guide/kernel-parameters.txt b/Documentation/admin-guide/kernel-parameters.txt
> index 7ccd158b3894..a9156ca5de24 100644
> --- a/Documentation/admin-guide/kernel-parameters.txt
> +++ b/Documentation/admin-guide/kernel-parameters.txt
> @@ -3895,6 +3895,23 @@
>  			test until boot completes in order to avoid
>  			interference.
>  
> +	rcuperf.kfree_rcu_test= [KNL]
> +			Set to measure performance of kfree_rcu() flooding.
> +
> +	rcuperf.kfree_nthreads= [KNL]
> +			The number of threads running loops of kfree_rcu().
> +
> +	rcuperf.kfree_alloc_num= [KNL]
> +			Number of allocations and frees done in an iteration.
> +
> +	rcuperf.kfree_loops= [KNL]
> +			Number of loops doing rcuperf.kfree_alloc_num number
> +			of allocations and frees.
> +
> +	rcuperf.kfree_no_batch= [KNL]
> +			Use the non-batching (slower) version of kfree_rcu.
> +			This is useful for comparing with the batched version.
> +
>  	rcuperf.nreaders= [KNL]
>  			Set number of RCU readers.  The value -1 selects
>  			N, where N is the number of CPUs.  A value
> diff --git a/include/linux/rcutiny.h b/include/linux/rcutiny.h
> index 8e727f57d814..383f2481750f 100644
> --- a/include/linux/rcutiny.h
> +++ b/include/linux/rcutiny.h
> @@ -39,6 +39,11 @@ static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
>  	call_rcu(head, func);
>  }
>  
> +static inline void kfree_call_rcu_nobatch(struct rcu_head *head, rcu_callback_t func)
> +{
> +	call_rcu(head, func);
> +}
> +
>  void rcu_qs(void);
>  
>  static inline void rcu_softirq_qs(void)
> diff --git a/include/linux/rcutree.h b/include/linux/rcutree.h
> index 735601ac27d3..7e38b39ec634 100644
> --- a/include/linux/rcutree.h
> +++ b/include/linux/rcutree.h
> @@ -34,6 +34,7 @@ static inline void rcu_virt_note_context_switch(int cpu)
>  
>  void synchronize_rcu_expedited(void);
>  void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func);
> +void kfree_call_rcu_nobatch(struct rcu_head *head, rcu_callback_t func);
>  
>  void rcu_barrier(void);
>  bool rcu_eqs_special_set(int cpu);
> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> index a14e5fbbea46..102a5f606d78 100644
> --- a/kernel/rcu/tree.c
> +++ b/kernel/rcu/tree.c
> @@ -2593,17 +2593,204 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
>  }
>  EXPORT_SYMBOL_GPL(call_rcu);
>  
> +
> +/* Maximum number of jiffies to wait before draining a batch. */
> +#define KFREE_DRAIN_JIFFIES (HZ / 50)

JFYI, I also can see oom with a larger value of this. I hope this magic
value works well for all kind of systems.

> +
>  /*
> - * Queue an RCU callback for lazy invocation after a grace period.
> - * This will likely be later named something like "call_rcu_lazy()",
> - * but this change will require some way of tagging the lazy RCU
> - * callbacks in the list of pending callbacks. Until then, this
> - * function may only be called from __kfree_rcu().
> + * Maximum number of kfree(s) to batch, if this limit is hit then the batch of
> + * kfree(s) is queued for freeing after a grace period, right away.
>   */
> -void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> +struct kfree_rcu_cpu {
> +	/* The rcu_work node for queuing work with queue_rcu_work(). The work
> +	 * is done after a grace period.
> +	 */
> +	struct rcu_work rcu_work;
> +
> +	/* The list of objects being queued in a batch but are not yet
> +	 * scheduled to be freed.
> +	 */
> +	struct rcu_head *head;
> +
> +	/* The list of objects that have now left ->head and are queued for
> +	 * freeing after a grace period.
> +	 */
> +	struct rcu_head *head_free;
> +
> +	/* Protect concurrent access to this structure. */
> +	spinlock_t lock;
> +
> +	/* The delayed work that flushes ->head to ->head_free incase ->head
> +	 * did not reach a length of KFREE_MAX_BATCH within KFREE_DRAIN_JIFFIES.
> +	 * In case flushing cannot be done if RCU is busy, then ->head just
> +	 * continues to grow beyond KFREE_MAX_BATCH and we retry flushing later.

Minor one. We don't use KFREE_MAX_BATCH anymore.

> +	 */
> +	struct delayed_work monitor_work;
> +	bool monitor_todo;	/* Is a delayed work pending execution? */
> +};
> +
> +static DEFINE_PER_CPU(struct kfree_rcu_cpu, krc);
> +
> +/*
> + * This function is invoked in workqueue context after a grace period.
> + * It frees all the objects queued on ->head_free.
> + */
> +static void kfree_rcu_work(struct work_struct *work)
> +{
> +	unsigned long flags;
> +	struct rcu_head *head, *next;
> +	struct kfree_rcu_cpu *krcp = container_of(to_rcu_work(work),
> +					struct kfree_rcu_cpu, rcu_work);
> +
> +	spin_lock_irqsave(&krcp->lock, flags);
> +	head = krcp->head_free;
> +	krcp->head_free = NULL;
> +	spin_unlock_irqrestore(&krcp->lock, flags);
> +
> +	/*
> +	 * The head is detached and not referenced from anywhere, so lockless
> +	 * access is Ok.
> +	 */
> +	for (; head; head = next) {
> +		next = head->next;
> +		head->next = NULL;
> +		/* Could be possible to optimize with kfree_bulk in future */
> +		__rcu_reclaim(rcu_state.name, head);
> +		cond_resched_tasks_rcu_qs();
> +	}
> +}
> +
> +/*
> + * Schedule the kfree batch RCU work to run in workqueue context after a GP.
> + *
> + * This function is invoked when the ->head batch has reached size
> + * KFREE_MAX_BATCH or when kfree_rcu_monitor() sees that the
> + * KFREE_DRAIN_JIFFIES timeout has been reached.

KFREE_MAX_BATCH again.

> + *
> + * It can also be called from IRQ context if ->head fills up completely.
> + * (i.e. ->head reaches KFREE_MAX_BATCH under heavy kfree_rcu() load
> + *  and that last kfree_rcu() in the ->head batch happened from IRQ context).

Not true anymore.

> + */
> +static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
> +{
> +	lockdep_assert_held(&krcp->lock);
> +
> +	/* If a previous RCU batch work is already in progress, we cannot queue
> +	 * another one, just refuse the optimization and it will be retried
> +	 * again in KFREE_DRAIN_JIFFIES time.
> +	 */
> +	if (krcp->head_free)
> +		return false;
> +
> +	krcp->head_free = krcp->head;
> +	krcp->head = NULL;
> +	INIT_RCU_WORK(&krcp->rcu_work, kfree_rcu_work);
> +	queue_rcu_work(system_wq, &krcp->rcu_work);
> +
> +	return true;
> +}
> +
> +static inline void kfree_rcu_drain_unlock(struct kfree_rcu_cpu *krcp,
> +				   unsigned long flags)
> +{
> +	/* Flush ->head to ->head_free, all objects on ->head_free will be
> +	 * kfree'd after a grace period.
> +	 */
> +	if (queue_kfree_rcu_work(krcp)) {
> +		/* Success! Our job is done here. */
> +		spin_unlock_irqrestore(&krcp->lock, flags);
> +		return;
> +	}
> +
> +	/* Previous batch did not get free yet, let us try again soon. */
> +	if (krcp->monitor_todo == false) {
> +		schedule_delayed_work_on(smp_processor_id(),
> +				&krcp->monitor_work,  KFREE_DRAIN_JIFFIES);
> +		krcp->monitor_todo = true;
> +	}
> +	spin_unlock_irqrestore(&krcp->lock, flags);
> +}
> +
> +/*
> + * This function is invoked after the KFREE_DRAIN_JIFFIES timeout has elapsed,
> + * so it drains the specified kfree_rcu_cpu structure's ->head list.
> + */
> +static void kfree_rcu_monitor(struct work_struct *work)
> +{
> +	bool todo;
> +	unsigned long flags;
> +	struct kfree_rcu_cpu *krcp = container_of(work, struct kfree_rcu_cpu,
> +						 monitor_work.work);
> +
> +	spin_lock_irqsave(&krcp->lock, flags);
> +	todo = krcp->monitor_todo;
> +	krcp->monitor_todo = false;
> +	if (todo)
> +		kfree_rcu_drain_unlock(krcp, flags);
> +	else
> +		spin_unlock_irqrestore(&krcp->lock, flags);
> +}
> +
> +/*
> + * This version of kfree_call_rcu does not do batching of kfree_rcu() requests.
> + * Used only by rcuperf torture test for comparison with kfree_rcu_batch().
> + */
> +void kfree_call_rcu_nobatch(struct rcu_head *head, rcu_callback_t func)
>  {
>  	__call_rcu(head, func, -1, 1);
>  }
> +EXPORT_SYMBOL_GPL(kfree_call_rcu_nobatch);
> +
> +/*
> + * Queue a request for lazy invocation of kfree() after a grace period.
> + *
> + * Each kfree_call_rcu() request is added to a batch. Once the batch reaches a
> + * threshold of KFREE_MAX_BATCH, all the objects in the batch will be kfree'd
> + * in workqueue context. This allows us to:
> + *
> + * 1. Batch requests together to reduce the number of grace periods during
> + * heavy kfree_rcu() load.
> + *
> + * 2. In the future, makes it possible to use kfree_bulk() on a large number of
> + * kfree_rcu() requests thus reducing the per-object overhead of kfree().
> + *
> + * If the batch does not reach a threshold of KFREE_MAX_BATCH within
> + * KFREE_DRAIN_JIFFIES, then kfree_rcu_monitor() queues them for freeing in
> + * workqueue context after a grace period.

KFREE_MAX_BATCH again.

Thanks,
Byungchul

> + */
> +void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> +{
> +	unsigned long flags;
> +	struct kfree_rcu_cpu *krcp;
> +	bool monitor_todo;
> +
> +	/* kfree_call_rcu() batching requires timers to be up. If the scheduler
> +	 * is not yet up, just skip batching and do non-batched kfree_call_rcu().
> +	 */
> +	if (rcu_scheduler_active != RCU_SCHEDULER_RUNNING)
> +		return kfree_call_rcu_nobatch(head, func);
> +
> +	local_irq_save(flags);
> +	krcp = this_cpu_ptr(&krc);
> +
> +	spin_lock(&krcp->lock);
> +
> +	/* Queue the kfree but don't yet schedule the batch. */
> +	head->func = func;
> +	head->next = krcp->head;
> +	krcp->head = head;
> +
> +	/* Schedule monitor for timely drain after KFREE_DRAIN_JIFFIES. */
> +	monitor_todo = krcp->monitor_todo;
> +	krcp->monitor_todo = true;
> +	spin_unlock(&krcp->lock);
> +
> +	if (!monitor_todo) {
> +		schedule_delayed_work_on(smp_processor_id(),
> +				&krcp->monitor_work,  KFREE_DRAIN_JIFFIES);
> +	}
> +	local_irq_restore(flags);
> +}
>  EXPORT_SYMBOL_GPL(kfree_call_rcu);
>  
>  /*
> @@ -3455,10 +3642,24 @@ static void __init rcu_dump_rcu_node_tree(void)
>  struct workqueue_struct *rcu_gp_wq;
>  struct workqueue_struct *rcu_par_gp_wq;
>  
> +static void __init kfree_rcu_batch_init(void)
> +{
> +	int cpu;
> +
> +	for_each_possible_cpu(cpu) {
> +		struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
> +
> +		spin_lock_init(&krcp->lock);
> +		INIT_DELAYED_WORK(&krcp->monitor_work, kfree_rcu_monitor);
> +	}
> +}
> +
>  void __init rcu_init(void)
>  {
>  	int cpu;
>  
> +	kfree_rcu_batch_init();
> +
>  	rcu_early_boot_tests();
>  
>  	rcu_bootup_announce();
> -- 
> 2.23.0.rc1.153.gdeed80330f-goog

^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [PATCH v2 1/2] rcu/tree: Add basic support for kfree_rcu batching
  2019-08-12 10:29 ` [PATCH v2 1/2] rcu/tree: Add basic support for kfree_rcu batching Byungchul Park
@ 2019-08-12 13:08   ` Joel Fernandes
  0 siblings, 0 replies; 4+ messages in thread
From: Joel Fernandes @ 2019-08-12 13:08 UTC (permalink / raw)
  To: Byungchul Park
  Cc: linux-kernel, Rao Shoaib, max.byungchul.park, kernel-team,
	kernel-team, Andrew Morton, Davidlohr Bueso, Josh Triplett,
	Lai Jiangshan, linux-doc, Mathieu Desnoyers, Paul E. McKenney,
	rcu, Steven Rostedt

On Mon, Aug 12, 2019 at 07:29:17PM +0900, Byungchul Park wrote:
[snip]
> > diff --git a/include/linux/rcutiny.h b/include/linux/rcutiny.h
> > index 8e727f57d814..383f2481750f 100644
> > --- a/include/linux/rcutiny.h
> > +++ b/include/linux/rcutiny.h
> > @@ -39,6 +39,11 @@ static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> >  	call_rcu(head, func);
> >  }
> >  
> > +static inline void kfree_call_rcu_nobatch(struct rcu_head *head, rcu_callback_t func)
> > +{
> > +	call_rcu(head, func);
> > +}
> > +
> >  void rcu_qs(void);
> >  
> >  static inline void rcu_softirq_qs(void)
> > diff --git a/include/linux/rcutree.h b/include/linux/rcutree.h
> > index 735601ac27d3..7e38b39ec634 100644
> > --- a/include/linux/rcutree.h
> > +++ b/include/linux/rcutree.h
> > @@ -34,6 +34,7 @@ static inline void rcu_virt_note_context_switch(int cpu)
> >  
> >  void synchronize_rcu_expedited(void);
> >  void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func);
> > +void kfree_call_rcu_nobatch(struct rcu_head *head, rcu_callback_t func);
> >  
> >  void rcu_barrier(void);
> >  bool rcu_eqs_special_set(int cpu);
> > diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> > index a14e5fbbea46..102a5f606d78 100644
> > --- a/kernel/rcu/tree.c
> > +++ b/kernel/rcu/tree.c
> > @@ -2593,17 +2593,204 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
> >  }
> >  EXPORT_SYMBOL_GPL(call_rcu);
> >  
> > +
> > +/* Maximum number of jiffies to wait before draining a batch. */
> > +#define KFREE_DRAIN_JIFFIES (HZ / 50)
> 
> JFYI, I also can see oom with a larger value of this. I hope this magic
> value works well for all kind of systems.

It seems to work well in my testing. I am glad you could not perceive OOMs at
this value, either.

> > - * Queue an RCU callback for lazy invocation after a grace period.
> > - * This will likely be later named something like "call_rcu_lazy()",
> > - * but this change will require some way of tagging the lazy RCU
> > - * callbacks in the list of pending callbacks. Until then, this
> > - * function may only be called from __kfree_rcu().
> > + * Maximum number of kfree(s) to batch, if this limit is hit then the batch of
> > + * kfree(s) is queued for freeing after a grace period, right away.
> >   */
> > -void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> > +struct kfree_rcu_cpu {
> > +	/* The rcu_work node for queuing work with queue_rcu_work(). The work
> > +	 * is done after a grace period.
> > +	 */
> > +	struct rcu_work rcu_work;
> > +
> > +	/* The list of objects being queued in a batch but are not yet
> > +	 * scheduled to be freed.
> > +	 */
> > +	struct rcu_head *head;
> > +
> > +	/* The list of objects that have now left ->head and are queued for
> > +	 * freeing after a grace period.
> > +	 */
> > +	struct rcu_head *head_free;
> > +
> > +	/* Protect concurrent access to this structure. */
> > +	spinlock_t lock;
> > +
> > +	/* The delayed work that flushes ->head to ->head_free incase ->head
> > +	 * did not reach a length of KFREE_MAX_BATCH within KFREE_DRAIN_JIFFIES.
> > +	 * In case flushing cannot be done if RCU is busy, then ->head just
> > +	 * continues to grow beyond KFREE_MAX_BATCH and we retry flushing later.
> 
> Minor one. We don't use KFREE_MAX_BATCH anymore.

Sorry for leaving these KFREE_MAX_BATCH comments stale, I cleaned up many of
them already but some where still left behind. I will fix these in the v3.

thanks for review!

 - Joel

[snip]

^ permalink raw reply	[flat|nested] 4+ messages in thread

end of thread, back to index

Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-08-11  2:49 [PATCH v2 1/2] rcu/tree: Add basic support for kfree_rcu batching Joel Fernandes (Google)
2019-08-11  2:49 ` [PATCH v2 2/2] rcuperf: Add kfree_rcu performance Tests Joel Fernandes (Google)
2019-08-12 10:29 ` [PATCH v2 1/2] rcu/tree: Add basic support for kfree_rcu batching Byungchul Park
2019-08-12 13:08   ` Joel Fernandes

RCU Archive on lore.kernel.org

Archives are clonable:
	git clone --mirror https://lore.kernel.org/rcu/0 rcu/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 rcu rcu/ https://lore.kernel.org/rcu \
		rcu@vger.kernel.org rcu@archiver.kernel.org
	public-inbox-index rcu


Newsgroup available over NNTP:
	nntp://nntp.lore.kernel.org/org.kernel.vger.rcu


AGPL code for this site: git clone https://public-inbox.org/ public-inbox