linux-kernel.vger.kernel.org archive mirror
From: Steven Rostedt <rostedt@goodmis.org>
To: linux-kernel@vger.kernel.org
Cc: Ingo Molnar <mingo@kernel.org>,
	Andrew Morton <akpm@linux-foundation.org>
Subject: [for-next][PATCH 12/18] ring-buffer: Add rb_time_t 64 bit operations for speeding up 32 bit
Date: Thu, 02 Jul 2020 17:58:24 -0400
Message-ID: <20200702215833.251640706@goodmis.org>
In-Reply-To: 20200702215812.428188663@goodmis.org

From: "Steven Rostedt (VMware)" <rostedt@goodmis.org>

In a discussion about the new time algorithm, which lets nested events still
have proper time keeping but requires using local64_t atomic operations,
Mathieu was concerned about the performance this would have on 32 bit
machines, as in most cases, atomic 64 bit operations on them can be
expensive.

As the ring buffer's timing needs do not require full features of local64_t,
a wrapper is made to implement a new rb_time_t operation that uses two longs
on 32 bit machines but still uses the local64_t operations on 64 bit
machines. There's a switch that can be made in the file to force 64 bit to
use the 32 bit version just for testing purposes.

Not all reads need to succeed: a read may fail if it happens while the stamp
being read is in the process of being updated. The requirement is that a
read must succeed if the event doing the read was itself interrupted by
another event that did the write, or if the event itself did the write
first. That is: rb_time_set(t, x) followed by rb_time_read(t) will always
succeed (even if it gets interrupted by another event that writes to t).
The result of the read will be either the previous set, or a set performed
by an interrupting event.

If the read is done by an event that interrupted another event that was in
the process of setting the time stamp, and no other event came along to
write to that time stamp, it will fail and the rb_time_read() will return
that it failed (the value to read will be undefined).

A set will always write to the time stamp and return with a valid time
stamp, such that any read after it will be valid.
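
To make the read and set rules above concrete, here is a small single
threaded model. It is only a sketch, not the code from this patch: plain
integers stand in for local_t, and the "interrupt" is modeled by a
hypothetical sim_set_top_only() that stops a set after the top half has
been written. All sim_* names and tag_half() are made up for this example.

```c
#include <stdbool.h>
#include <stdint.h>

#define TIME_SHIFT	30
#define TIME_VAL_MASK	((UINT32_C(1) << TIME_SHIFT) - 1)

/* Assumption: plain fields instead of local_t, no real interrupts. */
struct sim_time {
	uint32_t cnt;
	uint32_t top;
	uint32_t bottom;
};

/* Store a 30 bit half together with the low two bits of the counter */
static uint32_t tag_half(uint32_t half, uint32_t cnt)
{
	return (half & TIME_VAL_MASK) | ((cnt & 3) << TIME_SHIFT);
}

/* A complete set: bump the counter, then write both halves */
static void sim_set(struct sim_time *t, uint64_t val)
{
	uint32_t cnt = ++t->cnt;

	t->top = tag_half((uint32_t)(val >> TIME_SHIFT), cnt);
	t->bottom = tag_half((uint32_t)val, cnt);
}

/* A set that was "interrupted" right after writing the top half */
static void sim_set_top_only(struct sim_time *t, uint64_t val)
{
	uint32_t cnt = ++t->cnt;

	t->top = tag_half((uint32_t)(val >> TIME_SHIFT), cnt);
}

/* Returns false if the two halves carry different counter bits */
static bool sim_read(const struct sim_time *t, uint64_t *ret)
{
	if ((t->top >> TIME_SHIFT) != (t->bottom >> TIME_SHIFT))
		return false;

	*ret = ((uint64_t)(t->top & TIME_VAL_MASK) << TIME_SHIFT) |
	       (t->bottom & TIME_VAL_MASK);
	return true;
}
```

In this model, a read issued right after sim_set_top_only() fails, while a
read issued after a full sim_set() by the "interrupting" event succeeds and
returns that event's value.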

A cmpxchg may fail if it interrupted an event that was in the process of
updating the time stamp just like the reads do. Other than that, it will act
like a normal cmpxchg.
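
The cmpxchg behavior can be sketched with a similar single threaded model.
This is a simplification and not the implementation in this patch: the
real code uses local_cmpxchg() on each of the three fields, whereas the
hypothetical sim_cmpxchg() below just reads, compares and sets. Plain
integers stand in for local_t, and all names are made up for this example.

```c
#include <stdbool.h>
#include <stdint.h>

#define TIME_SHIFT	30
#define TIME_VAL_MASK	((UINT32_C(1) << TIME_SHIFT) - 1)

/* Assumption: plain fields instead of local_t, no real interrupts. */
struct sim_time {
	uint32_t cnt;
	uint32_t top;
	uint32_t bottom;
};

static uint32_t tag_half(uint32_t half, uint32_t cnt)
{
	return (half & TIME_VAL_MASK) | ((cnt & 3) << TIME_SHIFT);
}

static void sim_set(struct sim_time *t, uint64_t val)
{
	uint32_t cnt = ++t->cnt;

	t->top = tag_half((uint32_t)(val >> TIME_SHIFT), cnt);
	t->bottom = tag_half((uint32_t)val, cnt);
}

static bool sim_read(const struct sim_time *t, uint64_t *ret)
{
	if ((t->top >> TIME_SHIFT) != (t->bottom >> TIME_SHIFT))
		return false;

	*ret = ((uint64_t)(t->top & TIME_VAL_MASK) << TIME_SHIFT) |
	       (t->bottom & TIME_VAL_MASK);
	return true;
}

static bool sim_cmpxchg(struct sim_time *t, uint64_t expect, uint64_t set)
{
	uint64_t val;

	/* Fails if it "interrupted" an update (halves do not match) */
	if (!sim_read(t, &val))
		return false;

	/* Otherwise behaves like a normal cmpxchg */
	if (val != expect)
		return false;

	sim_set(t, set);
	return true;
}
```

A caller that gets false back cannot tell the two failure cases apart,
which matches how the callers in this patch treat a failed cmpxchg.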

The way this works is that the rb_time_t is made of three fields: a cnt,
which gets updated atomically every time a modification is made; a top,
which represents the most significant 30 bits of the time; and a bottom,
which represents the least significant 30 bits of the time. Notice that the
time value is only 60 bits long (where the ring buffer only uses 59 bits,
which gives us 18 years of nanoseconds!).
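
As a rough illustration of the layout described above (not code from this
patch), the sketch below splits a 60 bit value into two 30 bit halves,
joins them back, and checks the "18 years" figure. The names split60(),
join60() and years_of_ns() are made up for this example, and a year is
assumed to be 365 days.

```c
#include <assert.h>
#include <stdint.h>

#define TIME_SHIFT	30
#define TIME_VAL_MASK	((UINT32_C(1) << TIME_SHIFT) - 1)

/* Split a 60 bit value into its upper and lower 30 bits */
static void split60(uint64_t val, uint32_t *top, uint32_t *bottom)
{
	assert(val < (UINT64_C(1) << 60));

	*top = (uint32_t)(val >> TIME_SHIFT) & TIME_VAL_MASK;
	*bottom = (uint32_t)val & TIME_VAL_MASK;
}

/* Rebuild the 60 bit value from the two 30 bit halves */
static uint64_t join60(uint32_t top, uint32_t bottom)
{
	uint64_t val;

	val = top & TIME_VAL_MASK;
	val <<= TIME_SHIFT;
	val |= bottom & TIME_VAL_MASK;

	return val;
}

/* Whole years of nanoseconds that fit in the given number of bits */
static uint64_t years_of_ns(unsigned int bits)
{
	/* Assumption: 365 day years */
	const uint64_t ns_per_year = UINT64_C(1000000000) * 60 * 60 * 24 * 365;

	assert(bits < 64);

	return (UINT64_C(1) << bits) / ns_per_year;
}
```

With these assumptions, years_of_ns(59) evaluates to 18, and join60()
applied to the output of split60() gives back the original value.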

The top two bits of both the top and the bottom hold a 2 bit counter that
gets set to the value of the two least significant bits of the cnt. A read
of the top and the bottom where both carry the same 2 bit counter is
considered a match, and a valid 60 bit number can be created from it. If
they do not match, then the number is considered invalid, and this can only
happen if an event interrupted another event in the midst of updating the
time stamp.
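
The matching rule can be shown in isolation. The following is a minimal
sketch working on plain 32 bit words rather than local_t; tag_half(),
half_cnt() and halves_to_time() are hypothetical names used only here.

```c
#include <stdbool.h>
#include <stdint.h>

#define TIME_SHIFT	30
#define TIME_VAL_MASK	((UINT32_C(1) << TIME_SHIFT) - 1)

/* Put the low two bits of cnt into the top two bits of a 30 bit half */
static uint32_t tag_half(uint32_t half, uint32_t cnt)
{
	return (half & TIME_VAL_MASK) | ((cnt & 3) << TIME_SHIFT);
}

/* Extract the 2 bit counter from a tagged half */
static uint32_t half_cnt(uint32_t word)
{
	return (word >> TIME_SHIFT) & 3;
}

/* Build the 60 bit value only if both halves carry the same counter */
static bool halves_to_time(uint32_t top, uint32_t bottom, uint64_t *out)
{
	if (half_cnt(top) != half_cnt(bottom))
		return false;

	*out = ((uint64_t)(top & TIME_VAL_MASK) << TIME_SHIFT) |
	       (bottom & TIME_VAL_MASK);
	return true;
}
```

Because only two bits of the counter are stored, halves tagged with counts
that differ by a multiple of four look identical in this sketch.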

This is only used for 32 bit machines, as 64 bit machines can get better
performance out of the local64_t. This has been tested heavily by forcing 64
bit to use this logic.

Link: https://lore.kernel.org/r/20200625225345.18cf5881@oasis.local.home
Link: http://lkml.kernel.org/r/20200629025259.309232719@goodmis.org

Inspired-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: Steven Rostedt (VMware) <rostedt@goodmis.org>
---
 kernel/trace/ring_buffer.c | 270 +++++++++++++++++++++++++++++++++----
 1 file changed, 243 insertions(+), 27 deletions(-)

diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 7ee6619951ea..802bb38d9c81 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -27,7 +27,6 @@
 #include <linux/oom.h>
 
 #include <asm/local.h>
-#include <asm/local64.h>
 
 static void update_pages_handler(struct work_struct *work);
 
@@ -449,6 +448,28 @@ enum {
 	RB_CTX_MAX
 };
 
+#if BITS_PER_LONG == 32
+#define RB_TIME_32
+#endif
+
+/* To test on 64 bit machines */
+//#define RB_TIME_32
+
+#ifdef RB_TIME_32
+
+struct rb_time_struct {
+	local_t		cnt;
+	local_t		top;
+	local_t		bottom;
+};
+#else
+#include <asm/local64.h>
+struct rb_time_struct {
+	local64_t	time;
+};
+#endif
+typedef struct rb_time_struct rb_time_t;
+
 /*
  * head_page == tail_page && head == tail then buffer is empty.
  */
@@ -484,8 +505,8 @@ struct ring_buffer_per_cpu {
 	size_t				shortest_full;
 	unsigned long			read;
 	unsigned long			read_bytes;
-	local64_t			write_stamp;
-	local64_t			before_stamp;
+	rb_time_t			write_stamp;
+	rb_time_t			before_stamp;
 	u64				read_stamp;
 	/* ring buffer pages to update, > 0 to add, < 0 to remove */
 	long				nr_pages_to_update;
@@ -528,6 +549,189 @@ struct ring_buffer_iter {
 	int				missed_events;
 };
 
+#ifdef RB_TIME_32
+
+/*
+ * On 32 bit machines, local64_t is very expensive. As the ring
+ * buffer doesn't need all the features of a true 64 bit atomic,
+ * on 32 bit, it uses these functions (64 still uses local64_t).
+ *
+ * For the ring buffer, the 64 bit operations required for the time are
+ * the following:
+ *
+ *  - Only need 59 bits (uses 60 to make it even).
+ *  - Reads may fail if it interrupted a modification of the time stamp.
+ *      It will succeed if it did not interrupt another write even if
+ *      the read itself is interrupted by a write.
+ *      It returns whether it was successful or not.
+ *
+ *  - Writes always succeed and will overwrite other writes and writes
+ *      that were done by events interrupting the current write.
+ *
+ *  - A write followed by a read of the same time stamp will always succeed,
+ *      but may not contain the same value.
+ *
+ *  - A cmpxchg will fail if it interrupted another write or cmpxchg.
+ *      Other than that, it acts like a normal cmpxchg.
+ *
+ * The 60 bit time stamp is broken up by 30 bits in a top and bottom half
+ *  (bottom being the least significant 30 bits of the 60 bit time stamp).
+ *
+ * The two most significant bits of each half hold a 2 bit counter (0-3).
+ * Each update will increment this counter by one.
+ * When reading the top and bottom, if the two counter bits match then the
+ *  top and bottom together make a valid 60 bit number.
+ */
+#define RB_TIME_SHIFT	30
+#define RB_TIME_VAL_MASK ((1 << RB_TIME_SHIFT) - 1)
+
+static inline int rb_time_cnt(unsigned long val)
+{
+	return (val >> RB_TIME_SHIFT) & 3;
+}
+
+static inline u64 rb_time_val(unsigned long top, unsigned long bottom)
+{
+	u64 val;
+
+	val = top & RB_TIME_VAL_MASK;
+	val <<= RB_TIME_SHIFT;
+	val |= bottom & RB_TIME_VAL_MASK;
+
+	return val;
+}
+
+static inline bool __rb_time_read(rb_time_t *t, u64 *ret, unsigned long *cnt)
+{
+	unsigned long top, bottom;
+	unsigned long c;
+
+	/*
+	 * If the read is interrupted by a write, then the cnt will
+	 * be different. Loop until both top and bottom have been read
+	 * without interruption.
+	 */
+	do {
+		c = local_read(&t->cnt);
+		top = local_read(&t->top);
+		bottom = local_read(&t->bottom);
+	} while (c != local_read(&t->cnt));
+
+	*cnt = rb_time_cnt(top);
+
+	/* If top and bottom counts don't match, this interrupted a write */
+	if (*cnt != rb_time_cnt(bottom))
+		return false;
+
+	*ret = rb_time_val(top, bottom);
+	return true;
+}
+
+static bool rb_time_read(rb_time_t *t, u64 *ret)
+{
+	unsigned long cnt;
+
+	return __rb_time_read(t, ret, &cnt);
+}
+
+static inline unsigned long rb_time_val_cnt(unsigned long val, unsigned long cnt)
+{
+	return (val & RB_TIME_VAL_MASK) | ((cnt & 3) << RB_TIME_SHIFT);
+}
+
+static inline void rb_time_split(u64 val, unsigned long *top, unsigned long *bottom)
+{
+	*top = (unsigned long)((val >> RB_TIME_SHIFT) & RB_TIME_VAL_MASK);
+	*bottom = (unsigned long)(val & RB_TIME_VAL_MASK);
+}
+
+static inline void rb_time_val_set(local_t *t, unsigned long val, unsigned long cnt)
+{
+	val = rb_time_val_cnt(val, cnt);
+	local_set(t, val);
+}
+
+static void rb_time_set(rb_time_t *t, u64 val)
+{
+	unsigned long cnt, top, bottom;
+
+	rb_time_split(val, &top, &bottom);
+
+	/* Writes always succeed with a valid number even if it gets interrupted. */
+	do {
+		cnt = local_inc_return(&t->cnt);
+		rb_time_val_set(&t->top, top, cnt);
+		rb_time_val_set(&t->bottom, bottom, cnt);
+	} while (cnt != local_read(&t->cnt));
+}
+
+static inline bool
+rb_time_read_cmpxchg(local_t *l, unsigned long expect, unsigned long set)
+{
+	unsigned long ret;
+
+	ret = local_cmpxchg(l, expect, set);
+	return ret == expect;
+}
+
+static bool rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set)
+{
+	unsigned long cnt, top, bottom;
+	unsigned long cnt2, top2, bottom2;
+	u64 val;
+
+	/* The cmpxchg always fails if it interrupted an update */
+	if (!__rb_time_read(t, &val, &cnt2))
+		return false;
+
+	if (val != expect)
+		return false;
+
+	cnt = local_read(&t->cnt);
+	if ((cnt & 3) != cnt2)
+		return false;
+
+	cnt2 = cnt + 1;
+
+	rb_time_split(val, &top, &bottom);
+	top = rb_time_val_cnt(top, cnt);
+	bottom = rb_time_val_cnt(bottom, cnt);
+
+	rb_time_split(set, &top2, &bottom2);
+	top2 = rb_time_val_cnt(top2, cnt2);
+	bottom2 = rb_time_val_cnt(bottom2, cnt2);
+
+	if (!rb_time_read_cmpxchg(&t->cnt, cnt, cnt2))
+		return false;
+	if (!rb_time_read_cmpxchg(&t->top, top, top2))
+		return false;
+	if (!rb_time_read_cmpxchg(&t->bottom, bottom, bottom2))
+		return false;
+	return true;
+}
+
+#else /* 64 bits */
+
+/* local64_t always succeeds */
+
+static inline bool rb_time_read(rb_time_t *t, u64 *ret)
+{
+	*ret = local64_read(&t->time);
+	return true;
+}
+static void rb_time_set(rb_time_t *t, u64 val)
+{
+	local64_set(&t->time, val);
+}
+
+static bool rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set)
+{
+	u64 val;
+	val = local64_cmpxchg(&t->time, expect, set);
+	return val == expect;
+}
+#endif
+
 /**
  * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer
  * @buffer: The ring_buffer to get the number of pages from
@@ -2545,7 +2749,8 @@ rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
 
 	delta = rb_time_delta(event);
 
-	write_stamp = local64_read(&cpu_buffer->write_stamp);
+	if (!rb_time_read(&cpu_buffer->write_stamp, &write_stamp))
+		return 0;
 
 	/* Make sure the write stamp is read before testing the location */
 	barrier();
@@ -2554,11 +2759,10 @@ rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
 		unsigned long write_mask =
 			local_read(&bpage->write) & ~RB_WRITE_MASK;
 		unsigned long event_length = rb_event_length(event);
-		u64 ret;
 
-		ret = local64_cmpxchg(&cpu_buffer->write_stamp, write_stamp, write_stamp - delta);
 		/* Something came in, can't discard */
-		if (ret != write_stamp)
+		if (!rb_time_cmpxchg(&cpu_buffer->write_stamp,
+				       write_stamp, write_stamp - delta))
 			return 0;
 
 		/*
@@ -2889,11 +3093,13 @@ static noinline void
 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
 		   struct rb_event_info *info)
 {
+	u64 write_stamp;
+
 	WARN_ONCE(info->delta > (1ULL << 59),
 		  KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s",
 		  (unsigned long long)info->delta,
 		  (unsigned long long)info->ts,
-		  (unsigned long long)local64_read(&cpu_buffer->write_stamp),
+		  (unsigned long long)(rb_time_read(&cpu_buffer->write_stamp, &write_stamp) ? write_stamp : 0),
 		  sched_clock_stable() ? "" :
 		  "If you just came from a suspend/resume,\n"
 		  "please switch to the trace global clock:\n"
@@ -2909,14 +3115,16 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
 	struct buffer_page *tail_page;
 	unsigned long tail, write, w;
 	u64 before, after;
+	bool a_ok;
+	bool b_ok;
 
 	/* Don't let the compiler play games with cpu_buffer->tail_page */
 	tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
 
  /*A*/	w = local_read(&tail_page->write) & RB_WRITE_MASK;
 	barrier();
-	before = local64_read(&cpu_buffer->before_stamp);
-	after = local64_read(&cpu_buffer->write_stamp);
+	b_ok = rb_time_read(&cpu_buffer->before_stamp, &before);
+	a_ok = rb_time_read(&cpu_buffer->write_stamp, &after);
 	barrier();
 	info->ts = rb_time_stamp(cpu_buffer->buffer);
 
@@ -2927,16 +3135,18 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
 		info->delta = info->ts - after;
 	}
 
-	if (unlikely(test_time_stamp(info->delta))) {
-		rb_check_timestamp(cpu_buffer, info);
-		info->add_timestamp |= RB_ADD_STAMP_EXTEND;
+	if (likely(a_ok && b_ok)) {
+		if (unlikely(test_time_stamp(info->delta))) {
+			rb_check_timestamp(cpu_buffer, info);
+			info->add_timestamp |= RB_ADD_STAMP_EXTEND;
+		}
 	}
 
 	/*
 	 * If interrupting an event time update, we may need an absolute timestamp.
 	 * Don't bother if this is the start of a new page (w == 0).
 	 */
-	if (unlikely(before != after && w))
+	if (unlikely(!a_ok || !b_ok || (before != after && w)))
 		info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND;
 
 	/*
@@ -2947,7 +3157,7 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
 	if (unlikely(info->add_timestamp))
 		info->length += RB_LEN_TIME_EXTEND;
 
- /*B*/	local64_set(&cpu_buffer->before_stamp, info->ts);
+ /*B*/	rb_time_set(&cpu_buffer->before_stamp, info->ts);
 
  /*C*/	write = local_add_return(info->length, &tail_page->write);
 
@@ -2960,21 +3170,23 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
 	if (unlikely(write > BUF_PAGE_SIZE)) {
 		if (tail != w) {
 			/* before and after may now different, fix it up*/
-			before = local64_read(&cpu_buffer->before_stamp);
-			after = local64_read(&cpu_buffer->write_stamp);
-			if (before != after)
-				(void)local64_cmpxchg(&cpu_buffer->before_stamp, before, after);
+			b_ok = rb_time_read(&cpu_buffer->before_stamp, &before);
+			a_ok = rb_time_read(&cpu_buffer->write_stamp, &after);
+			if (a_ok && b_ok && before != after)
+				(void)rb_time_cmpxchg(&cpu_buffer->before_stamp, before, after);
 		}
 		return rb_move_tail(cpu_buffer, tail, info);
 	}
 
 	if (likely(tail == w)) {
 		u64 save_before;
+		bool s_ok;
 
 		/* Nothing interrupted us between A and C */
- /*D*/		local64_set(&cpu_buffer->write_stamp, info->ts);
+ /*D*/		rb_time_set(&cpu_buffer->write_stamp, info->ts);
 		barrier();
- /*E*/		save_before = local64_read(&cpu_buffer->before_stamp);
+ /*E*/		s_ok = rb_time_read(&cpu_buffer->before_stamp, &save_before);
+		RB_WARN_ON(cpu_buffer, !s_ok);
 		if (likely(!(info->add_timestamp &
 			     (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
 			/* This did not interrupt any time update */
@@ -2986,27 +3198,31 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
 		if (unlikely(info->ts != save_before)) {
 			/* SLOW PATH - Interrupted between C and E */
 
-			after = local64_read(&cpu_buffer->write_stamp);
+			a_ok = rb_time_read(&cpu_buffer->write_stamp, &after);
+			RB_WARN_ON(cpu_buffer, !a_ok);
+
 			/* Write stamp must only go forward */
 			if (save_before > after) {
 				/*
 				 * We do not care about the result, only that
 				 * it gets updated atomically.
 				 */
-				(void)local64_cmpxchg(&cpu_buffer->write_stamp, after, save_before);
+				(void)rb_time_cmpxchg(&cpu_buffer->write_stamp, after, save_before);
 			}
 		}
 	} else {
 		u64 ts;
 		/* SLOW PATH - Interrupted between A and C */
-		after = local64_read(&cpu_buffer->write_stamp);
+		a_ok = rb_time_read(&cpu_buffer->write_stamp, &after);
+		/* Was interrupted before here, write_stamp must be valid */
+		RB_WARN_ON(cpu_buffer, !a_ok);
 		ts = rb_time_stamp(cpu_buffer->buffer);
 		barrier();
  /*E*/		if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) &&
 		    after < ts) {
 			/* Nothing came after this event between C and E */
 			info->delta = ts - after;
-			(void)local64_cmpxchg(&cpu_buffer->write_stamp, after, info->ts);
+			(void)rb_time_cmpxchg(&cpu_buffer->write_stamp, after, info->ts);
 			info->ts = ts;
 		} else {
 			/*
@@ -4565,8 +4781,8 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
 	cpu_buffer->read = 0;
 	cpu_buffer->read_bytes = 0;
 
-	local64_set(&cpu_buffer->write_stamp, 0);
-	local64_set(&cpu_buffer->before_stamp, 0);
+	rb_time_set(&cpu_buffer->write_stamp, 0);
+	rb_time_set(&cpu_buffer->before_stamp, 0);
 
 	cpu_buffer->lost_events = 0;
 	cpu_buffer->last_overrun = 0;
-- 
2.26.2


