* [PATCH 0/3] ring-buffer: Restructure ftrace ring buffer time keeping to allow accurate nested timing
@ 2020-06-27  1:00 Steven Rostedt
  2020-06-27  1:00 ` [PATCH 1/3] ring-buffer: Have nested events still record running time stamp Steven Rostedt
                   ` (2 more replies)
  0 siblings, 3 replies; 7+ messages in thread
From: Steven Rostedt @ 2020-06-27  1:00 UTC (permalink / raw)
  To: linux-kernel
  Cc: Ingo Molnar, Mathieu Desnoyers, Peter Zijlstra, Thomas Gleixner,
	Masami Hiramatsu, Arnaldo Carvalho de Melo, Jiri Olsa,
	Namhyung Kim, Yordan Karadzhov, Tzvetomir Stoyanov, Tom Zanussi,
	Jason Behmer, Julia Lawall, Clark Williams, bristot,
	Daniel Wagner, Darren Hart, Jonathan Corbet, Suresh E. Warrier

I have now completed some thorough testing of these patches, injecting
trace_printk()s (in a way that allows them to safely recurse) to force various
data races, and then examined the trace to make sure that everything it did
was exactly what I expected it to do. In the cases where it did something
surprising, I found that it was still a legitimate case! ;-)

I found a few bugs along the way, but it all still very much matches the
original design. The bugs were more in the implementation. Now that I
feel I have those all straightened out, I'm much more confident in this code
(famous last words!).

Although I did do a lot of custom testing (with all the injecting of
trace_printk()s), I have only run these through my standard smoke tests and
have not yet run them through my main test suite (13 hour run time). But I
will do that when I have other unrelated patches ready to go with it.

But for now, these are very close to the finished product. Feel free to try
to poke holes in them as well.

Special thanks to Mathieu Desnoyers for his annoying critique (and I mean
that in a very positive way!). If it weren't for his comments, I would have
missed fixing a small design flaw (the switch back to delta time instead of
keeping the full time stamp). It turned out that that path had other
issues, and without removing it, I would not have been able to add
the last patch of this series.

Cheers.

-- Steve


Steven Rostedt (VMware) (3):
      ring-buffer: Have nested events still record running time stamp
      ring-buffer: Incorporate absolute timestamp into add_timestamp logic
      ring-buffer: Add rb_time_t 64 bit operations for speeding up 32 bit

----
 kernel/trace/ring_buffer.c | 503 ++++++++++++++++++++++++++++++++++++---------
 1 file changed, 406 insertions(+), 97 deletions(-)


* [PATCH 1/3] ring-buffer: Have nested events still record running time stamp
  2020-06-27  1:00 [PATCH 0/3] ring-buffer: Restructure ftrace ring buffer time keeping to allow accurate nested timing Steven Rostedt
@ 2020-06-27  1:00 ` Steven Rostedt
  2020-06-28 16:23   ` Masami Hiramatsu
  2020-06-27  1:00 ` [PATCH 2/3] ring-buffer: Incorporate absolute timestamp into add_timestamp logic Steven Rostedt
  2020-06-27  1:00 ` [PATCH 3/3] ring-buffer: Add rb_time_t 64 bit operations for speeding up 32 bit Steven Rostedt
  2 siblings, 1 reply; 7+ messages in thread
From: Steven Rostedt @ 2020-06-27  1:00 UTC (permalink / raw)
  To: linux-kernel
  Cc: Ingo Molnar, Mathieu Desnoyers, Peter Zijlstra, Thomas Gleixner,
	Masami Hiramatsu, Arnaldo Carvalho de Melo, Jiri Olsa,
	Namhyung Kim, Yordan Karadzhov, Tzvetomir Stoyanov, Tom Zanussi,
	Jason Behmer, Julia Lawall, Clark Williams, bristot,
	Daniel Wagner, Darren Hart, Jonathan Corbet, Suresh E. Warrier

From: "Steven Rostedt (VMware)" <rostedt@goodmis.org>

Up until now, if an event is interrupted by an interrupt while it is being
recorded, and that interrupt records events, the time of those events will
all be the same. This is because events only record the delta of the time
since the previous event (or the beginning of a page), and updating the
time keeping for nested events is extremely racy. After years of thinking
about this and several failed attempts, I finally have a solution to this
puzzle.

The problem is that you need to atomically calculate the delta, then update
the time stamp you made the delta from, and then record it into the buffer,
all while at any time an interrupt can come in and do the same thing. This
is easy to solve with heavyweight atomics, but that would be detrimental to
the performance of the ring buffer. The current state of affairs sacrificed
the time deltas of nested events for performance.
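
For contrast, here is a minimal sketch of the kind of heavyweight approach
that this patch deliberately avoids on the fast path (names match the design
pseudocode below; every writer, even an uncontended one, would pay for a
cmpxchg loop):

	do {
		old = local_read(write_stamp);
		ts = clock();
		delta = ts - old;
	} while (local_cmpxchg(write_stamp, old, ts) != old);
	/* only now is it safe to record "delta" into the buffer */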

The reason for the previous failed attempts at solving this puzzle was that
I was trying to completely avoid slow atomic operations like cmpxchg. I
finally came to the conclusion that always avoiding cmpxchg is not possible,
which is why those previous attempts always failed. But it is possible to
pick one path (the most common case) and avoid cmpxchg in that path, which
is the "fast path". The most common case is that an event will not be
interrupted and have other events added into it. An event can detect if it
has interrupted another event, and for those cases we can make it the slow
path and use the heavy operations like cmpxchg.

One more player was added to the game that made this possible, and that is
the "absolute timestamp" (by Tom Zanussi) that allows us to inject a full 59
bit time stamp. (Of course this breaks if a machine runs for more than
18 years without a reboot!)
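
As a quick sanity check of that 18 year figure (my arithmetic, not part of
the patch):

	2^59 ns = 576,460,752,303,423,488 ns
	        ~ 576,460,752 seconds
	        ~ 6,672 days
	        ~ 18.3 years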

There are barrier() placements around out of paranoia, even where they
are not needed because of other atomic functions nearby. But those
should not hurt, as when they are not needed, they are basically a nop.

Note, this also makes the race window much smaller, which means there
are fewer slow paths to slow down the performance.

Here's the design of this solution:

 All this is per cpu, and only needs to worry about nested events (not
 parallel events).

The players:

 write_tail: The index in the buffer where new events can be written to.
     It is incremented via local_add() to reserve space for a new event.

 before_stamp: A time stamp set by all events before reserving space.

 write_stamp: A time stamp updated by events after it has successfully
     reserved space.

 next_write: A copy of "write_tail" used to help with races.

	/* Save the current position of write */
 [A]	w = local_read(write_tail);
	barrier();
	/* Read both before and write stamps before touching anything */
	before = READ_ONCE(before_stamp);
	after = local_read(write_stamp);
	barrier();

	/*
	 * If before and after are the same, then this event is not
	 * interrupting a time update. If it is, then reserve space for adding
	 * a full time stamp (this can turn into a time extend which is
	 * just an extended time delta, but fills up the extra space).
	 */
	if (after != before)
		abs = true;

	ts = clock();

	/* Now update the before_stamp (everyone does this!) */
 [B]	WRITE_ONCE(before_stamp, ts);

	/* Read the current next_write and update it to what we want write
	 * to be after we reserve space. */
 	next = READ_ONCE(next_write);
	WRITE_ONCE(next_write, w + len);

	/* Now reserve space on the buffer */
 [C]	write = local_add_return(len, write_tail);

	/* Set tail to be where this event's data is */
	tail = write - len;

 	if (w == tail) {

		/* Nothing interrupted this between A and C */
 [D]		local_set(write_stamp, ts);
		barrier();
 [E]		save_before = READ_ONCE(before_stamp);

 		if (!abs) {
			/* This did not interrupt a time update */
			delta = ts - after;
		} else {
			delta = ts; /* The full time stamp will be in use */
		}
		if (ts != save_before) {
			/* slow path - Was interrupted between C and E */
			/* The update to write_stamp could have overwritten the update to
			 * it by the interrupting event, but before and after should be
			 * the same for all completed top events */
			after = local_read(write_stamp);
			if (save_before > after)
				local_cmpxchg(write_stamp, after, save_before);
		}
	} else {
		/* slow path - Interrupted between A and C */

		after = local_read(write_stamp);
		temp_ts = clock();
		barrier();
 [F]		if (write == local_read(write_tail) && after < temp_ts) {
			/* This was not interrupted between C and F.
			 * The last write_stamp is still valid for the previous event
			 * in the buffer. */
			delta = temp_ts - after;
			/* OK to keep this new time stamp */
			ts = temp_ts;
		} else {
			/* Interrupted between C and F.
			 * There's no use trying to figure out what the time stamp
			 * is for the previous event. Just set delta to zero and
			 * be the same time as the event that interrupted us before
			 * the reservation of the buffer. */

			delta = 0;
		}
		/* No need to use full timestamps here */
		abs = 0;
	}

Link: https://lkml.kernel.org/r/20200625094454.732790f7@oasis.local.home

Signed-off-by: Steven Rostedt (VMware) <rostedt@goodmis.org>
---
 kernel/trace/ring_buffer.c | 281 ++++++++++++++++++++++++-------------
 1 file changed, 186 insertions(+), 95 deletions(-)

diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 00867ff82412..4f13ae38b8f8 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -27,6 +27,7 @@
 #include <linux/oom.h>
 
 #include <asm/local.h>
+#include <asm/local64.h>
 
 static void update_pages_handler(struct work_struct *work);
 
@@ -418,6 +419,17 @@ struct rb_event_info {
 	int			add_timestamp;
 };
 
+/*
+ * Used for the add_timestamp
+ *  NONE
+ *  NORMAL - may be for either time extend or absolute
+ *  FORCE - force a full time stamp.
+ */
+enum {
+	RB_ADD_STAMP_NONE,
+	RB_ADD_STAMP_NORMAL,
+	RB_ADD_STAMP_FORCE
+};
 /*
  * Used for which event context the event is in.
  *  NMI     = 0
@@ -470,7 +482,9 @@ struct ring_buffer_per_cpu {
 	size_t				shortest_full;
 	unsigned long			read;
 	unsigned long			read_bytes;
-	u64				write_stamp;
+	unsigned long			next_write;
+	local64_t			write_stamp;
+	local64_t			before_stamp;
 	u64				read_stamp;
 	/* ring buffer pages to update, > 0 to add, < 0 to remove */
 	long				nr_pages_to_update;
@@ -2416,16 +2430,13 @@ rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
 	unsigned length = info->length;
 	u64 delta = info->delta;
 
-	/* Only a commit updates the timestamp */
-	if (unlikely(!rb_event_is_commit(cpu_buffer, event)))
-		delta = 0;
-
 	/*
 	 * If we need to add a timestamp, then we
 	 * add it to the start of the reserved space.
 	 */
 	if (unlikely(info->add_timestamp)) {
-		bool abs = ring_buffer_time_stamp_abs(cpu_buffer->buffer);
+		bool abs = info->add_timestamp == RB_ADD_STAMP_FORCE ||
+			ring_buffer_time_stamp_abs(cpu_buffer->buffer);
 
 		event = rb_add_time_stamp(event, abs ? info->delta : delta, abs);
 		length -= RB_LEN_TIME_EXTEND;
@@ -2480,6 +2491,39 @@ static inline bool sched_clock_stable(void)
 }
 #endif
 
+static __always_inline bool
+rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
+		   struct ring_buffer_event *event)
+{
+	unsigned long addr = (unsigned long)event;
+	unsigned long index;
+
+	index = rb_event_index(event);
+	addr &= PAGE_MASK;
+
+	return cpu_buffer->commit_page->page == (void *)addr &&
+		rb_commit_index(cpu_buffer) == index;
+}
+
+static u64 rb_time_delta(struct ring_buffer_event *event)
+{
+	switch (event->type_len) {
+	case RINGBUF_TYPE_PADDING:
+		return 0;
+
+	case RINGBUF_TYPE_TIME_EXTEND:
+		return ring_buffer_event_time_stamp(event);
+
+	case RINGBUF_TYPE_TIME_STAMP:
+		return 0;
+
+	case RINGBUF_TYPE_DATA:
+		return event->time_delta;
+	default:
+		return 0;
+	}
+}
+
 static inline int
 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
 		  struct ring_buffer_event *event)
@@ -2488,6 +2532,8 @@ rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
 	struct buffer_page *bpage;
 	unsigned long index;
 	unsigned long addr;
+	u64 write_stamp;
+	u64 delta;
 
 	new_index = rb_event_index(event);
 	old_index = new_index + rb_event_ts_length(event);
@@ -2496,10 +2542,32 @@ rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
 
 	bpage = READ_ONCE(cpu_buffer->tail_page);
 
+	delta = rb_time_delta(event);
+
+	write_stamp = local64_read(&cpu_buffer->write_stamp);
+
+	/* Make sure the write stamp is read before testing the location */
+	barrier();
+
 	if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
 		unsigned long write_mask =
 			local_read(&bpage->write) & ~RB_WRITE_MASK;
 		unsigned long event_length = rb_event_length(event);
+		u64 ret;
+
+		ret = local64_cmpxchg(&cpu_buffer->write_stamp, write_stamp, write_stamp - delta);
+		/* Something came in, can't discard */
+		if (ret != write_stamp)
+			return 0;
+
+		/*
+		 * If an event were to come in now, it would see that the
+		 * write_stamp and the before_stamp are different, and assume
+		 * that this event just added itself before updating
+		 * the write stamp. The interrupting event will fix the
+		 * write stamp for us, and use the before stamp as its delta.
+		 */
+
 		/*
 		 * This is on the tail page. It is possible that
 		 * a write could come in and move the tail page
@@ -2551,10 +2619,6 @@ rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
 		local_set(&cpu_buffer->commit_page->page->commit,
 			  rb_page_write(cpu_buffer->commit_page));
 		rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
-		/* Only update the write stamp if the page has an event */
-		if (rb_page_write(cpu_buffer->commit_page))
-			cpu_buffer->write_stamp =
-				cpu_buffer->commit_page->page->time_stamp;
 		/* add barrier to keep gcc from optimizing too much */
 		barrier();
 	}
@@ -2626,54 +2690,10 @@ static inline void rb_event_discard(struct ring_buffer_event *event)
 		event->time_delta = 1;
 }
 
-static __always_inline bool
-rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
-		   struct ring_buffer_event *event)
-{
-	unsigned long addr = (unsigned long)event;
-	unsigned long index;
-
-	index = rb_event_index(event);
-	addr &= PAGE_MASK;
-
-	return cpu_buffer->commit_page->page == (void *)addr &&
-		rb_commit_index(cpu_buffer) == index;
-}
-
-static __always_inline void
-rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer,
-		      struct ring_buffer_event *event)
-{
-	u64 delta;
-
-	/*
-	 * The event first in the commit queue updates the
-	 * time stamp.
-	 */
-	if (rb_event_is_commit(cpu_buffer, event)) {
-		/*
-		 * A commit event that is first on a page
-		 * updates the write timestamp with the page stamp
-		 */
-		if (!rb_event_index(event))
-			cpu_buffer->write_stamp =
-				cpu_buffer->commit_page->page->time_stamp;
-		else if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
-			delta = ring_buffer_event_time_stamp(event);
-			cpu_buffer->write_stamp += delta;
-		} else if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
-			delta = ring_buffer_event_time_stamp(event);
-			cpu_buffer->write_stamp = delta;
-		} else
-			cpu_buffer->write_stamp += event->time_delta;
-	}
-}
-
 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
 		      struct ring_buffer_event *event)
 {
 	local_inc(&cpu_buffer->entries);
-	rb_update_write_stamp(cpu_buffer, event);
 	rb_end_commit(cpu_buffer);
 }
 
@@ -2872,13 +2892,13 @@ rb_handle_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
 		  KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s",
 		  (unsigned long long)info->delta,
 		  (unsigned long long)info->ts,
-		  (unsigned long long)cpu_buffer->write_stamp,
+		  (unsigned long long)local64_read(&cpu_buffer->write_stamp),
 		  sched_clock_stable() ? "" :
 		  "If you just came from a suspend/resume,\n"
 		  "please switch to the trace global clock:\n"
 		  "  echo global > /sys/kernel/debug/tracing/trace_clock\n"
 		  "or add trace_clock=global to the kernel command line\n");
-	info->add_timestamp = 1;
+	info->add_timestamp = RB_ADD_STAMP_NORMAL;
 }
 
 static struct ring_buffer_event *
@@ -2887,8 +2907,36 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
 {
 	struct ring_buffer_event *event;
 	struct buffer_page *tail_page;
-	unsigned long tail, write;
+	unsigned long tail, write, w, next;
+	u64 delta, before, after;
+	bool abs = false;
+
+	/* Don't let the compiler play games with cpu_buffer->tail_page */
+	tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
+
+ /*A*/	w = local_read(&tail_page->write) & RB_WRITE_MASK;
+	barrier();
+	before = local64_read(&cpu_buffer->before_stamp);
+	after = local64_read(&cpu_buffer->write_stamp);
+	barrier();
+	info->ts = rb_time_stamp(cpu_buffer->buffer);
+
+	if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) {
+		info->delta = info->ts;
+		abs = true;
+	} else {
+		info->delta = info->ts - after;
+	}
+
+	if (unlikely(test_time_stamp(info->delta)))
+		rb_handle_timestamp(cpu_buffer, info);
 
+	/*
+	 * If interrupting an event time update, we may need an absolute timestamp.
+	 * Don't bother if this is the start of a new page (w == 0).
+	 */
+	if (unlikely(before != after && w))
+		info->add_timestamp = RB_ADD_STAMP_FORCE;
 	/*
 	 * If the time delta since the last event is too big to
 	 * hold in the time field of the event, then we append a
@@ -2897,25 +2945,91 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
 	if (unlikely(info->add_timestamp))
 		info->length += RB_LEN_TIME_EXTEND;
 
-	/* Don't let the compiler play games with cpu_buffer->tail_page */
-	tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
-	write = local_add_return(info->length, &tail_page->write);
+	next = READ_ONCE(cpu_buffer->next_write);
+	WRITE_ONCE(cpu_buffer->next_write, w + info->length);
+
+ /*B*/	local64_set(&cpu_buffer->before_stamp, info->ts);
+
+ /*C*/	write = local_add_return(info->length, &tail_page->write);
 
 	/* set write to only the index of the write */
 	write &= RB_WRITE_MASK;
+
 	tail = write - info->length;
 
+	/* See if we shot past the end of this buffer page */
+	if (unlikely(write > BUF_PAGE_SIZE)) {
+		if (tail != w) {
+			/* before and after may now be different, fix it up */
+			before = local64_read(&cpu_buffer->before_stamp);
+			after = local64_read(&cpu_buffer->write_stamp);
+			if (before != after)
+				(void)local64_cmpxchg(&cpu_buffer->before_stamp, before, after);
+		}
+		return rb_move_tail(cpu_buffer, tail, info);
+	}
+
+	if (likely(tail == w)) {
+		u64 save_before;
+
+		/* Nothing interrupted us between A and C */
+ /*D*/		local64_set(&cpu_buffer->write_stamp, info->ts);
+		barrier();
+ /*E*/		save_before = local64_read(&cpu_buffer->before_stamp);
+		if (likely(info->add_timestamp != RB_ADD_STAMP_FORCE))
+			/* This did not interrupt any time update */
+			info->delta = info->ts - after;
+		else
+			/* Just use full timestamp for interrupting event */
+			info->delta = info->ts;
+		barrier();
+		if (unlikely(info->ts != save_before)) {
+			/* SLOW PATH - Interrupted between C and E */
+
+			after = local64_read(&cpu_buffer->write_stamp);
+			/* Write stamp must only go forward */
+			if (save_before > after) {
+				/*
+				 * We do not care about the result, only that
+				 * it gets updated atomically.
+				 */
+				(void)local64_cmpxchg(&cpu_buffer->write_stamp, after, save_before);
+			}
+		}
+	} else {
+		u64 ts;
+		/* SLOW PATH - Interrupted between A and C */
+		after = local64_read(&cpu_buffer->write_stamp);
+		ts = rb_time_stamp(cpu_buffer->buffer);
+		barrier();
+ /*E*/		if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) &&
+		    after < ts) {
+			/* Nothing came after this event between C and E */
+			info->delta = ts - after;
+			(void)local64_cmpxchg(&cpu_buffer->write_stamp, after, info->ts);
+			info->ts = ts;
+		} else {
+			/*
+			 * Interrupted between C and E:
+			 * Lost the previous event's time stamp. Just set the
+			 * delta to zero, and this will be the same time as
+			 * the event this event interrupted. And the events that
+			 * came after this will still be correct (as they would
+			 * have built their delta on the previous event).
+			 */
+			info->delta = 0;
+		}
+		if (info->add_timestamp == RB_ADD_STAMP_FORCE)
+			info->add_timestamp = RB_ADD_STAMP_NORMAL;
+	}
+
 	/*
 	 * If this is the first commit on the page, then it has the same
 	 * timestamp as the page itself.
 	 */
-	if (!tail && !ring_buffer_time_stamp_abs(cpu_buffer->buffer))
+	if (unlikely(!tail && info->add_timestamp != RB_ADD_STAMP_FORCE && !abs))
 		info->delta = 0;
 
-	/* See if we shot pass the end of this buffer page */
-	if (unlikely(write > BUF_PAGE_SIZE))
-		return rb_move_tail(cpu_buffer, tail, info);
-
 	/* We reserved something on the buffer */
 
 	event = __rb_page_index(tail_page, tail);
@@ -2944,9 +3058,9 @@ rb_reserve_next_event(struct trace_buffer *buffer,
 	struct ring_buffer_event *event;
 	struct rb_event_info info;
 	int nr_loops = 0;
-	u64 diff;
 
 	rb_start_commit(cpu_buffer);
+	/* The commit page can not change after this */
 
 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
 	/*
@@ -2965,7 +3079,7 @@ rb_reserve_next_event(struct trace_buffer *buffer,
 
 	info.length = rb_calculate_event_length(length);
  again:
-	info.add_timestamp = 0;
+	info.add_timestamp = RB_ADD_STAMP_NONE;
 	info.delta = 0;
 
 	/*
@@ -2980,22 +3094,6 @@ rb_reserve_next_event(struct trace_buffer *buffer,
 	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
 		goto out_fail;
 
-	info.ts = rb_time_stamp(cpu_buffer->buffer);
-	diff = info.ts - cpu_buffer->write_stamp;
-
-	/* make sure this diff is calculated here */
-	barrier();
-
-	if (ring_buffer_time_stamp_abs(buffer)) {
-		info.delta = info.ts;
-		rb_handle_timestamp(cpu_buffer, &info);
-	} else /* Did the write stamp get updated already? */
-		if (likely(info.ts >= cpu_buffer->write_stamp)) {
-		info.delta = diff;
-		if (unlikely(test_time_stamp(info.delta)))
-			rb_handle_timestamp(cpu_buffer, &info);
-	}
-
 	event = __rb_reserve_next(cpu_buffer, &info);
 
 	if (unlikely(PTR_ERR(event) == -EAGAIN)) {
@@ -3004,11 +3102,8 @@ rb_reserve_next_event(struct trace_buffer *buffer,
 		goto again;
 	}
 
-	if (!event)
-		goto out_fail;
-
-	return event;
-
+	if (likely(event))
+		return event;
  out_fail:
 	rb_end_commit(cpu_buffer);
 	return NULL;
@@ -3154,11 +3249,6 @@ void ring_buffer_discard_commit(struct trace_buffer *buffer,
 	if (rb_try_to_discard(cpu_buffer, event))
 		goto out;
 
-	/*
-	 * The commit is still visible by the reader, so we
-	 * must still update the timestamp.
-	 */
-	rb_update_write_stamp(cpu_buffer, event);
  out:
 	rb_end_commit(cpu_buffer);
 
@@ -4475,8 +4565,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
 	cpu_buffer->read = 0;
 	cpu_buffer->read_bytes = 0;
 
-	cpu_buffer->write_stamp = 0;
-	cpu_buffer->read_stamp = 0;
+	local64_set(&cpu_buffer->write_stamp, 0);
+	local64_set(&cpu_buffer->before_stamp, 0);
+	cpu_buffer->next_write = 0;
 
 	cpu_buffer->lost_events = 0;
 	cpu_buffer->last_overrun = 0;
-- 
2.26.2




* [PATCH 2/3] ring-buffer: Incorporate absolute timestamp into add_timestamp logic
  2020-06-27  1:00 [PATCH 0/3] ring-buffer: Restructure ftrace ring buffer time keeping to allow accurate nested timing Steven Rostedt
  2020-06-27  1:00 ` [PATCH 1/3] ring-buffer: Have nested events still record running time stamp Steven Rostedt
@ 2020-06-27  1:00 ` Steven Rostedt
  2020-06-27  1:00 ` [PATCH 3/3] ring-buffer: Add rb_time_t 64 bit operations for speeding up 32 bit Steven Rostedt
  2 siblings, 0 replies; 7+ messages in thread
From: Steven Rostedt @ 2020-06-27  1:00 UTC (permalink / raw)
  To: linux-kernel
  Cc: Ingo Molnar, Mathieu Desnoyers, Peter Zijlstra, Thomas Gleixner,
	Masami Hiramatsu, Arnaldo Carvalho de Melo, Jiri Olsa,
	Namhyung Kim, Yordan Karadzhov, Tzvetomir Stoyanov, Tom Zanussi,
	Jason Behmer, Julia Lawall, Clark Williams, bristot,
	Daniel Wagner, Darren Hart, Jonathan Corbet, Suresh E. Warrier

From: "Steven Rostedt (VMware)" <rostedt@goodmis.org>

Instead of calling out to the absolute-timestamp test each time to check if
the ring buffer wants absolute time stamps for all its recording, incorporate
that into the add_timestamp field and turn it into flags, for faster
processing when distinguishing between wanting an absolute stamp and needing
to force one.
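
As a minimal illustration of the flag scheme (using the RB_ADD_STAMP_* names
from the patch below), flags can now be OR'd together and several cases can
be tested with a single mask:

	info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND;

	/* one test covers both the forced and buffer-wide absolute cases */
	if (info->add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))
		/* need a full time stamp, not just a delta */;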

Signed-off-by: Steven Rostedt (VMware) <rostedt@goodmis.org>
---
 kernel/trace/ring_buffer.c | 42 +++++++++++++++++++++-----------------
 1 file changed, 23 insertions(+), 19 deletions(-)

diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 4f13ae38b8f8..986ddc8eba93 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -422,13 +422,15 @@ struct rb_event_info {
 /*
  * Used for the add_timestamp
  *  NONE
- *  NORMAL - may be for either time extend or absolute
+ *  EXTEND - wants a time extend
+ *  ABSOLUTE - the buffer requests all events to have absolute time stamps
  *  FORCE - force a full time stamp.
  */
 enum {
-	RB_ADD_STAMP_NONE,
-	RB_ADD_STAMP_NORMAL,
-	RB_ADD_STAMP_FORCE
+	RB_ADD_STAMP_NONE		= 0,
+	RB_ADD_STAMP_EXTEND		= BIT(1),
+	RB_ADD_STAMP_ABSOLUTE		= BIT(2),
+	RB_ADD_STAMP_FORCE		= BIT(3)
 };
 /*
  * Used for which event context the event is in.
@@ -2435,8 +2437,8 @@ rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
 	 * add it to the start of the reserved space.
 	 */
 	if (unlikely(info->add_timestamp)) {
-		bool abs = info->add_timestamp == RB_ADD_STAMP_FORCE ||
-			ring_buffer_time_stamp_abs(cpu_buffer->buffer);
+		bool abs = info->add_timestamp &
+			(RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE);
 
 		event = rb_add_time_stamp(event, abs ? info->delta : delta, abs);
 		length -= RB_LEN_TIME_EXTEND;
@@ -2885,8 +2887,8 @@ int ring_buffer_unlock_commit(struct trace_buffer *buffer,
 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
 
 static noinline void
-rb_handle_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
-		    struct rb_event_info *info)
+rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
+		   struct rb_event_info *info)
 {
 	WARN_ONCE(info->delta > (1ULL << 59),
 		  KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s",
@@ -2898,7 +2900,6 @@ rb_handle_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
 		  "please switch to the trace global clock:\n"
 		  "  echo global > /sys/kernel/debug/tracing/trace_clock\n"
 		  "or add trace_clock=global to the kernel command line\n");
-	info->add_timestamp = RB_ADD_STAMP_NORMAL;
 }
 
 static struct ring_buffer_event *
@@ -2908,8 +2909,7 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
 	struct ring_buffer_event *event;
 	struct buffer_page *tail_page;
 	unsigned long tail, write, w, next;
-	u64 delta, before, after;
-	bool abs = false;
+	u64 before, after;
 
 	/* Don't let the compiler play games with cpu_buffer->tail_page */
 	tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
@@ -2923,20 +2923,23 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
 
 	if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) {
 		info->delta = info->ts;
-		abs = true;
+		info->add_timestamp = RB_ADD_STAMP_ABSOLUTE;
 	} else {
 		info->delta = info->ts - after;
 	}
 
-	if (unlikely(test_time_stamp(info->delta)))
-		rb_handle_timestamp(cpu_buffer, info);
+	if (unlikely(test_time_stamp(info->delta))) {
+		rb_check_timestamp(cpu_buffer, info);
+		info->add_timestamp |= RB_ADD_STAMP_EXTEND;
+	}
 
 	/*
 	 * If interrupting an event time update, we may need an absolute timestamp.
 	 * Don't bother if this is the start of a new page (w == 0).
 	 */
 	if (unlikely(before != after && w))
-		info->add_timestamp = RB_ADD_STAMP_FORCE;
+		info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND;
+
 	/*
 	 * If the time delta since the last event is too big to
 	 * hold in the time field of the event, then we append a
@@ -2976,7 +2979,8 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
  /*D*/		local64_set(&cpu_buffer->write_stamp, info->ts);
 		barrier();
  /*E*/		save_before = local64_read(&cpu_buffer->before_stamp);
-		if (likely(info->add_timestamp != RB_ADD_STAMP_FORCE))
+		if (likely(!(info->add_timestamp &
+			     (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
 			/* This did not interrupt any time update */
 			info->delta = info->ts - after;
 		else
@@ -3019,15 +3023,15 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
 			 */
 			info->delta = 0;
 		}
-		if (info->add_timestamp == RB_ADD_STAMP_FORCE)
-			info->add_timestamp = RB_ADD_STAMP_NORMAL;
+		info->add_timestamp &= ~RB_ADD_STAMP_FORCE;
 	}
 
 	/*
 	 * If this is the first commit on the page, then it has the same
 	 * timestamp as the page itself.
 	 */
-	if (unlikely(!tail && info->add_timestamp != RB_ADD_STAMP_FORCE && !abs))
+	if (unlikely(!tail && !(info->add_timestamp &
+				(RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
 		info->delta = 0;
 
 	/* We reserved something on the buffer */
-- 
2.26.2




* [PATCH 3/3] ring-buffer: Add rb_time_t 64 bit operations for speeding up 32 bit
  2020-06-27  1:00 [PATCH 0/3] ring-buffer: Restructure ftrace ring buffer time keeping to allow accurate nested timing Steven Rostedt
  2020-06-27  1:00 ` [PATCH 1/3] ring-buffer: Have nested events still record running time stamp Steven Rostedt
  2020-06-27  1:00 ` [PATCH 2/3] ring-buffer: Incorporate absolute timestamp into add_timestamp logic Steven Rostedt
@ 2020-06-27  1:00 ` Steven Rostedt
  2020-06-27  6:42   ` kernel test robot
  2 siblings, 1 reply; 7+ messages in thread
From: Steven Rostedt @ 2020-06-27  1:00 UTC (permalink / raw)
  To: linux-kernel
  Cc: Ingo Molnar, Mathieu Desnoyers, Peter Zijlstra, Thomas Gleixner,
	Masami Hiramatsu, Arnaldo Carvalho de Melo, Jiri Olsa,
	Namhyung Kim, Yordan Karadzhov, Tzvetomir Stoyanov, Tom Zanussi,
	Jason Behmer, Julia Lawall, Clark Williams, bristot,
	Daniel Wagner, Darren Hart, Jonathan Corbet, Suresh E. Warrier

From: "Steven Rostedt (VMware)" <rostedt@goodmis.org>

After a discussion about the new time algorithm that lets nested events still
have proper time keeping, which required using local64_t atomic operations,
Mathieu was concerned about the performance this would have on 32 bit
machines, as in most cases, atomic 64 bit operations on them can be
expensive.

As the ring buffer's timing needs do not require the full features of
local64_t, a wrapper is made to implement new rb_time_t operations that use
two longs on 32 bit machines but still use local64_t operations on 64 bit
machines. There's a switch that can be made in the file to force 64 bit to
use the 32 bit version, just for testing purposes.

Not all reads need to succeed: a read may fail if it happened while the
stamp being read was in the process of being updated. The requirement is
that all reads must succeed that were done by an interrupting event (where
this event was interrupted by another event that did the write), or where
the event itself did the write first. That is: rb_time_set(t, x) followed by
rb_time_read(t) will always succeed (even if it gets interrupted by another
event that writes to t). The result of the read will be either the previous
set, or a set performed by an interrupting event.

If the read is done by an event that interrupted another event that was in
the process of setting the time stamp, and no other event came along to
write to that time stamp, it will fail and the rb_time_read() will return
that it failed (the value to read will be undefined).

A set will always write to the time stamp and return with a valid time
stamp, such that any read after it will be valid.

A cmpxchg may fail if it interrupted an event that was in the process of
updating the time stamp, just like the reads do. Other than that, it will act
like a normal cmpxchg.
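
A minimal usage sketch of that contract, using the rb_time_t API from the
patch below ("stamp" and "ts" are hypothetical locals):

	rb_time_t stamp;
	u64 val;

	rb_time_set(&stamp, ts);	/* a set always succeeds */
	if (!rb_time_read(&stamp, &val)) {
		/*
		 * Only possible if this read interrupted another
		 * writer mid update; val is undefined here.
		 */
	}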

The way this works is that the rb_time_t is made up of three fields. A cnt,
that gets updated atomically every time a modification is made. A top that
represents the most significant 30 bits of the time, and a bottom to
represent the least significant 30 bits of the time. Note that the time
value is only 60 bits long (where the ring buffer only uses 59 bits, which
gives us 18 years of nanoseconds!).

The top two bits of both the top and bottom fields hold a 2 bit counter that
gets set from the two least significant bits of the cnt. A read of the top
and the bottom where both have the same most significant 2 bits is
considered a match, and a valid 60 bit number can be created from it. If
they do not match, then the number is considered invalid, and this can only
happen if an event interrupted another event in the midst of updating the
time stamp.
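
As a concrete sketch of the encoding (RB_TIME_SHIFT and rb_time_cnt() come
from the patch below; the example values are made up):

	/* split a 60 bit value at RB_TIME_SHIFT (30) */
	top    = (val >> 30) & ((1UL << 30) - 1);
	bottom =  val        & ((1UL << 30) - 1);

	/* with cnt == 2, both halves store 2 in bits 30-31 */
	local_set(&t->top,    (2UL << 30) | top);
	local_set(&t->bottom, (2UL << 30) | bottom);

	/* a reader only trusts the pair when the counter bits match */
	valid = rb_time_cnt(local_read(&t->top)) ==
		rb_time_cnt(local_read(&t->bottom));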

This is only used on 32 bit machines, as 64 bit machines can get better
performance out of local64_t. This has been tested heavily by forcing 64
bit to use this logic.

Link: https://lore.kernel.org/r/20200625225345.18cf5881@oasis.local.home

Inspired-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: Steven Rostedt (VMware) <rostedt@goodmis.org>
---
 kernel/trace/ring_buffer.c | 260 +++++++++++++++++++++++++++++++++----
 1 file changed, 237 insertions(+), 23 deletions(-)

diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 986ddc8eba93..c2c0d25ea004 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -449,6 +449,27 @@ enum {
 	RB_CTX_MAX
 };
 
+#if BITS_PER_LONG == 32
+#define RB_TIME_32
+#endif
+
+/* To test on 64 bit machines */
+//#define RB_TIME_32
+
+#ifdef RB_TIME_32
+
+struct rb_time_struct {
+	local_t		cnt;
+	local_t		top;
+	local_t		bottom;
+};
+#else
+struct rb_time_struct {
+	local64_t	time;
+};
+#endif
+typedef struct rb_time_struct rb_time_t;
+
 /*
  * head_page == tail_page && head == tail then buffer is empty.
  */
@@ -485,8 +506,8 @@ struct ring_buffer_per_cpu {
 	unsigned long			read;
 	unsigned long			read_bytes;
 	unsigned long			next_write;
-	local64_t			write_stamp;
-	local64_t			before_stamp;
+	rb_time_t			write_stamp;
+	rb_time_t			before_stamp;
 	u64				read_stamp;
 	/* ring buffer pages to update, > 0 to add, < 0 to remove */
 	long				nr_pages_to_update;
@@ -529,6 +550,189 @@ struct ring_buffer_iter {
 	int				missed_events;
 };
 
+#ifdef RB_TIME_32
+
+/*
+ * On 32 bit machines, local64_t is very expensive. As the ring
+ * buffer doesn't need all the features of a true 64 bit atomic,
+ * on 32 bit, it uses these functions (64 still uses local64_t).
+ *
+ * For the ring buffer, 64 bit required operations for the time is
+ * the following:
+ *
+ *  - Only need 59 bits (uses 60 to make it even).
+ *  - Reads may fail if it interrupted a modification of the time stamp.
+ *      It will succeed if it did not interrupt another write even if
+ *      the read itself is interrupted by a write.
+ *      It returns whether it was successful or not.
+ *
+ *  - Writes always succeed and will overwrite other writes and writes
+ *      that were done by events interrupting the current write.
+ *
+ *  - A write followed by a read of the same time stamp will always succeed,
+ *      but may not contain the same value.
+ *
+ *  - A cmpxchg will fail if it interrupted another write or cmpxchg.
+ *      Other than that, it acts like a normal cmpxchg.
+ *
+ * The 60 bit time stamp is broken up by 30 bits in a top and bottom half
+ *  (bottom being the least significant 30 bits of the 60 bit time stamp).
+ *
+ * The two most significant bits of each half holds a 2 bit counter (0-3).
+ * Each update will increment this counter by one.
+ * When reading the top and bottom, if the two counter bits match then the
+ *  top and bottom together make a valid 60 bit number.
+ */
+#define RB_TIME_SHIFT	30
+#define RB_TIME_VAL_MASK ((1 << RB_TIME_SHIFT) - 1)
+
+static inline int rb_time_cnt(unsigned long val)
+{
+	return (val >> RB_TIME_SHIFT) & 3;
+}
+
+static inline u64 rb_time_val(unsigned long top, unsigned long bottom)
+{
+	u64 val;
+
+	val = top & RB_TIME_VAL_MASK;
+	val <<= RB_TIME_SHIFT;
+	val |= bottom & RB_TIME_VAL_MASK;
+
+	return val;
+}
+
+static inline bool __rb_time_read(rb_time_t *t, u64 *ret, unsigned long *cnt)
+{
+	unsigned long top, bottom;
+	unsigned long c;
+
+	/*
+	 * If the read is interrupted by a write, then the cnt will
+	 * be different. Loop until both top and bottom have been read
+	 * without interruption.
+	 */
+	do {
+		c = local_read(&t->cnt);
+		top = local_read(&t->top);
+		bottom = local_read(&t->bottom);
+	} while (c != local_read(&t->cnt));
+
+	*cnt = rb_time_cnt(top);
+
+	/* If top and bottom counts don't match, this interrupted a write */
+	if (*cnt != rb_time_cnt(bottom))
+		return false;
+
+	*ret = rb_time_val(top, bottom);
+	return true;
+}
+
+static bool rb_time_read(rb_time_t *t, u64 *ret)
+{
+	unsigned long cnt;
+
+	return __rb_time_read(t, ret, &cnt);
+}
+
+static inline unsigned long rb_time_val_cnt(unsigned long val, unsigned long cnt)
+{
+	return (val & RB_TIME_VAL_MASK) | ((cnt & 3) << RB_TIME_SHIFT);
+}
+
+static inline void rb_time_split(u64 val, unsigned long *top, unsigned long *bottom)
+{
+	*top = (unsigned long)((val >> RB_TIME_SHIFT) & RB_TIME_VAL_MASK);
+	*bottom = (unsigned long)(val & RB_TIME_VAL_MASK);
+}
+
+static inline void rb_time_val_set(local_t *t, unsigned long val, unsigned long cnt)
+{
+	val = rb_time_val_cnt(val, cnt);
+	local_set(t, val);
+}
+
+static void rb_time_set(rb_time_t *t, u64 val)
+{
+	unsigned long cnt, top, bottom;
+
+	rb_time_split(val, &top, &bottom);
+
+	/* Writes always succeed with a valid number even if it gets interrupted. */
+	do {
+		cnt = local_inc_return(&t->cnt);
+		rb_time_val_set(&t->top, top, cnt);
+		rb_time_val_set(&t->bottom, bottom, cnt);
+	} while (cnt != local_read(&t->cnt));
+}
+
+static inline bool
+rb_time_read_cmpxchg(local_t *l, unsigned long expect, unsigned long set)
+{
+	unsigned long ret;
+
+	ret = local_cmpxchg(l, expect, set);
+	return ret == expect;
+}
+
+static int rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set)
+{
+	unsigned long cnt, top, bottom;
+	unsigned long cnt2, top2, bottom2;
+	u64 val;
+
+	/* The cmpxchg always fails if it interrupted an update */
+	 if (!__rb_time_read(t, &val, &cnt2))
+		 return false;
+
+	 if (val != expect)
+		 return false;
+
+	 cnt = local_read(&t->cnt);
+	 if ((cnt & 3) != cnt2)
+		 return false;
+
+	 cnt2 = cnt + 1;
+
+	 rb_time_split(val, &top, &bottom);
+	 top = rb_time_val_cnt(top, cnt);
+	 bottom = rb_time_val_cnt(bottom, cnt);
+
+	 rb_time_split(set, &top2, &bottom2);
+	 top2 = rb_time_val_cnt(top2, cnt2);
+	 bottom2 = rb_time_val_cnt(bottom2, cnt2);
+
+	if (!rb_time_read_cmpxchg(&t->cnt, cnt, cnt2))
+		return false;
+	if (!rb_time_read_cmpxchg(&t->top, top, top2))
+		return false;
+	if (!rb_time_read_cmpxchg(&t->bottom, bottom, bottom2))
+		return false;
+	return true;
+}
+
+#else /* 64 bits */
+
+/* local64_t always succeeds */
+
+static inline bool rb_time_read(rb_time_t *t, u64 *ret)
+{
+	*ret = local64_read(&t->time);
+	return true;
+}
+static void rb_time_set(rb_time_t *t, u64 val)
+{
+	local64_set(&t->time, val);
+}
+
+static bool rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set)
+{
+	u64 val;
+	val = local64_cmpxchg(&t->time, expect, set);
+	return val == expect;
+}
+#endif
+
 /**
  * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer
  * @buffer: The ring_buffer to get the number of pages from
@@ -2546,7 +2750,8 @@ rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
 
 	delta = rb_time_delta(event);
 
-	write_stamp = local64_read(&cpu_buffer->write_stamp);
+	if (!rb_time_read(&cpu_buffer->write_stamp, &write_stamp))
+		return 0;
 
 	/* Make sure the write stamp is read before testing the location */
 	barrier();
@@ -2555,11 +2760,10 @@ rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
 		unsigned long write_mask =
 			local_read(&bpage->write) & ~RB_WRITE_MASK;
 		unsigned long event_length = rb_event_length(event);
-		u64 ret;
 
-		ret = local64_cmpxchg(&cpu_buffer->write_stamp, write_stamp, write_stamp - delta);
 		/* Something came in, can't discard */
-		if (ret != write_stamp)
+		if (!rb_time_cmpxchg(&cpu_buffer->write_stamp,
+				       write_stamp, write_stamp - delta))
 			return 0;
 
 		/*
@@ -2890,11 +3094,13 @@ static noinline void
 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
 		   struct rb_event_info *info)
 {
+	u64 write_stamp;
+
 	WARN_ONCE(info->delta > (1ULL << 59),
 		  KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s",
 		  (unsigned long long)info->delta,
 		  (unsigned long long)info->ts,
-		  (unsigned long long)local64_read(&cpu_buffer->write_stamp),
+		  (unsigned long long)(rb_time_read(&cpu_buffer->write_stamp, &write_stamp) ? write_stamp : 0),
 		  sched_clock_stable() ? "" :
 		  "If you just came from a suspend/resume,\n"
 		  "please switch to the trace global clock:\n"
@@ -2910,14 +3116,16 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
 	struct buffer_page *tail_page;
 	unsigned long tail, write, w, next;
 	u64 before, after;
+	bool a_ok;
+	bool b_ok;
 
 	/* Don't let the compiler play games with cpu_buffer->tail_page */
 	tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
 
  /*A*/	w = local_read(&tail_page->write) & RB_WRITE_MASK;
 	barrier();
-	before = local64_read(&cpu_buffer->before_stamp);
-	after = local64_read(&cpu_buffer->write_stamp);
+	b_ok = rb_time_read(&cpu_buffer->before_stamp, &before);
+	a_ok = rb_time_read(&cpu_buffer->write_stamp, &after);
 	barrier();
 	info->ts = rb_time_stamp(cpu_buffer->buffer);
 
@@ -2937,7 +3145,7 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
 	 * If interrupting an event time update, we may need an absolute timestamp.
 	 * Don't bother if this is the start of a new page (w == 0).
 	 */
-	if (unlikely(before != after && w))
+	if (unlikely(!a_ok || !b_ok || (before != after && w)))
 		info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND;
 
 	/*
@@ -2951,7 +3159,7 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
 	next = READ_ONCE(cpu_buffer->next_write);
 	WRITE_ONCE(cpu_buffer->next_write, w + info->length);
 
- /*B*/	local64_set(&cpu_buffer->before_stamp, info->ts);
+ /*B*/	rb_time_set(&cpu_buffer->before_stamp, info->ts);
 
  /*C*/	write = local_add_return(info->length, &tail_page->write);
 
@@ -2964,21 +3172,23 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
 	if (unlikely(write > BUF_PAGE_SIZE)) {
 		if (tail != w) {
 			/* before and after may now be different, fix it up */
-			before = local64_read(&cpu_buffer->before_stamp);
-			after = local64_read(&cpu_buffer->write_stamp);
-			if (before != after)
-				(void)local64_cmpxchg(&cpu_buffer->before_stamp, before, after);
+			b_ok = rb_time_read(&cpu_buffer->before_stamp, &before);
+			a_ok = rb_time_read(&cpu_buffer->write_stamp, &after);
+			if (a_ok && b_ok && before != after)
+				(void)rb_time_cmpxchg(&cpu_buffer->before_stamp, before, after);
 		}
 		return rb_move_tail(cpu_buffer, tail, info);
 	}
 
 	if (likely(tail == w)) {
 		u64 save_before;
+		bool s_ok;
 
 		/* Nothing interrupted us between A and C */
- /*D*/		local64_set(&cpu_buffer->write_stamp, info->ts);
+ /*D*/		rb_time_set(&cpu_buffer->write_stamp, info->ts);
 		barrier();
- /*E*/		save_before = local64_read(&cpu_buffer->before_stamp);
+ /*E*/		s_ok = rb_time_read(&cpu_buffer->before_stamp, &save_before);
+		RB_WARN_ON(cpu_buffer, !s_ok);
 		if (likely(!(info->add_timestamp &
 			     (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
 			/* This did not interrupt any time update */
@@ -2990,27 +3200,31 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
 		if (unlikely(info->ts != save_before)) {
 			/* SLOW PATH - Interrupted between C and E */
 
-			after = local64_read(&cpu_buffer->write_stamp);
+			a_ok = rb_time_read(&cpu_buffer->write_stamp, &after);
+			RB_WARN_ON(cpu_buffer, !a_ok);
+
 			/* Write stamp must only go forward */
 			if (save_before > after) {
 				/*
 				 * We do not care about the result, only that
 				 * it gets updated atomically.
 				 */
-				(void)local64_cmpxchg(&cpu_buffer->write_stamp, after, save_before);
+				(void)rb_time_cmpxchg(&cpu_buffer->write_stamp, after, save_before);
 			}
 		}
 	} else {
 		u64 ts;
 		/* SLOW PATH - Interrupted between A and C */
-		after = local64_read(&cpu_buffer->write_stamp);
+		a_ok = rb_time_read(&cpu_buffer->write_stamp, &after);
+		/* Was interrupted before here, write_stamp must be valid */
+		RB_WARN_ON(cpu_buffer, !a_ok);
 		ts = rb_time_stamp(cpu_buffer->buffer);
 		barrier();
  /*E*/		if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) &&
 		    after < ts) {
 			/* Nothing came after this event between C and E */
 			info->delta = ts - after;
-			(void)local64_cmpxchg(&cpu_buffer->write_stamp, after, info->ts);
+			(void)rb_time_cmpxchg(&cpu_buffer->write_stamp, after, info->ts);
 			info->ts = ts;
 		} else {
 			/*
@@ -4569,8 +4783,8 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
 	cpu_buffer->read = 0;
 	cpu_buffer->read_bytes = 0;
 
-	local64_set(&cpu_buffer->write_stamp, 0);
-	local64_set(&cpu_buffer->before_stamp, 0);
+	rb_time_set(&cpu_buffer->write_stamp, 0);
+	rb_time_set(&cpu_buffer->before_stamp, 0);
 	cpu_buffer->next_write = 0;
 
 	cpu_buffer->lost_events = 0;
-- 
2.26.2




* Re: [PATCH 3/3] ring-buffer: Add rb_time_t 64 bit operations for speeding up 32 bit
  2020-06-27  1:00 ` [PATCH 3/3] ring-buffer: Add rb_time_t 64 bit operations for speeding up 32 bit Steven Rostedt
@ 2020-06-27  6:42   ` kernel test robot
  0 siblings, 0 replies; 7+ messages in thread
From: kernel test robot @ 2020-06-27  6:42 UTC (permalink / raw)
  To: Steven Rostedt, linux-kernel
  Cc: kbuild-all, Ingo Molnar, Mathieu Desnoyers, Peter Zijlstra,
	Thomas Gleixner, Masami Hiramatsu, Arnaldo Carvalho de Melo,
	Jiri Olsa, Namhyung Kim, Yordan Karadzhov


Hi Steven,

Thank you for the patch! Perhaps something to improve:

[auto build test WARNING on linus/master]
[also build test WARNING on v5.8-rc2 next-20200626]
[cannot apply to tip/perf/core linux/master]
[If your patch is applied to the wrong git tree, kindly drop us a note.
And when submitting patch, we suggest to use '--base' as documented in
https://git-scm.com/docs/git-format-patch]

url:    https://github.com/0day-ci/linux/commits/Steven-Rostedt/ring-buffer-Restructure-ftrace-ring-buffer-time-keeping-to-allow-accurate-nested-timing/20200627-091520
base:   https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git 1590a2e1c681b0991bd42c992cabfd380e0338f2
config: i386-randconfig-m021-20200624 (attached as .config)
compiler: gcc-9 (Debian 9.3.0-13) 9.3.0

If you fix the issue, kindly add following tag as appropriate
Reported-by: kernel test robot <lkp@intel.com>

smatch warnings:
kernel/trace/ring_buffer.c:685 rb_time_cmpxchg() warn: inconsistent indenting

vim +685 kernel/trace/ring_buffer.c

   677	
   678	static int rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set)
   679	{
   680		unsigned long cnt, top, bottom;
   681		unsigned long cnt2, top2, bottom2;
   682		u64 val;
   683	
   684		/* The cmpxchg always fails if it interrupted an update */
 > 685		 if (!__rb_time_read(t, &val, &cnt2))
   686			 return false;
   687	
   688		 if (val != expect)
   689			 return false;
   690	
   691		 cnt = local_read(&t->cnt);
   692		 if ((cnt & 3) != cnt2)
   693			 return false;
   694	
   695		 cnt2 = cnt + 1;
   696	
   697		 rb_time_split(val, &top, &bottom);
   698		 top = rb_time_val_cnt(top, cnt);
   699		 bottom = rb_time_val_cnt(bottom, cnt);
   700	
   701		 rb_time_split(set, &top2, &bottom2);
   702		 top2 = rb_time_val_cnt(top2, cnt2);
   703		 bottom2 = rb_time_val_cnt(bottom2, cnt2);
   704	
   705		if (!rb_time_read_cmpxchg(&t->cnt, cnt, cnt2))
   706			return false;
   707		if (!rb_time_read_cmpxchg(&t->top, top, top2))
   708			return false;
   709		if (!rb_time_read_cmpxchg(&t->bottom, bottom, bottom2))
   710			return false;
   711		return true;
   712	}
   713	

---
0-DAY CI Kernel Test Service, Intel Corporation
https://lists.01.org/hyperkitty/list/kbuild-all@lists.01.org



* Re: [PATCH 1/3] ring-buffer: Have nested events still record running time stamp
  2020-06-27  1:00 ` [PATCH 1/3] ring-buffer: Have nested events still record running time stamp Steven Rostedt
@ 2020-06-28 16:23   ` Masami Hiramatsu
  2020-06-28 16:43     ` Steven Rostedt
  0 siblings, 1 reply; 7+ messages in thread
From: Masami Hiramatsu @ 2020-06-28 16:23 UTC (permalink / raw)
  To: Steven Rostedt
  Cc: linux-kernel, Ingo Molnar, Mathieu Desnoyers, Peter Zijlstra,
	Thomas Gleixner, Masami Hiramatsu, Arnaldo Carvalho de Melo,
	Jiri Olsa, Namhyung Kim, Yordan Karadzhov, Tzvetomir Stoyanov,
	Tom Zanussi, Jason Behmer, Julia Lawall, Clark Williams, bristot,
	Daniel Wagner, Darren Hart, Jonathan Corbet, Suresh E. Warrier

Hi Steve,

On Fri, 26 Jun 2020 21:00:42 -0400
Steven Rostedt <rostedt@goodmis.org> wrote:

> From: "Steven Rostedt (VMware)" <rostedt@goodmis.org>
> 
> Up until now, if an event is interrupted by an interrupt while it is being
> recorded, and that interrupt records events, the time of those events will
> all be the same. This is because events only record the delta of the time
> since the previous event (or the beginning of a page), and updating the
> time keeping for nested events is extremely racy. After years of thinking
> about this and several failed attempts, I finally have a solution to this
> puzzle.
> 
> The problem is that you need to atomically calculate the delta, then update
> the time stamp you made the delta from, and then record it into the buffer,
> all while at any time an interrupt can come in and do the same thing. This
> is easy to solve with heavyweight atomics, but that would be detrimental to
> the performance of the ring buffer. The current state of affairs sacrificed
> the time deltas of nested events for performance.
> 
> The reason for the previous failed attempts at solving this puzzle was that
> I was trying to completely avoid slow atomic operations like cmpxchg. I
> finally came to the conclusion that always avoiding cmpxchg is not possible,
> which is why those previous attempts always failed. But it is possible to
> pick one path (the most common case) and avoid cmpxchg in that path, which
> is the "fast path". The most common case is that an event will not be
> interrupted and have other events added into it. An event can detect if it
> has interrupted another event, and for those cases we can make it the slow
> path and use the heavy operations like cmpxchg.
> 
> One more player was added to the game that made this possible, and that is
> the "absolute timestamp" (by Tom Zanussi) that allows us to inject a full 59
> bit time stamp. (Of course this breaks if a machine runs for more than
> 18 years without a reboot!)
> 
> There are barrier() placements around out of paranoia, even where they
> are not needed because of other atomic functions nearby. But those
> should not hurt, as when they are not needed, they are basically a nop.
> 
> Note, this also makes the race window much smaller, which means there
> are fewer slow paths to slow down the performance.
> 
> Here's the design of this solution:
> 
>  All this is per cpu, and only needs to worry about nested events (not
>  parallel events).
>

This looks basically good to me, but I have some comments below.
(just a clean up)
 
> The players:
> 
>  write_tail: The index in the buffer where new events can be written to.
>      It is incremented via local_add() to reserve space for a new event.
> 
>  before_stamp: A time stamp set by all events before reserving space.
> 
>  write_stamp: A time stamp updated by events after it has successfully
>      reserved space.

So these stamps work like a seqlock to detect interruption (from both the
interrupted process and the interrupting process)

> 
>  next_write: A copy of "write_tail" used to help with races.

It seems this player does nothing.

> 
> 	/* Save the current position of write */
>  [A]	w = local_read(write_tail);
> 	barrier();
> 	/* Read both before and write stamps before touching anything */
> 	before = READ_ONCE(before_stamp);
> 	after = local_read(write_stamp);

Is there any reason to use READ_ONCE() and local_read()?
(In the code, both are local64_read().)

> 	barrier();
> 
> 	/*
> 	 * If before and after are the same, then this event is not
> 	 * interrupting a time update. If it is, then reserve space for adding
> 	 * a full time stamp (this can turn into a time extend which is
> 	 * just an extended time delta, but fills up the extra space).
> 	 */
> 	if (after != before)
> 		abs = true;
> 
> 	ts = clock();
> 
> 	/* Now update the before_stamp (everyone does this!) */
>  [B]	WRITE_ONCE(before_stamp, ts);
> 
> 	/* Read the current next_write and update it to what we want write
> 	 * to be after we reserve space. */
>  	next = READ_ONCE(next_write);
> 	WRITE_ONCE(next_write, w + len);

This seems meaningless, because neither "next" nor "next_write"
is referred to.

> 
> 	/* Now reserve space on the buffer */
>  [C]	write = local_add_return(len, write_tail);
> 
> 	/* Set tail to be where this event's data is */
> 	tail = write - len;
> 
>  	if (w == tail) {
> 
> 		/* Nothing interrupted this between A and C */
>  [D]		local_set(write_stamp, ts);
> 		barrier();
>  [E]		save_before = READ_ONCE(before_stamp);
> 
>  		if (!abs) {
> 			/* This did not interrupt a time update */
> 			delta = ts - after;
> 		} else {
> 			delta = ts; /* The full time stamp will be in use */
> 		}
> 		if (ts != save_before) {
> 			/* slow path - Was interrupted between C and E */
> 			/* The update to write_stamp could have overwritten the update to
> 			 * it by the interrupting event, but before and after should be
> 			 * the same for all completed top events */
> 			after = local_read(write_stamp);
> 			if (save_before > after)
> 				local_cmpxchg(write_stamp, after, save_before);
> 		}
> 	} else {
> 		/* slow path - Interrupted between A and C */
> 
> 		after = local_read(write_stamp);
> 		temp_ts = clock();
> 		barrier();
>  [F]		if (write == local_read(write_tail) && after < temp_ts) {
> 			/* This was not interrupted between C and F.
> 			 * The last write_stamp is still valid for the previous event
> 			 * in the buffer. */
> 			delta = temp_ts - after;
> 			/* OK to keep this new time stamp */
> 			ts = temp_ts;
> 		} else {
> 			/* Interrupted between C and F.
> 			 * There's no use trying to figure out what the time stamp
> 			 * is for the previous event. Just set delta to zero and
> 			 * be the same time as the event that interrupted us before
> 			 * the reservation of the buffer. */
> 
> 			delta = 0;
> 		}
> 		/* No need to use full timestamps here */
> 		abs = 0;
> 	}
> 
> -
> -static __always_inline void
> -rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer,
> -		      struct ring_buffer_event *event)
> -{
> -	u64 delta;
> -
> -	/*
> -	 * The event first in the commit queue updates the
> -	 * time stamp.
> -	 */
> -	if (rb_event_is_commit(cpu_buffer, event)) {
> -		/*
> -		 * A commit event that is first on a page
> -		 * updates the write timestamp with the page stamp
> -		 */
> -		if (!rb_event_index(event))
> -			cpu_buffer->write_stamp =
> -				cpu_buffer->commit_page->page->time_stamp;
> -		else if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
> -			delta = ring_buffer_event_time_stamp(event);
> -			cpu_buffer->write_stamp += delta;
> -		} else if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
> -			delta = ring_buffer_event_time_stamp(event);
> -			cpu_buffer->write_stamp = delta;
> -		} else
> -			cpu_buffer->write_stamp += event->time_delta;
> -	}
> -}
> -
>  static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
>  		      struct ring_buffer_event *event)
>  {
>  	local_inc(&cpu_buffer->entries);
> -	rb_update_write_stamp(cpu_buffer, event);
>  	rb_end_commit(cpu_buffer);
>  }
>  
> @@ -2872,13 +2892,13 @@ rb_handle_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
>  		  KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s",
>  		  (unsigned long long)info->delta,
>  		  (unsigned long long)info->ts,
> -		  (unsigned long long)cpu_buffer->write_stamp,
> +		  (unsigned long long)local64_read(&cpu_buffer->write_stamp),
>  		  sched_clock_stable() ? "" :
>  		  "If you just came from a suspend/resume,\n"
>  		  "please switch to the trace global clock:\n"
>  		  "  echo global > /sys/kernel/debug/tracing/trace_clock\n"
>  		  "or add trace_clock=global to the kernel command line\n");
> -	info->add_timestamp = 1;
> +	info->add_timestamp = RB_ADD_STAMP_NORMAL;
>  }
>  
>  static struct ring_buffer_event *
> @@ -2887,8 +2907,36 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
>  {
>  	struct ring_buffer_event *event;
>  	struct buffer_page *tail_page;
> -	unsigned long tail, write;
> +	unsigned long tail, write, w, next;
> +	u64 delta, before, after;
> +	bool abs = false;
> +
> +	/* Don't let the compiler play games with cpu_buffer->tail_page */
> +	tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
> +
> + /*A*/	w = local_read(&tail_page->write) & RB_WRITE_MASK;
> +	barrier();
> +	before = local64_read(&cpu_buffer->before_stamp);
> +	after = local64_read(&cpu_buffer->write_stamp);
> +	barrier();
> +	info->ts = rb_time_stamp(cpu_buffer->buffer);
> +
> +	if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) {
> +		info->delta = info->ts;
> +		abs = true;
> +	} else {
> +		info->delta = info->ts - after;
> +	}
> +
> +	if (unlikely(test_time_stamp(info->delta)))
> +		rb_handle_timestamp(cpu_buffer, info);
>  
> +	/*
> +	 * If this interrupts an event's time update, we may need an absolute timestamp.
> +	 * Don't bother if this is the start of a new page (w == 0).
> +	 */
> +	if (unlikely(before != after && w))
> +		info->add_timestamp = RB_ADD_STAMP_FORCE;
>  	/*
>  	 * If the time delta since the last event is too big to
>  	 * hold in the time field of the event, then we append a
> @@ -2897,25 +2945,91 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
>  	if (unlikely(info->add_timestamp))
>  		info->length += RB_LEN_TIME_EXTEND;
>  
> -	/* Don't let the compiler play games with cpu_buffer->tail_page */
> -	tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
> -	write = local_add_return(info->length, &tail_page->write);
> +	next = READ_ONCE(cpu_buffer->next_write);
> +	WRITE_ONCE(cpu_buffer->next_write, w + info->length);

So, this 'next' may have no effect.

Thank you,

> +
> + /*B*/	local64_set(&cpu_buffer->before_stamp, info->ts);
> +
> + /*C*/	write = local_add_return(info->length, &tail_page->write);
>  
>  	/* set write to only the index of the write */
>  	write &= RB_WRITE_MASK;
> +
>  	tail = write - info->length;
>  
> +	/* See if we shot past the end of this buffer page */
> +	if (unlikely(write > BUF_PAGE_SIZE)) {
> +		if (tail != w) {
> +			/* before and after may now be different, fix them up */
> +			before = local64_read(&cpu_buffer->before_stamp);
> +			after = local64_read(&cpu_buffer->write_stamp);
> +			if (before != after)
> +				(void)local64_cmpxchg(&cpu_buffer->before_stamp, before, after);
> +		}
> +		return rb_move_tail(cpu_buffer, tail, info);
> +	}
> +
> +	if (likely(tail == w)) {
> +		u64 save_before;
> +
> +		/* Nothing interrupted us between A and C */
> + /*D*/		local64_set(&cpu_buffer->write_stamp, info->ts);
> +		barrier();
> + /*E*/		save_before = local64_read(&cpu_buffer->before_stamp);
> +		if (likely(info->add_timestamp != RB_ADD_STAMP_FORCE))
> +			/* This did not interrupt any time update */
> +			info->delta = info->ts - after;
> +		else
> +			/* Just use the full timestamp for the interrupting event */
> +			info->delta = info->ts;
> +		barrier();
> +		if (unlikely(info->ts != save_before)) {
> +			/* SLOW PATH - Interrupted between C and E */
> +
> +			after = local64_read(&cpu_buffer->write_stamp);
> +			/* Write stamp must only go forward */
> +			if (save_before > after) {
> +				/*
> +				 * We do not care about the result, only that
> +				 * it gets updated atomically.
> +				 */
> +				(void)local64_cmpxchg(&cpu_buffer->write_stamp, after, save_before);
> +			}
> +		}
> +	} else {
> +		u64 ts;
> +		/* SLOW PATH - Interrupted between A and C */
> +		after = local64_read(&cpu_buffer->write_stamp);
> +		ts = rb_time_stamp(cpu_buffer->buffer);
> +		barrier();
> + /*E*/		if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) &&
> +		    after < ts) {
> +			/* Nothing came after this event between C and E */
> +			info->delta = ts - after;
> +			(void)local64_cmpxchg(&cpu_buffer->write_stamp, after, info->ts);
> +			info->ts = ts;
> +		} else {
> +			/*
> +			 * Interrupted between C and E:
> +			 * Lost the previous event's time stamp. Just set the
> +			 * delta to zero, and this will be the same time as
> +			 * the event this event interrupted. And the events that
> +			 * came after this will still be correct (as they would
> +			 * have built their delta on the previous event).
> +			 */
> +			info->delta = 0;
> +		}
> +		if (info->add_timestamp == RB_ADD_STAMP_FORCE)
> +			info->add_timestamp = RB_ADD_STAMP_NORMAL;
> +	}
> +
>  	/*
>  	 * If this is the first commit on the page, then it has the same
>  	 * timestamp as the page itself.
>  	 */
> -	if (!tail && !ring_buffer_time_stamp_abs(cpu_buffer->buffer))
> +	if (unlikely(!tail && info->add_timestamp != RB_ADD_STAMP_FORCE && !abs))
>  		info->delta = 0;
>  
> -	/* See if we shot pass the end of this buffer page */
> -	if (unlikely(write > BUF_PAGE_SIZE))
> -		return rb_move_tail(cpu_buffer, tail, info);
> -
>  	/* We reserved something on the buffer */
>  
>  	event = __rb_page_index(tail_page, tail);
> @@ -2944,9 +3058,9 @@ rb_reserve_next_event(struct trace_buffer *buffer,
>  	struct ring_buffer_event *event;
>  	struct rb_event_info info;
>  	int nr_loops = 0;
> -	u64 diff;
>  
>  	rb_start_commit(cpu_buffer);
> +	/* The commit page cannot change after this */
>  
>  #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
>  	/*
> @@ -2965,7 +3079,7 @@ rb_reserve_next_event(struct trace_buffer *buffer,
>  
>  	info.length = rb_calculate_event_length(length);
>   again:
> -	info.add_timestamp = 0;
> +	info.add_timestamp = RB_ADD_STAMP_NONE;
>  	info.delta = 0;
>  
>  	/*
> @@ -2980,22 +3094,6 @@ rb_reserve_next_event(struct trace_buffer *buffer,
>  	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
>  		goto out_fail;
>  
> -	info.ts = rb_time_stamp(cpu_buffer->buffer);
> -	diff = info.ts - cpu_buffer->write_stamp;
> -
> -	/* make sure this diff is calculated here */
> -	barrier();
> -
> -	if (ring_buffer_time_stamp_abs(buffer)) {
> -		info.delta = info.ts;
> -		rb_handle_timestamp(cpu_buffer, &info);
> -	} else /* Did the write stamp get updated already? */
> -		if (likely(info.ts >= cpu_buffer->write_stamp)) {
> -		info.delta = diff;
> -		if (unlikely(test_time_stamp(info.delta)))
> -			rb_handle_timestamp(cpu_buffer, &info);
> -	}
> -
>  	event = __rb_reserve_next(cpu_buffer, &info);
>  
>  	if (unlikely(PTR_ERR(event) == -EAGAIN)) {
> @@ -3004,11 +3102,8 @@ rb_reserve_next_event(struct trace_buffer *buffer,
>  		goto again;
>  	}
>  
> -	if (!event)
> -		goto out_fail;
> -
> -	return event;
> -
> +	if (likely(event))
> +		return event;
>   out_fail:
>  	rb_end_commit(cpu_buffer);
>  	return NULL;
> @@ -3154,11 +3249,6 @@ void ring_buffer_discard_commit(struct trace_buffer *buffer,
>  	if (rb_try_to_discard(cpu_buffer, event))
>  		goto out;
>  
> -	/*
> -	 * The commit is still visible by the reader, so we
> -	 * must still update the timestamp.
> -	 */
> -	rb_update_write_stamp(cpu_buffer, event);
>   out:
>  	rb_end_commit(cpu_buffer);
>  
> @@ -4475,8 +4565,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
>  	cpu_buffer->read = 0;
>  	cpu_buffer->read_bytes = 0;
>  
> -	cpu_buffer->write_stamp = 0;
> -	cpu_buffer->read_stamp = 0;
> +	local64_set(&cpu_buffer->write_stamp, 0);
> +	local64_set(&cpu_buffer->before_stamp, 0);
> +	cpu_buffer->next_write = 0;
>  
>  	cpu_buffer->lost_events = 0;
>  	cpu_buffer->last_overrun = 0;
> -- 
> 2.26.2
> 
> 


-- 
Masami Hiramatsu <mhiramat@kernel.org>


* Re: [PATCH 1/3] ring-buffer: Have nested events still record running time stamp
  2020-06-28 16:23   ` Masami Hiramatsu
@ 2020-06-28 16:43     ` Steven Rostedt
  0 siblings, 0 replies; 7+ messages in thread
From: Steven Rostedt @ 2020-06-28 16:43 UTC (permalink / raw)
  To: Masami Hiramatsu
  Cc: linux-kernel, Ingo Molnar, Mathieu Desnoyers, Peter Zijlstra,
	Thomas Gleixner, Arnaldo Carvalho de Melo, Jiri Olsa,
	Namhyung Kim, Yordan Karadzhov, Tzvetomir Stoyanov, Tom Zanussi,
	Jason Behmer, Julia Lawall, Clark Williams, bristot,
	Daniel Wagner, Darren Hart, Jonathan Corbet, Suresh E. Warrier

On Mon, 29 Jun 2020 01:23:23 +0900
Masami Hiramatsu <mhiramat@kernel.org> wrote:
> > 
> > Here's the design of this solution:
> > 
> >  All this is per cpu, and only needs to worry about nested events (not
> >  parallel events).
> >  
> 
> This looks basically good to me, but I have some comments below.
> (just a cleanup)

Thanks Masami!

>  
> > The players:
> > 
> >  write_tail: The index in the buffer where new events can be written to.
> >      It is incremented via local_add() to reserve space for a new event.
> > 
> >  before_stamp: A time stamp set by all events before reserving space.
> > 
> >  write_stamp: A time stamp updated by events after it has successfully
> >      reserved space.  
> 
> So these stamps work like a seqlock to detect interruption (from both
> the interrupted process and the interrupting process)

Yes.
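
To make the analogy concrete, here's an illustrative seqcount-style
reader (a sketch only, not the ring buffer code): a seqlock reader
retries when it sees that a writer was active, whereas the ring buffer
writer detects the interruption and falls back to a full timestamp
instead of retrying.

	#include <stdatomic.h>
	#include <stdint.h>

	extern _Atomic unsigned seq;		/* role of before_stamp */
	extern _Atomic uint64_t shared_data;

	static uint64_t seq_read(void)
	{
		unsigned start;
		uint64_t val;

		do {
			start = atomic_load(&seq);
			val = atomic_load(&shared_data);
			/* A writer bumps seq before and after updating
			 * shared_data, just as an event writes before_stamp
			 * before and write_stamp after its reservation */
		} while (start != atomic_load(&seq) || (start & 1));

		return val;
	}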

> 
> > 
> >  next_write: A copy of "write_tail" used to help with races.  
> 
> It seems this player does nothing.

Bah, you're right. Removing the one path that Mathieu suggested took
this player out of the game. Will remove in v2. Thanks for pointing
this out.

> 
> > 
> > 	/* Save the current position of write */
> >  [A]	w = local_read(write_tail);
> > 	barrier();
> > 	/* Read both before and write stamps before touching anything */
> > 	before = READ_ONCE(before_stamp);
> > 	after = local_read(write_stamp);  
> 
> Is there any reason to use READ_ONCE() and local_read()?
> (In the code, both are local64_read())

Thanks for pointing this out. before_stamp was originally just a normal
variable, but in discussions with Mathieu, it became apparent that it
needed to be atomic as well.

I'll update the change log in v2.
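
The atomicity matters most on 32-bit, where a plain 64-bit access is
typically split into two word-sized accesses, so a nested event can
observe a half-updated value. A sketch of the hazard (assumed code
generation on a 32-bit target, not code from the patch):

	#include <stdatomic.h>
	#include <stdint.h>

	uint64_t plain_stamp;		/* may tear: two 32-bit stores */
	_Atomic uint64_t atomic_stamp;	/* the role local64_t plays    */

	void update_stamp(uint64_t ts)
	{
		/* On a 32-bit target this can compile to two stores; an
		 * interrupt landing between them would read a mix of the
		 * old and new halves */
		plain_stamp = ts;

		/* A single atomic 64-bit update, which a nested reader
		 * always sees whole (hence also the rb_time_t operations
		 * in patch 3/3 for 32 bit) */
		atomic_store(&atomic_stamp, ts);
	}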


> 
> > 	barrier();
> > 
> > 	/*
> > 	 * If before and after are the same, then this event is not
> > 	 * interrupting a time update. If it is, then reserve space for adding
> > 	 * a full time stamp (this can turn into a time extend which is
> > 	 * just an extended time delta but fill up the extra space).
> > 	 */
> > 	if (after != before)
> > 		abs = true;
> > 
> > 	ts = clock();
> > 
> > 	/* Now update the before_stamp (everyone does this!) */
> >  [B]	WRITE_ONCE(before_stamp, ts);
> > 
> > 	/* Read the current next_write and update it to what we want write
> > 	 * to be after we reserve space. */
> >  	next = READ_ONCE(next_write);
> > 	WRITE_ONCE(next_write, w + len);  
> 
> This seems meaningless, because neither "next" nor "next_write"
> is referenced.

and they are now meaningless, though they weren't in the RFC. I'll
remove them.

> 
> > 
> > 	/* Now reserve space on the buffer */
> >  [C]	write = local_add_return(len, write_tail);
> > 
> > 	/* Set tail to be where this event's data is */
> > 	tail = write - len;

[...]

> >  
> > -	/* Don't let the compiler play games with cpu_buffer->tail_page */
> > -	tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
> > -	write = local_add_return(info->length, &tail_page->write);
> > +	next = READ_ONCE(cpu_buffer->next_write);
> > +	WRITE_ONCE(cpu_buffer->next_write, w + info->length);  
> 
> So, this 'next' may have no effect.

And I'll remove them.

Thanks for reviewing!

-- Steve
