linux-kernel.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
* [PATCH v2 0/3] printk: replace ringbuffer
@ 2020-05-01  9:40 John Ogness
  2020-05-01  9:40 ` [PATCH v2 1/3] crash: add VMCOREINFO macro for anonymous structs John Ogness
                   ` (4 more replies)
  0 siblings, 5 replies; 30+ messages in thread
From: John Ogness @ 2020-05-01  9:40 UTC (permalink / raw)
  To: Petr Mladek
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel

Hello,

Here is a v2 for the first series to rework the printk subsystem. The
v1 and history are here [0]. This first series only replaces the
existing ringbuffer implementation. No locking is removed. No
semantics/behavior of printk are changed.

The VMCOREINFO is updated. RFC patches for the external tools
crash(8) [1] and makedumpfile(8) [2] have been submitted that allow
the new ringbuffer to be correctly read.

This series is in line with the agreements [3] made at the meeting
during LPC2019 in Lisbon, with 1 exception: support for dictionaries
will not be discontinued [4]. Dictionaries are stored in a separate
buffer so that they cannot interfere with the human-readable buffer.

The list of changes since v1:

printk_ringbuffer
=================
- documentation formatting/wording cleanup
- main memory barrier comments moved from memory accesses to
  memory barriers
- remove all symbol exporting of printk_ringbuffer functions
- add prb_count_lines() for line counting so printk.c can also use
  this functionality
- add prb_read_valid_info() to just read record meta-data
- remove @line_count out of printk_record (there is now an interface
  for line counting)
- to_block(): remove intermediate (char *) typecast
- data_alloc(): use WRITE_ONCE for writing block ID
- data_push_tail(): fix test condition for tail LPOS already pushed
- data_push_tail(): continue loop rather than try cmpxchg() if
  tail LPOS was pushed
- data_push_tail(): add memory barrier before pushing tail LPOS so
  readers can recognize if data has expired while reading the
  descriptor
- data_make_reusable(): added memory barrier and logic to protect
  against invalid read of block ID
- data_alloc(): add memory barrier to guarantee the tail ID is stored
  before updating the block ID
- desc_reserve(): add memory barrier before checking ID to make sure
  the head ID is read before the tail ID
- desc_reserve(): move memory barrier to before the head ID is pushed
  to guarantee the tail ID is stored before the head ID

printk.c
========
- remove static syslog_record/console_record (use dynamic allocation
  and in-place buffer manipulation instead)
- reduce expected average message size from 64 to 32
- add printk_record initializers to simplify reader and writer code
- fix wrong return value when log_store() truncates
- add full VMCOREINFO needed by makedumpfile(8)
- change record_print_text() to add prefix data to the buffer
  in-place instead of requiring a 2nd buffer
- add helper to calculate text size when prefix added:
  get_record_text_size() (used together with prb_count_lines() or
  prb_read_valid_info() to get full text size, rather than abusing
  record_print_text() for this purpose)
- syslog_print_all(): fix missing break if copy_to_user() failed
- moved "messages dropped" printing to call_console_drivers()

John Ogness

[0] https://lkml.kernel.org/r/20200128161948.8524-1-john.ogness@linutronix.de
[1] https://www.redhat.com/archives/crash-utility/2020-April/msg00080.html
[2] https://lists.infradead.org/pipermail/kexec/2020-April/024906.html
[3] https://lkml.kernel.org/r/87k1acz5rx.fsf@linutronix.de
[4] https://lkml.kernel.org/r/20191007120134.ciywr3wale4gxa6v@pathway.suse.cz

John Ogness (3):
  crash: add VMCOREINFO macro for anonymous structs
  printk: add lockless buffer
  printk: use the lockless ringbuffer

 include/linux/crash_core.h        |    3 +
 include/linux/kmsg_dump.h         |    2 -
 kernel/printk/Makefile            |    1 +
 kernel/printk/printk.c            |  938 +++++++++--------
 kernel/printk/printk_ringbuffer.c | 1626 +++++++++++++++++++++++++++++
 kernel/printk/printk_ringbuffer.h |  369 +++++++
 6 files changed, 2491 insertions(+), 448 deletions(-)
 create mode 100644 kernel/printk/printk_ringbuffer.c
 create mode 100644 kernel/printk/printk_ringbuffer.h

-- 
2.20.1


^ permalink raw reply	[flat|nested] 30+ messages in thread

* [PATCH v2 1/3] crash: add VMCOREINFO macro for anonymous structs
  2020-05-01  9:40 [PATCH v2 0/3] printk: replace ringbuffer John Ogness
@ 2020-05-01  9:40 ` John Ogness
  2020-06-03 10:16   ` Petr Mladek
  2020-05-01  9:40 ` [PATCH v2 2/3] printk: add lockless buffer John Ogness
                   ` (3 subsequent siblings)
  4 siblings, 1 reply; 30+ messages in thread
From: John Ogness @ 2020-05-01  9:40 UTC (permalink / raw)
  To: Petr Mladek
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel

Some structs are not named and are only available via their typedef.
Add a VMCOREINFO macro to export field offsets for such structs.

Signed-off-by: John Ogness <john.ogness@linutronix.de>
---
 include/linux/crash_core.h | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/include/linux/crash_core.h b/include/linux/crash_core.h
index 525510a9f965..43b51c9df571 100644
--- a/include/linux/crash_core.h
+++ b/include/linux/crash_core.h
@@ -53,6 +53,9 @@ phys_addr_t paddr_vmcoreinfo_note(void);
 #define VMCOREINFO_OFFSET(name, field) \
 	vmcoreinfo_append_str("OFFSET(%s.%s)=%lu\n", #name, #field, \
 			      (unsigned long)offsetof(struct name, field))
+#define VMCOREINFO_TYPE_OFFSET(name, field) \
+	vmcoreinfo_append_str("OFFSET(%s.%s)=%lu\n", #name, #field, \
+			      (unsigned long)offsetof(name, field))
 #define VMCOREINFO_LENGTH(name, value) \
 	vmcoreinfo_append_str("LENGTH(%s)=%lu\n", #name, (unsigned long)value)
 #define VMCOREINFO_NUMBER(name) \
-- 
2.20.1


^ permalink raw reply related	[flat|nested] 30+ messages in thread

* [PATCH v2 2/3] printk: add lockless buffer
  2020-05-01  9:40 [PATCH v2 0/3] printk: replace ringbuffer John Ogness
  2020-05-01  9:40 ` [PATCH v2 1/3] crash: add VMCOREINFO macro for anonymous structs John Ogness
@ 2020-05-01  9:40 ` John Ogness
       [not found]   ` <87v9ktcs3q.fsf@vostro.fn.ogness.net>
                     ` (5 more replies)
  2020-05-01  9:40 ` [PATCH v2 3/3] printk: use the lockless ringbuffer John Ogness
                   ` (2 subsequent siblings)
  4 siblings, 6 replies; 30+ messages in thread
From: John Ogness @ 2020-05-01  9:40 UTC (permalink / raw)
  To: Petr Mladek
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel

Introduce a multi-reader multi-writer lockless ringbuffer for storing
the kernel log messages. Readers and writers may use their API from
any context (including scheduler and NMI). This ringbuffer will make
it possible to decouple printk() callers from any context, locking,
or console constraints. It also makes it possible for readers to have
full access to the ringbuffer contents at any time and context (for
example from any panic situation).

The printk_ringbuffer is made up of 3 internal ringbuffers:

desc_ring:
A ring of descriptors. A descriptor contains all record meta data
(sequence number, timestamp, loglevel, etc.) as well as internal state
information about the record and logical positions specifying where in
the other ringbuffers the text and dictionary strings are located.

text_data_ring:
A ring of data blocks. A data block consists of an unsigned long
integer (ID) that maps to a desc_ring index followed by the text
string of the record.

dict_data_ring:
A ring of data blocks. A data block consists of an unsigned long
integer (ID) that maps to a desc_ring index followed by the dictionary
string of the record.

The internal state information of a descriptor is the key element to
allow readers and writers to locklessly synchronize access to the data.

Co-developed-by: Petr Mladek <pmladek@suse.com>
Signed-off-by: John Ogness <john.ogness@linutronix.de>
---
 kernel/printk/Makefile            |    1 +
 kernel/printk/printk_ringbuffer.c | 1626 +++++++++++++++++++++++++++++
 kernel/printk/printk_ringbuffer.h |  369 +++++++
 3 files changed, 1996 insertions(+)
 create mode 100644 kernel/printk/printk_ringbuffer.c
 create mode 100644 kernel/printk/printk_ringbuffer.h

diff --git a/kernel/printk/Makefile b/kernel/printk/Makefile
index 4d052fc6bcde..eee3dc9b60a9 100644
--- a/kernel/printk/Makefile
+++ b/kernel/printk/Makefile
@@ -2,3 +2,4 @@
 obj-y	= printk.o
 obj-$(CONFIG_PRINTK)	+= printk_safe.o
 obj-$(CONFIG_A11Y_BRAILLE_CONSOLE)	+= braille.o
+obj-$(CONFIG_PRINTK)	+= printk_ringbuffer.o
diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
new file mode 100644
index 000000000000..e0a66468d4f3
--- /dev/null
+++ b/kernel/printk/printk_ringbuffer.c
@@ -0,0 +1,1626 @@
+// SPDX-License-Identifier: GPL-2.0
+
+#include <linux/kernel.h>
+#include <linux/irqflags.h>
+#include <linux/string.h>
+#include <linux/errno.h>
+#include <linux/bug.h>
+#include "printk_ringbuffer.h"
+
+/**
+ * DOC: printk_ringbuffer overview
+ *
+ * Data Structure
+ * --------------
+ * The printk_ringbuffer is made up of 3 internal ringbuffers:
+ *
+ *   desc_ring
+ *     A ring of descriptors. A descriptor contains all record meta data
+ *     (sequence number, timestamp, loglevel, etc.) as well as internal state
+ *     information about the record and logical positions specifying where in
+ *     the other ringbuffers the text and dictionary strings are located.
+ *
+ *   text_data_ring
+ *     A ring of data blocks. A data block consists of an unsigned long
+ *     integer (ID) that maps to a desc_ring index followed by the text
+ *     string of the record.
+ *
+ *   dict_data_ring
+ *     A ring of data blocks. A data block consists of an unsigned long
+ *     integer (ID) that maps to a desc_ring index followed by the dictionary
+ *     string of the record.
+ *
+ * The internal state information of a descriptor is the key element to allow
+ * readers and writers to locklessly synchronize access to the data.
+ *
+ * Implementation
+ * --------------
+ *
+ * ABA Issues
+ * ~~~~~~~~~~
+ * To help avoid ABA issues, descriptors are referenced by IDs (array index
+ * values with tagged states) and data blocks are referenced by logical
+ * positions (array index values with tagged states). However, on 32-bit
+ * systems the number of tagged states is relatively small such that an ABA
+ * incident is (at least theoretically) possible. For example, if 4 million
+ * maximally sized (1KiB) printk messages were to occur in NMI context on a
+ * 32-bit system, the interrupted task would not be able to recognize that
+ * the 32-bit integer completely wrapped and thus represents a different
+ * data block than the one the interrupted task expects.
+ *
+ * To help combat this possibility, additional state checking is performed
+ * (such as using cmpxchg() even though set() would suffice). These extra
+ * checks are commented as such and will hopefully catch any ABA issue that
+ * a 32-bit system might experience.
+ *
+ * Memory Barriers
+ * ~~~~~~~~~~~~~~~
+ * Multiple memory barriers are used. To simplify proving correctness and
+ * generating litmus tests, lines of code related to memory barriers
+ * (loads, stores, and the associated memory barriers) are labeled::
+ *
+ *	LMM(function:letter)
+ *
+ * Comments reference the labels using only the "function:letter" part.
+ *
+ * Descriptor Ring
+ * ~~~~~~~~~~~~~~~
+ * The descriptor ring is an array of descriptors. A descriptor contains all
+ * the meta data of a printk record as well as blk_lpos structs pointing to
+ * associated text and dictionary data blocks (see "Data Rings" below). Each
+ * descriptor is assigned an ID that maps directly to index values of the
+ * descriptor array and has a state. The ID and the state are bitwise combined
+ * into a single descriptor field named @state_var, allowing ID and state to
+ * be synchronously and atomically updated.
+ *
+ * Descriptors have three states:
+ *
+ *   reserved
+ *     A writer is modifying the record.
+ *
+ *   committed
+ *     The record and all its data are complete and available for reading.
+ *
+ *   reusable
+ *     The record exists, but its text and/or dictionary data may no longer
+ *     be available.
+ *
+ * Querying the @state_var of a record requires providing the ID of the
+ * descriptor to query. This can yield a possible fourth (pseudo) state:
+ *
+ *   miss
+ *     The descriptor being queried has an unexpected ID.
+ *
+ * The descriptor ring has a @tail_id that contains the ID of the oldest
+ * descriptor and @head_id that contains the ID of the newest descriptor.
+ *
+ * When a new descriptor should be created (and the ring is full), the tail
+ * descriptor is invalidated by first transitioning to the reusable state and
+ * then invalidating all tail data blocks up to and including the data blocks
+ * associated with the tail descriptor (for text and dictionary rings). Then
+ * @tail_id is advanced, followed by advancing @head_id. And finally the
+ * @state_var of the new descriptor is initialized to the new ID and reserved
+ * state.
+ *
+ * The @tail_id can only be advanced if the the new @tail_id would be in the
+ * committed or reusable queried state. This makes it possible that a valid
+ * sequence number of the tail is always available.
+ *
+ * Data Rings
+ * ~~~~~~~~~~
+ * The two data rings (text and dictionary) function identically. They exist
+ * separately so that their buffer sizes can be individually set and they do
+ * not affect one another.
+ *
+ * Data rings are byte arrays composed of data blocks. Data blocks are
+ * referenced by blk_lpos structs that point to the logical position of the
+ * beginning of a data block and the beginning of the next adjacent data
+ * block. Logical positions are mapped directly to index values of the byte
+ * array ringbuffer.
+ *
+ * Each data block consists of an ID followed by the raw data. The ID is the
+ * identifier of a descriptor that is associated with the data block. A data
+ * block is considered valid if all of the following conditions are met:
+ *
+ *   1) The descriptor associated with the data block is in the committed
+ *      or reusable queried state.
+ *
+ *   2) The blk_lpos struct within the descriptor associated with the data
+ *      block references back to the same data block.
+ *
+ *   3) The data block is within the head/tail logical position range.
+ *
+ * If the raw data of a data block would extend beyond the end of the byte
+ * array, only the ID of the data block is stored at the logical position
+ * and the full data block (ID and raw data) is stored at the beginning of
+ * the byte array. The referencing blk_lpos will point to the ID before the
+ * wrap and the next data block will be at the logical position adjacent the
+ * full data block after the wrap.
+ *
+ * Data rings have a @tail_lpos that points to the beginning of the oldest
+ * data block and a @head_lpos that points to the logical position of the
+ * next (not yet existing) data block.
+ *
+ * When a new data block should be created (and the ring is full), tail data
+ * blocks will first be invalidated by putting their associated descriptors
+ * into the reusable state and then pushing the @tail_lpos forward beyond
+ * them. Then the @head_lpos is pushed forward and is associated with a new
+ * descriptor. If a data block is not valid, the @tail_lpos cannot be
+ * advanced beyond it.
+ *
+ * Usage
+ * -----
+ * Here are some simple examples demonstrating writers and readers. For the
+ * examples a global ringbuffer (test_rb) is available (which is not the
+ * actual ringbuffer used by printk)::
+ *
+ *	DECLARE_PRINTKRB(test_rb, 15, 5, 3);
+ *
+ * This ringbuffer allows up to 32768 records (2 ^ 15) and has a size of
+ * 1 MiB (2 ^ (15 + 5)) for text data and 256 KiB (2 ^ (15 + 3)) for
+ * dictionary data.
+ *
+ * Sample writer code::
+ *
+ *	const char *dictstr = "dictionary text";
+ *	const char *textstr = "message text";
+ *	struct prb_reserved_entry e;
+ *	struct printk_record r;
+ *
+ *	// specify how much to allocate
+ *	prb_rec_init_wr(&r, strlen(textstr) + 1, strlen(dictstr) + 1);
+ *
+ *	if (prb_reserve(&e, &test_rb, &r)) {
+ *		snprintf(r.text_buf, r.text_buf_size, "%s", textstr);
+ *
+ *		// dictionary allocation may have failed
+ *		if (r.dict_buf)
+ *			snprintf(r.dict_buf, r.dict_buf_size, "%s", dictstr);
+ *
+ *		r.info->ts_nsec = local_clock();
+ *
+ *		prb_commit(&e);
+ *	}
+ *
+ * Sample reader code::
+ *
+ *	struct printk_info info;
+ *	struct printk_record r;
+ *	char text_buf[32];
+ *	char dict_buf[32];
+ *	u64 seq;
+ *
+ *	prb_rec_init_rd(&r, &info, &text_buf[0], sizeof(text_buf),
+ *			&dict_buf[0], sizeof(dict_buf));
+ *
+ *	prb_for_each_record(0, &test_rb, &seq, &r) {
+ *		if (info.seq != seq)
+ *			pr_warn("lost %llu records\n", info.seq - seq);
+ *
+ *		if (info.text_len > r.text_buf_size) {
+ *			pr_warn("record %llu text truncated\n", info.seq);
+ *			text_buf[sizeof(text_buf) - 1] = 0;
+ *		}
+ *
+ *		if (info.dict_len > r.dict_buf_size) {
+ *			pr_warn("record %llu dict truncated\n", info.seq);
+ *			dict_buf[sizeof(dict_buf) - 1] = 0;
+ *		}
+ *
+ *		pr_info("%llu: %llu: %s;%s\n", info.seq, info.ts_nsec,
+ *			&text_buf[0], info.dict_len ? &dict_buf[0] : "");
+ *	}
+ */
+
+#define DATA_SIZE(data_ring)		_DATA_SIZE((data_ring)->size_bits)
+#define DATA_SIZE_MASK(data_ring)	(DATA_SIZE(data_ring) - 1)
+
+#define DESCS_COUNT(desc_ring)		_DESCS_COUNT((desc_ring)->count_bits)
+#define DESCS_COUNT_MASK(desc_ring)	(DESCS_COUNT(desc_ring) - 1)
+
+/* Determine the data array index from a logical position. */
+#define DATA_INDEX(data_ring, lpos)	((lpos) & DATA_SIZE_MASK(data_ring))
+
+/* Determine the desc array index from an ID or sequence number. */
+#define DESC_INDEX(desc_ring, n)	((n) & DESCS_COUNT_MASK(desc_ring))
+
+/* Determine how many times the data array has wrapped. */
+#define DATA_WRAPS(data_ring, lpos)	((lpos) >> (data_ring)->size_bits)
+
+/* Get the logical position at index 0 of the current wrap. */
+#define DATA_THIS_WRAP_START_LPOS(data_ring, lpos) \
+	((lpos) & ~DATA_SIZE_MASK(data_ring))
+
+/* Get the ID for the same index of the previous wrap as the given ID. */
+#define DESC_ID_PREV_WRAP(desc_ring, id) \
+	DESC_ID((id) - DESCS_COUNT(desc_ring))
+
+/* A data block: maps to the raw data within the data ring. */
+struct prb_data_block {
+	unsigned long	id;
+	char		data[0];
+};
+
+static struct prb_desc *to_desc(struct prb_desc_ring *desc_ring, u64 n)
+{
+	return &desc_ring->descs[DESC_INDEX(desc_ring, n)];
+}
+
+static struct prb_data_block *to_block(struct prb_data_ring *data_ring,
+				       unsigned long begin_lpos)
+{
+	return (void *)&data_ring->data[DATA_INDEX(data_ring, begin_lpos)];
+}
+
+/*
+ * Increase the data size to account for data block meta data plus any
+ * padding so that the adjacent data block is aligned on the ID size.
+ */
+static unsigned long to_blk_size(unsigned long size)
+{
+	struct prb_data_block *db = NULL;
+
+	size += sizeof(*db);
+	size = ALIGN(size, sizeof(db->id));
+	return size;
+}
+
+/*
+ * Sanity checker for reserve size. The ringbuffer code assumes that a data
+ * block does not exceed the maximum possible size that could fit within the
+ * ringbuffer. This function provides that basic size check so that the
+ * assumption is safe.
+ *
+ * Writers are also not allowed to write 0-sized (data-less) records. Such
+ * records are used only internally by the ringbuffer.
+ */
+static bool data_check_size(struct prb_data_ring *data_ring, unsigned int size)
+{
+	struct prb_data_block *db = NULL;
+
+	/*
+	 * Writers are not allowed to write data-less records. Such records
+	 * are used only internally by the ringbuffer to denote records where
+	 * their data failed to allocate or have been lost.
+	 */
+	if (size == 0)
+		return false;
+
+	/*
+	 * Ensure the alignment padded size could possibly fit in the data
+	 * array. The largest possible data block must still leave room for
+	 * at least the ID of the next block.
+	 */
+	size = to_blk_size(size);
+	if (size > DATA_SIZE(data_ring) - sizeof(db->id))
+		return false;
+
+	return true;
+}
+
+/* The possible responses of a descriptor state-query. */
+enum desc_state {
+	desc_miss,	/* ID mismatch */
+	desc_reserved,	/* reserved, in use by writer */
+	desc_committed, /* committed, writer is done */
+	desc_reusable,	/* free, not yet used by any writer */
+};
+
+/* Query the state of a descriptor. */
+static enum desc_state get_desc_state(unsigned long id,
+				      unsigned long state_val)
+{
+	if (id != DESC_ID(state_val))
+		return desc_miss;
+
+	if (state_val & DESC_REUSE_MASK)
+		return desc_reusable;
+
+	if (state_val & DESC_COMMITTED_MASK)
+		return desc_committed;
+
+	return desc_reserved;
+}
+
+/*
+ * Get a copy of a specified descriptor and its queried state. A descriptor
+ * that is not in the committed or reusable state must be considered garbage
+ * by the reader.
+ */
+static enum desc_state desc_read(struct prb_desc_ring *desc_ring,
+				 unsigned long id, struct prb_desc *desc_out)
+{
+	struct prb_desc *desc = to_desc(desc_ring, id);
+	atomic_long_t *state_var = &desc->state_var;
+	enum desc_state d_state;
+	unsigned long state_val;
+
+	/* Check the descriptor state. */
+	state_val = atomic_long_read(state_var); /* LMM(desc_read:A) */
+	d_state = get_desc_state(id, state_val);
+	if (d_state != desc_committed && d_state != desc_reusable)
+		return d_state;
+
+	/*
+	 * Guarantee the state is loaded before copying the descriptor. This
+	 * avoids copying obsolete descriptor data that might not apply to
+	 * the descriptor state. This pairs with prb_commit:B.
+	 *
+	 * Memory barrier involvement:
+	 *
+	 * If desc_read:A reads from prb_commit:C, then desc_read:C reads
+	 * from prb_commit:A.
+	 *
+	 * Relies on:
+	 *
+	 * WMB from prb_commit:A to prb_commit:C
+	 *    matching
+	 * RMB from desc_read:A to desc_read:C
+	 */
+	smp_rmb(); /* LMM(desc_read:B) */
+
+	/* Copy the descriptor data. */
+	*desc_out = READ_ONCE(*desc); /* LMM(desc_read:C) */
+
+	/*
+	 * Guarantee the descriptor and/or record data is loaded before
+	 * re-checking the state. This avoids reading an obsolete descriptor
+	 * state that may not apply to the copied data. For copying descriptor
+	 * data, this pairs with desc_reserve:H when a descriptor transitions
+	 * to reserved. For copying record data, this pairs with
+	 * data_push_tail:C when a descriptor transitions to reusable.
+	 *
+	 * Memory barrier involvement:
+	 *
+	 * 1. If desc_read:C reads from desc_reserve:I, then desc_read:E reads
+	 *    from desc_reserve:G.
+	 *
+	 *    Relies on:
+	 *
+	 *    WMB from desc_reserve:G to desc_reserve:I
+	 *       matching
+	 *    RMB from desc_read:C to desc_read:E
+	 *
+	 * 2. If desc_read:C reads from data_push_tail:D, then desc_read:E
+	 *    reads from desc_make_reusable:A.
+	 *
+	 *    Relies on:
+	 *
+	 *    MB from desc_make_reusable:A to data_push_tail:D
+	 *       matching
+	 *    RMB from desc_read:C to desc_read:E
+	 *
+	 *    Note: desc_make_reusable:A and data_push_tail:D can be
+	 *          different CPUs. However, the data_push_tail:D CPU
+	 *          (which performs the full memory barrier) must have
+	 *          previously seen desc_make_reusable:A.
+	 */
+	smp_rmb(); /* LMM(desc_read:D) */
+
+	/* Re-check the descriptor state. */
+	state_val = atomic_long_read(state_var); /* LMM(desc_read:E) */
+	return get_desc_state(id, state_val);
+}
+
+/*
+ * Take a specified descriptor out of the committed state by attempting
+ * the transition from committed to reusable. Either this task or some
+ * other task will have been successful.
+ */
+static void desc_make_reusable(struct prb_desc_ring *desc_ring,
+			       unsigned long id)
+{
+	unsigned long val_committed = id | DESC_COMMITTED_MASK;
+	unsigned long val_reusable = val_committed | DESC_REUSE_MASK;
+	struct prb_desc *desc = to_desc(desc_ring, id);
+	atomic_long_t *state_var = &desc->state_var;
+
+	atomic_long_cmpxchg_relaxed(state_var, val_committed,
+				val_reusable); /* LMM(desc_make_reusable:A) */
+}
+
+/*
+ * Given a data ring (text or dict), put the associated descriptor of each
+ * data block from @lpos_begin until @lpos_end into the reusable state.
+ *
+ * If there is any problem making the associated descriptor reusable, either
+ * the descriptor has not yet been committed or another writer task has
+ * already pushed the tail lpos past the problematic data block. Regardless,
+ * on error the caller can re-load the tail lpos to determine the situation.
+ */
+static bool data_make_reusable(struct printk_ringbuffer *rb,
+			       struct prb_data_ring *data_ring,
+			       unsigned long lpos_begin,
+			       unsigned long lpos_end,
+			       unsigned long *lpos_out)
+{
+	struct prb_desc_ring *desc_ring = &rb->desc_ring;
+	struct prb_data_blk_lpos *blk_lpos;
+	struct prb_data_block *blk;
+	unsigned long tail_lpos;
+	enum desc_state d_state;
+	struct prb_desc desc;
+	unsigned long id;
+
+	/*
+	 * Using the provided @data_ring, point @blk_lpos to the correct
+	 * blk_lpos within the local copy of the descriptor.
+	 */
+	if (data_ring == &rb->text_data_ring)
+		blk_lpos = &desc.text_blk_lpos;
+	else
+		blk_lpos = &desc.dict_blk_lpos;
+
+	/* Loop until @lpos_begin has advanced to or beyond @lpos_end. */
+	while ((lpos_end - lpos_begin) - 1 < DATA_SIZE(data_ring)) {
+		blk = to_block(data_ring, lpos_begin);
+		id = READ_ONCE(blk->id); /* LMM(data_make_reusable:A) */
+
+		/*
+		 * Guarantee the block ID is loaded before checking the tail
+		 * lpos. The loaded block ID can only be considered valid if
+		 * the tail lpos has not overtaken @lpos_begin. This pairs
+		 * with data_alloc:A.
+		 *
+		 * Memory barrier involvement:
+		 *
+		 * If data_make_reusable:A reads from data_alloc:B, then
+		 * data_make_reusable:C reads from data_push_tail:D.
+		 *
+		 * Relies on:
+		 *
+		 * MB from data_push_tail:D to data_alloc:B
+		 *    matching
+		 * RMB from data_make_reusable:A to data_make_reusable:C
+		 *
+		 * Note: data_push_tail:D and data_alloc:B can be different
+		 *       CPUs. However, the data_alloc:B CPU (which performs
+		 *       the full memory barrier) must have previously seen
+		 *       data_push_tail:D.
+		 */
+		smp_rmb(); /* LMM(data_make_reusable:B) */
+
+		tail_lpos = atomic_long_read(&data_ring->tail_lpos
+					); /* LMM(data_make_reusable:C) */
+
+		/*
+		 * If @lpos_begin has fallen behind the tail lpos, the read
+		 * block ID cannot be trusted. Fast forward @lpos_begin to the
+		 * tail lpos and try again.
+		 */
+		if (lpos_begin - tail_lpos >= DATA_SIZE(data_ring)) {
+			lpos_begin = tail_lpos;
+			continue;
+		}
+
+		d_state = desc_read(desc_ring, id,
+				    &desc); /* LMM(data_make_reusable:D) */
+
+		switch (d_state) {
+		case desc_miss:
+			return false;
+		case desc_reserved:
+			return false;
+		case desc_committed:
+			/*
+			 * This data block is invalid if the descriptor
+			 * does not point back to it.
+			 */
+			if (blk_lpos->begin != lpos_begin)
+				return false;
+			desc_make_reusable(desc_ring, id);
+			break;
+		case desc_reusable:
+			/*
+			 * This data block is invalid if the descriptor
+			 * does not point back to it.
+			 */
+			if (blk_lpos->begin != lpos_begin)
+				return false;
+			break;
+		}
+
+		/* Advance @lpos_begin to the next data block. */
+		lpos_begin = blk_lpos->next;
+	}
+
+	*lpos_out = lpos_begin;
+	return true;
+}
+
+/*
+ * Advance the data ring tail to at least @lpos. This function puts
+ * descriptors into the reusable state if the tail is pushed beyond
+ * their associated data block.
+ */
+static bool data_push_tail(struct printk_ringbuffer *rb,
+			   struct prb_data_ring *data_ring,
+			   unsigned long lpos)
+{
+	unsigned long tail_lpos;
+	unsigned long next_lpos;
+
+	/* If @lpos is not valid, there is nothing to do. */
+	if (lpos == INVALID_LPOS)
+		return true;
+
+	tail_lpos = atomic_long_read(&data_ring->tail_lpos);
+
+	do {
+		/* Done, if the tail lpos is already at or beyond @lpos. */
+		if ((lpos - tail_lpos) - 1 >= DATA_SIZE(data_ring))
+			break;
+
+		/*
+		 * Make all descriptors reusable that are associated with
+		 * data blocks before @lpos.
+		 */
+		if (!data_make_reusable(rb, data_ring, tail_lpos, lpos,
+					&next_lpos)) {
+			/*
+			 * Guarantee the descriptor state loaded in
+			 * data_make_reusable() is performed before reloading
+			 * the tail lpos. The failed data_make_reusable() may
+			 * be due to a newly recycled descriptor causing
+			 * the tail lpos to have been previously pushed. This
+			 * pairs with desc_reserve:D.
+			 *
+			 * Memory barrier involvement:
+			 *
+			 * If data_make_reusable:D reads from desc_reserve:G,
+			 * then data_push_tail:B reads from data_push_tail:D.
+			 *
+			 * Relies on:
+			 *
+			 * MB from data_push_tail:D to desc_reserve:G
+			 *    matching
+			 * RMB from data_make_reusable:D to data_push_tail:B
+			 *
+			 * Note: data_push_tail:D and desc_reserve:G can be
+			 *       different CPUs. However, the desc_reserve:G
+			 *       CPU (which performs the full memory barrier)
+			 *       must have previously seen data_push_tail:D.
+			 */
+			smp_rmb(); /* LMM(data_push_tail:A) */
+
+			next_lpos = atomic_long_read(&data_ring->tail_lpos
+						); /* LMM(data_push_tail:B) */
+			if (next_lpos == tail_lpos)
+				return false;
+
+			/* Another task pushed the tail. Try again. */
+			tail_lpos = next_lpos;
+			continue;
+		}
+
+		/*
+		 * Guarantee any descriptor states that have transitioned to
+		 * reusable are stored before pushing the tail lpos. This
+		 * allows readers to identify if data has expired while
+		 * reading the descriptor. This pairs with desc_read:D.
+		 */
+		smp_mb(); /* LMM(data_push_tail:C) */
+
+	} while (!atomic_long_try_cmpxchg_relaxed(&data_ring->tail_lpos,
+			&tail_lpos, next_lpos)); /* LMM(data_push_tail:D) */
+
+	return true;
+}
+
+/*
+ * Advance the desc ring tail. This function advances the tail by one
+ * descriptor, thus invalidating the oldest descriptor. Before advancing
+ * the tail, the tail descriptor is made reusable and all data blocks up to
+ * and including the descriptor's data block are invalidated (i.e. the data
+ * ring tail is pushed past the data block of the descriptor being made
+ * reusable).
+ */
+static bool desc_push_tail(struct printk_ringbuffer *rb,
+			   unsigned long tail_id)
+{
+	struct prb_desc_ring *desc_ring = &rb->desc_ring;
+	enum desc_state d_state;
+	struct prb_desc desc;
+
+	d_state = desc_read(desc_ring, tail_id, &desc);
+
+	switch (d_state) {
+	case desc_miss:
+		/*
+		 * If the ID is exactly 1 wrap behind the expected, it is
+		 * in the process of being reserved by another writer and
+		 * must be considered reserved.
+		 */
+		if (DESC_ID(atomic_long_read(&desc.state_var)) ==
+		    DESC_ID_PREV_WRAP(desc_ring, tail_id)) {
+			return false;
+		}
+
+		/*
+		 * The ID has changed. Another writer must have pushed the
+		 * tail and recycled the descriptor already. Success is
+		 * returned because the caller is only interested in the
+		 * specified tail being pushed, which it was.
+		 */
+		return true;
+	case desc_reserved:
+		return false;
+	case desc_committed:
+		desc_make_reusable(desc_ring, tail_id);
+		break;
+	case desc_reusable:
+		break;
+	}
+
+	/*
+	 * Data blocks must be invalidated before their associated
+	 * descriptor can be made available for recycling. Invalidating
+	 * them later is not possible because there is no way to trust
+	 * data blocks once their associated descriptor is gone.
+	 */
+
+	if (!data_push_tail(rb, &rb->text_data_ring, desc.text_blk_lpos.next))
+		return false;
+	if (!data_push_tail(rb, &rb->dict_data_ring, desc.dict_blk_lpos.next))
+		return false;
+
+	/*
+	 * Check the next descriptor after @tail_id before pushing the tail
+	 * to it because the tail must always be in a committed or reusable
+	 * state. The implementation of prb_first_seq() relies on this.
+	 *
+	 * A successful read implies that the next descriptor is less than or
+	 * equal to @head_id so there is no risk of pushing the tail past the
+	 * head.
+	 */
+	d_state = desc_read(desc_ring, DESC_ID(tail_id + 1),
+			    &desc); /* LMM(desc_push_tail:A) */
+	if (d_state == desc_committed || d_state == desc_reusable) {
+		/*
+		 * Any CPU that loads the new tail ID, must see that the
+		 * descriptor at @tail_id is in the reusable state. See the
+		 * read memory barrier part of desc_reserve:D for details.
+		 */
+		atomic_long_cmpxchg_relaxed(&desc_ring->tail_id, tail_id,
+			DESC_ID(tail_id + 1)); /* LMM(desc_push_tail:B) */
+	} else {
+		/*
+		 * Guarantee the last state load from desc_read() is before
+		 * reloading @tail_id in order to see a new tail ID in the
+		 * case that the descriptor has been recycled. This pairs
+		 * with desc_reserve:D.
+		 *
+		 * Memory barrier involvement:
+		 *
+		 * If desc_push_tail:A reads from desc_reserve:G, then
+		 * desc_push_tail:D reads from desc_push_tail:B.
+		 *
+		 * Relies on:
+		 *
+		 * MB from desc_push_tail:B to desc_reserve:G
+		 *    matching
+		 * RMB from desc_push_tail:A to desc_push_tail:D
+		 *
+		 * Note: desc_push_tail:B and desc_reserve:G can be different
+		 *       CPUs. However, the desc_reserve:G CPU (which performs
+		 *       the full memory barrier) must have previously seen
+		 *       desc_push_tail:B.
+		 */
+		smp_rmb(); /* LMM(desc_push_tail:C) */
+
+		/*
+		 * Re-check the tail ID. The descriptor following @tail_id is
+		 * not in an allowed tail state. But if the tail has since
+		 * been moved by another task, then it does not matter.
+		 */
+		if (atomic_long_read(&desc_ring->tail_id) ==
+					tail_id) { /* LMM(desc_push_tail:D) */
+			return false;
+		}
+	}
+
+	return true;
+}
+
+/* Reserve a new descriptor, invalidating the oldest if necessary. */
+static bool desc_reserve(struct printk_ringbuffer *rb, unsigned long *id_out)
+{
+	struct prb_desc_ring *desc_ring = &rb->desc_ring;
+	unsigned long prev_state_val;
+	unsigned long id_prev_wrap;
+	struct prb_desc *desc;
+	unsigned long head_id;
+	unsigned long id;
+
+	head_id = atomic_long_read(&desc_ring->head_id
+						); /* LMM(desc_reserve:A) */
+
+	do {
+		desc = to_desc(desc_ring, head_id);
+
+		id = DESC_ID(head_id + 1);
+		id_prev_wrap = DESC_ID_PREV_WRAP(desc_ring, id);
+
+		/*
+		 * Guarantee the head ID is read before reading the tail ID.
+		 * Since the tail ID is updated before the head ID, this
+		 * guarantees that @id_prev_wrap is never ahead of the tail
+		 * ID. This pairs with desc_reserve:D.
+		 *
+		 * Memory barrier involvement:
+		 *
+		 * If desc_reserve:A reads from desc_reserve:E, then
+		 * desc_reserve:C reads from desc_push_tail:B.
+		 *
+		 * Relies on:
+		 *
+		 * MB from desc_push_tail:B to desc_reserve:E
+		 *    matching
+		 * RMB from desc_reserve:A to desc_reserve:C
+		 *
+		 * Note: desc_push_tail:B and desc_reserve:E can be different
+		 *       CPUs. However, the desc_reserve:E CPU (which performs
+		 *       the full memory barrier) must have previously seen
+		 *       desc_push_tail:B.
+		 */
+		smp_rmb(); /* LMM(desc_reserve:B) */
+
+		if (id_prev_wrap == atomic_long_read(&desc_ring->tail_id
+						)) { /* LMM(desc_reserve:C) */
+			/*
+			 * Make space for the new descriptor by
+			 * advancing the tail.
+			 */
+			if (!desc_push_tail(rb, id_prev_wrap))
+				return false;
+		}
+
+		/*
+		 * Guarantee the tail ID is read before validating the
+		 * recycled descriptor state. A read memory barrier is
+		 * sufficient for this. This pairs with data_push_tail:C.
+		 *
+		 * Memory barrier involvement:
+		 *
+		 * If desc_reserve:C reads from desc_push_tail:B, then
+		 * desc_reserve:F reads from desc_make_reusable:A.
+		 *
+		 * Relies on:
+		 *
+		 * MB from desc_make_reusable:A to desc_push_tail:B
+		 *    matching
+		 * RMB from desc_reserve:C to desc_reserve:F
+		 *
+		 * Note: desc_make_reusable:A, desc_push_tail:B, and
+		 *       data_push_tail:C can all be different CPUs. However,
+		 *       the desc_push_tail:B CPU must have previously seen
+		 *       data_push_tail:D and the data_push_tail:D CPU (which
+		 *       performs the full memory barrier) must have
+		 *       previously seen desc_make_reusable:A.
+		 *
+		 * Guarantee any data ring tail changes are stored before
+		 * recycling the descriptor. Data ring tail changes can happen
+		 * via desc_push_tail()->data_push_tail(). A full memory
+		 * barrier is needed since another task may have pushed the
+		 * data ring tails. This pairs with data_push_tail:A.
+		 *
+		 * Guarantee a new tail ID is stored before recycling the
+		 * descriptor. A full memory barrier is needed since another
+		 * task may have pushed the tail ID. This pairs with
+		 * desc_push_tail:C and prb_first_seq:C.
+		 *
+		 * Guarantee the tail ID is stored before storing the head ID.
+		 * This pairs with desc_reserve:B.
+		 */
+		smp_mb(); /* LMM(desc_reserve:D) */
+
+	} while (!atomic_long_try_cmpxchg_relaxed(&desc_ring->head_id,
+				&head_id, id)); /* LMM(desc_reserve:E) */
+
+	desc = to_desc(desc_ring, id);
+
+	/*
+	 * If the descriptor has been recycled, verify the old state val.
+	 * See "ABA Issues" about why this verification is performed.
+	 */
+	prev_state_val = atomic_long_read(&desc->state_var
+						); /* LMM(desc_reserve:F) */
+	if (prev_state_val && prev_state_val != (id_prev_wrap |
+						 DESC_COMMITTED_MASK |
+						 DESC_REUSE_MASK)) {
+		WARN_ON_ONCE(1);
+		return false;
+	}
+
+	/*
+	 * Assign the descriptor a new ID and set its state to reserved.
+	 * See "ABA Issues" about why cmpxchg() instead of set() is used.
+	 */
+	if (!atomic_long_try_cmpxchg_relaxed(&desc->state_var,
+			&prev_state_val, id | 0)) { /* LMM(desc_reserve:G) */
+		WARN_ON_ONCE(1);
+		return false;
+	}
+
+	/*
+	 * Guarantee the new descriptor ID and state is stored before making
+	 * any other changes. This pairs with desc_read:D.
+	 */
+	smp_wmb(); /* LMM(desc_reserve:H) */
+
+	/* Now data in @desc can be modified: LMM(desc_reserve:I) */
+
+	*id_out = id;
+	return true;
+}
+
+/* Determine the end of a data block. */
+static unsigned long get_next_lpos(struct prb_data_ring *data_ring,
+				   unsigned long lpos, unsigned int size)
+{
+	unsigned long begin_lpos;
+	unsigned long next_lpos;
+
+	begin_lpos = lpos;
+	next_lpos = lpos + size;
+
+	if (DATA_WRAPS(data_ring, begin_lpos) ==
+	    DATA_WRAPS(data_ring, next_lpos)) {
+		/* The data block does not wrap. */
+		return next_lpos;
+	}
+
+	/* Wrapping data blocks store their data at the beginning. */
+	return (DATA_THIS_WRAP_START_LPOS(data_ring, next_lpos) + size);
+}
+
+/*
+ * Allocate a new data block, invalidating the oldest data block(s)
+ * if necessary. This function also associates the data block with
+ * a specified descriptor.
+ */
+static char *data_alloc(struct printk_ringbuffer *rb,
+			struct prb_data_ring *data_ring, unsigned long size,
+			struct prb_data_blk_lpos *blk_lpos, unsigned long id)
+{
+	struct prb_data_block *blk;
+	unsigned long begin_lpos;
+	unsigned long next_lpos;
+
+	if (!data_ring->data || size == 0) {
+		/* Specify a data-less block. */
+		blk_lpos->begin = INVALID_LPOS;
+		blk_lpos->next = INVALID_LPOS;
+		return NULL;
+	}
+
+	size = to_blk_size(size);
+
+	begin_lpos = atomic_long_read(&data_ring->head_lpos);
+
+	do {
+		next_lpos = get_next_lpos(data_ring, begin_lpos, size);
+
+		if (!data_push_tail(rb, data_ring,
+				    next_lpos - DATA_SIZE(data_ring))) {
+			/* Failed to allocate, specify a data-less block. */
+			blk_lpos->begin = INVALID_LPOS;
+			blk_lpos->next = INVALID_LPOS;
+			return NULL;
+		}
+	} while (!atomic_long_try_cmpxchg_relaxed(&data_ring->head_lpos,
+						  &begin_lpos, next_lpos));
+
+	/*
+	 * Guarantee any updated tail lpos is stored before setting the new
+	 * block ID. This allows block IDs to be trusted based on the tail
+	 * lpos. A full memory barrier is needed since another task may
+	 * have updated the tail lpos. This pairs with data_make_reusable:B.
+	 */
+	smp_mb(); /* LMM(data_alloc:A) */
+
+	blk = to_block(data_ring, begin_lpos);
+	WRITE_ONCE(blk->id, id); /* LMM(data_alloc:B) */
+
+	if (DATA_WRAPS(data_ring, begin_lpos) !=
+	    DATA_WRAPS(data_ring, next_lpos)) {
+		/* Wrapping data blocks store their data at the beginning. */
+		blk = to_block(data_ring, 0);
+
+		/*
+		 * Store the ID on the wrapped block for consistency.
+		 * The printk_ringbuffer does not actually use it.
+		 */
+		blk->id = id;
+	}
+
+	blk_lpos->begin = begin_lpos;
+	blk_lpos->next = next_lpos;
+
+	return &blk->data[0];
+}
+
+/* Return the number of bytes used by a data block. */
+static unsigned int space_used(struct prb_data_ring *data_ring,
+			       struct prb_data_blk_lpos *blk_lpos)
+{
+	if (DATA_WRAPS(data_ring, blk_lpos->begin) ==
+	    DATA_WRAPS(data_ring, blk_lpos->next)) {
+		return (DATA_INDEX(data_ring, blk_lpos->next) -
+			DATA_INDEX(data_ring, blk_lpos->begin));
+	}
+
+	/*
+	 * For wrapping data blocks, the trailing (wasted) space is
+	 * also counted.
+	 */
+	return (DATA_INDEX(data_ring, blk_lpos->next) +
+		DATA_SIZE(data_ring) -
+		DATA_INDEX(data_ring, blk_lpos->begin));
+}
+
+/**
+ * prb_reserve() - Reserve space in the ringbuffer.
+ *
+ * @e:  The entry structure to setup.
+ * @rb: The ringbuffer to reserve data in.
+ * @r:  The record structure to allocate buffers for.
+ *
+ * This is the public function available to writers to reserve data.
+ *
+ * The writer specifies the text and dict sizes to reserve by setting the
+ * @text_buf_size and @dict_buf_size fields of @r, respectively. Dictionaries
+ * are optional, so @dict_buf_size is allowed to be 0. To ensure proper
+ * initialization of @r, prb_rec_init_wr() should be used.
+ *
+ * Context: Any context. Disables local interrupts on success.
+ * Return: true if at least text data could be allocated, otherwise false.
+ *
+ * On success, the fields @info, @text_buf, @dict_buf of @r will be set by
+ * this function and should be filled in by the writer before committing. Also
+ * on success, prb_record_text_space() can be used on @e to query the actual
+ * space used for the text data block.
+ *
+ * If the function fails to reserve dictionary space (but all else succeeded),
+ * it will still report success. In that case @dict_buf is set to NULL and
+ * @dict_buf_size is set to 0. Writers must check this before writing to
+ * dictionary space.
+ *
+ * @info->text_len and @info->dict_len will already be set to @text_buf_size
+ * and @dict_buf_size, respectively. If dictionary space reserveration fails,
+ * @info->dict_len is set to 0.
+ */
+bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+		 struct printk_record *r)
+{
+	struct prb_desc_ring *desc_ring = &rb->desc_ring;
+	struct prb_desc *d;
+	unsigned long id;
+
+	if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
+		goto fail;
+
+	/* Records without dictionaries are allowed. */
+	if (r->dict_buf_size) {
+		if (!data_check_size(&rb->dict_data_ring, r->dict_buf_size))
+			goto fail;
+	}
+
+	/*
+	 * Descriptors in the reserved state act as blockers to all further
+	 * reservations once the desc_ring has fully wrapped. Disable
+	 * interrupts during the reserve/commit window in order to minimize
+	 * the likelihood of this happening.
+	 */
+	local_irq_save(e->irqflags);
+
+	if (!desc_reserve(rb, &id)) {
+		/* Descriptor reservation failures are tracked. */
+		atomic_long_inc(&rb->fail);
+		local_irq_restore(e->irqflags);
+		goto fail;
+	}
+
+	d = to_desc(desc_ring, id);
+
+	/*
+	 * Set the @e fields here so that prb_commit() can be used if
+	 * text data allocation fails.
+	 */
+	e->rb = rb;
+	e->id = id;
+
+	/*
+	 * Initialize the sequence number if it has "never been set".
+	 * Otherwise just increment it by a full wrap.
+	 *
+	 * @seq is considered "never been set" if it has a value of 0,
+	 * _except_ for @descs[0], which was specially setup by the ringbuffer
+	 * initializer and therefore is always considered as set.
+	 *
+	 * See the "Bootstrap" comment block in printk_ringbuffer.h for
+	 * details about how the initializer bootstraps the descriptors.
+	 */
+	if (d->info.seq == 0 && DESC_INDEX(desc_ring, id) != 0)
+		d->info.seq = DESC_INDEX(desc_ring, id);
+	else
+		d->info.seq += DESCS_COUNT(desc_ring);
+
+	r->text_buf = data_alloc(rb, &rb->text_data_ring, r->text_buf_size,
+				 &d->text_blk_lpos, id);
+	/* If text data allocation fails, a data-less record is committed. */
+	if (r->text_buf_size && !r->text_buf) {
+		d->info.text_len = 0;
+		d->info.dict_len = 0;
+		prb_commit(e);
+		/* prb_commit() re-enabled interrupts. */
+		goto fail;
+	}
+
+	r->dict_buf = data_alloc(rb, &rb->dict_data_ring, r->dict_buf_size,
+				 &d->dict_blk_lpos, id);
+	/*
+	 * If dict data allocation fails, the caller can still commit
+	 * text. But dictionary information will not be available.
+	 */
+	if (r->dict_buf_size && !r->dict_buf)
+		r->dict_buf_size = 0;
+
+	r->info = &d->info;
+
+	/* Set default values for the sizes. */
+	d->info.text_len = r->text_buf_size;
+	d->info.dict_len = r->dict_buf_size;
+
+	/* Record full text space used by record. */
+	e->text_space = space_used(&rb->text_data_ring, &d->text_blk_lpos);
+
+	return true;
+fail:
+	/* Make it clear to the caller that the reserve failed. */
+	memset(r, 0, sizeof(*r));
+	return false;
+}
+
+/**
+ * prb_commit() - Commit (previously reserved) data to the ringbuffer.
+ *
+ * @e: The entry containing the reserved data information.
+ *
+ * This is the public function available to writers to commit data.
+ *
+ * Context: Any context. Enables local interrupts.
+ */
+void prb_commit(struct prb_reserved_entry *e)
+{
+	struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
+	struct prb_desc *d = to_desc(desc_ring, e->id);
+	unsigned long prev_state_val = e->id | 0;
+
+	/* Now the writer has finished all writing: LMM(prb_commit:A) */
+
+	/*
+	 * Guarantee all record data is stored before the descriptor
+	 * state is stored as committed. This pairs with desc_read:B.
+	 */
+	smp_wmb(); /* LMM(prb_commit:B) */
+
+	/*
+	 * Set the descriptor as committed.
+	 * See "ABA Issues" about why cmpxchg() instead of set() is used.
+	 */
+	if (!atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val,
+		e->id | DESC_COMMITTED_MASK)) { /* LMM(prb_commit:C) */
+		WARN_ON_ONCE(1);
+	}
+
+	/* Restore interrupts, the reserve/commit window is finished. */
+	local_irq_restore(e->irqflags);
+}
+
+/*
+ * Given @blk_lpos, return a pointer to the raw data from the data block
+ * and calculate the size of the data part. A NULL pointer is returned
+ * if @blk_lpos specifies values that could never be legal.
+ *
+ * This function (used by readers) performs strict validation on the lpos
+ * values to possibly detect bugs in the writer code. A WARN_ON_ONCE() is
+ * triggered if an internal error is detected.
+ */
+static char *get_data(struct prb_data_ring *data_ring,
+		      struct prb_data_blk_lpos *blk_lpos,
+		      unsigned long *data_size)
+{
+	struct prb_data_block *db;
+
+	/* Data-less data block description. */
+	if (blk_lpos->begin == INVALID_LPOS &&
+	    blk_lpos->next == INVALID_LPOS) {
+		return NULL;
+	}
+
+	/* Regular data block: @begin less than @next and in same wrap. */
+	if (DATA_WRAPS(data_ring, blk_lpos->begin) ==
+	    DATA_WRAPS(data_ring, blk_lpos->next) &&
+	    blk_lpos->begin < blk_lpos->next) {
+		db = to_block(data_ring, blk_lpos->begin);
+		*data_size = blk_lpos->next - blk_lpos->begin;
+
+	/* Wrapping data block: @begin is one wrap behind @next. */
+	} else if (DATA_WRAPS(data_ring,
+			      blk_lpos->begin + DATA_SIZE(data_ring)) ==
+		   DATA_WRAPS(data_ring, blk_lpos->next)) {
+		db = to_block(data_ring, 0);
+		*data_size = DATA_INDEX(data_ring, blk_lpos->next);
+
+	/* Illegal block description. */
+	} else {
+		WARN_ON_ONCE(1);
+		return NULL;
+	}
+
+	/* A valid data block will always be aligned to the ID size. */
+	if (WARN_ON_ONCE(blk_lpos->begin !=
+			 ALIGN(blk_lpos->begin, sizeof(db->id))) ||
+	    WARN_ON_ONCE(blk_lpos->next !=
+			 ALIGN(blk_lpos->next, sizeof(db->id)))) {
+		return NULL;
+	}
+
+	/* A valid data block will always have at least an ID. */
+	if (WARN_ON_ONCE(*data_size < sizeof(db->id)))
+		return NULL;
+
+	/* Subtract descriptor ID space from size to reflect data size. */
+	*data_size -= sizeof(db->id);
+
+	return &db->data[0];
+}
+
+/**
+ * prb_count_lines() - Count the number of lines in provided text.
+ *
+ * @text:      The text to count the lines of.
+ * @text_size: The size of the text to process.
+ *
+ * This is the public function available to readers to count the number of
+ * lines in a text string.
+ *
+ * Context: Any context.
+ * Return: The number of lines in the text.
+ *
+ * All text has at least 1 line (even if @text_size is 0). Each '\n'
+ * processed is counted as an additional line.
+ */
+unsigned int prb_count_lines(char *text, unsigned int text_size)
+{
+	unsigned long next_size = text_size;
+	unsigned int line_count = 1;
+	char *next = text;
+
+	while (next_size) {
+		next = memchr(next, '\n', next_size);
+		if (!next)
+			break;
+		line_count++;
+		next++;
+		next_size = text_size - (next - text);
+	}
+
+	return line_count;
+}
+
+/*
+ * Given @blk_lpos, copy an expected @len of data into the provided buffer.
+ * If @line_count is provided, count the number of lines in the data.
+ *
+ * This function (used by readers) performs strict validation on the data
+ * size to possibly detect bugs in the writer code. A WARN_ON_ONCE() is
+ * triggered if an internal error is detected.
+ */
+static bool copy_data(struct prb_data_ring *data_ring,
+		      struct prb_data_blk_lpos *blk_lpos, u16 len, char *buf,
+		      unsigned int buf_size, unsigned int *line_count)
+{
+	unsigned long data_size;
+	char *data;
+
+	/* Caller might not want any data. */
+	if ((!buf || !buf_size) && !line_count)
+		return true;
+
+	data = get_data(data_ring, blk_lpos, &data_size);
+	if (!data)
+		return false;
+
+	/*
+	 * Actual cannot be less than expected. It can be more than expected
+	 * because of the trailing alignment padding.
+	 */
+	if (WARN_ON_ONCE(data_size < (unsigned long)len)) {
+		pr_warn_once(
+		    "wrong data size (%lu, expecting %hu) for data: %.*s\n",
+		    data_size, len, (int)data_size, data);
+		return false;
+	}
+
+	/* Caller interested in the line count? */
+	if (line_count)
+		*line_count = prb_count_lines(data, data_size);
+
+	/* Caller interested in the data content? */
+	if (!buf || !buf_size)
+		return true;
+
+	data_size = min_t(u16, buf_size, len);
+
+	if (!WARN_ON_ONCE(!data_size))
+		memcpy(&buf[0], data, data_size);
+	return true;
+}
+
+/*
+ * This is an extended version of desc_read(). It gets a copy of a specified
+ * descriptor. However, it also verifies that the record is committed and has
+ * the sequence number @seq. On success, 0 is returned.
+ *
+ * Error return values:
+ * -EINVAL: A committed record with sequence number @seq does not exist.
+ * -ENOENT: A committed record with sequence number @seq exists, but its data
+ *          is not available. This is a valid record, so readers should
+ *          continue with the next record.
+ */
+static int desc_read_committed_seq(struct prb_desc_ring *desc_ring,
+				   unsigned long id, u64 seq,
+				   struct prb_desc *desc_out)
+{
+	struct prb_data_blk_lpos *blk_lpos = &desc_out->text_blk_lpos;
+	enum desc_state d_state;
+
+	d_state = desc_read(desc_ring, id, desc_out);
+
+	/*
+	 * An unexpeced @id (desc_miss) or @seq mismatch means the record
+	 * does not exist. A descriptor in the reserved state means the
+	 * record does not yet exist for the reader.
+	 */
+	if (d_state == desc_miss ||
+	    d_state == desc_reserved ||
+	    desc_out->info.seq != seq) {
+		return -EINVAL;
+	}
+
+	/*
+	 * A descriptor in the reusable state may no longer have its data
+	 * available; report it as a data-less record. Or the record may
+	 * actually be a data-less record.
+	 */
+	if (d_state == desc_reusable ||
+	    (blk_lpos->begin == INVALID_LPOS &&
+	     blk_lpos->next == INVALID_LPOS)) {
+		return -ENOENT;
+	}
+
+	return 0;
+}
+
+/*
+ * Copy the ringbuffer data from the record with @seq to the provided
+ * @r buffer. On success, 0 is returned.
+ *
+ * See desc_read_committed_seq() for error return values.
+ */
+static int prb_read(struct printk_ringbuffer *rb, u64 seq,
+		    struct printk_record *r, unsigned int *line_count)
+{
+	struct prb_desc_ring *desc_ring = &rb->desc_ring;
+	struct prb_desc *rdesc = to_desc(desc_ring, seq);
+	atomic_long_t *state_var = &rdesc->state_var;
+	struct prb_desc desc;
+	unsigned long id;
+	int err;
+
+	/* Extract the ID, used to specify the descriptor to read. */
+	id = DESC_ID(atomic_long_read(state_var));
+
+	/* Get a local copy of the correct descriptor (if available). */
+	err = desc_read_committed_seq(desc_ring, id, seq, &desc);
+
+	/*
+	 * If @r is NULL, the caller is only interested in the availability
+	 * of the record.
+	 */
+	if (err || !r)
+		return err;
+
+	/* If requested, copy meta data. */
+	if (r->info)
+		memcpy(r->info, &desc.info, sizeof(*(r->info)));
+
+	/* Copy text data. If it fails, this is a data-less descriptor. */
+	if (!copy_data(&rb->text_data_ring, &desc.text_blk_lpos,
+		       desc.info.text_len, r->text_buf, r->text_buf_size,
+		       line_count)) {
+		return -ENOENT;
+	}
+
+	/*
+	 * Copy dict data. Although this should not fail, dict data is not
+	 * important. So if it fails, modify the copied meta data to report
+	 * that there is no dict data, thus silently dropping the dict data.
+	 */
+	if (!copy_data(&rb->dict_data_ring, &desc.dict_blk_lpos,
+		       desc.info.dict_len, r->dict_buf, r->dict_buf_size,
+		       NULL)) {
+		if (r->info)
+			r->info->dict_len = 0;
+	}
+
+	/* Ensure the record is still committed and has the same @seq. */
+	return desc_read_committed_seq(desc_ring, id, seq, &desc);
+}
+
+/**
+ * prb_first_seq() - Get the sequence number of the tail descriptor.
+ *
+ * @rb:  The ringbuffer to get the sequence number from.
+ *
+ * This is the public function available to readers to see what the
+ * first/oldest sequence number is.
+ *
+ * This provides readers a starting point to begin iterating the ringbuffer.
+ * Note that the returned sequence number might not belong to a valid record.
+ *
+ * Context: Any context.
+ * Return: The sequence number of the first/oldest record or, if the
+ *         ringbuffer is empty, 0 is returned.
+ */
+u64 prb_first_seq(struct printk_ringbuffer *rb)
+{
+	struct prb_desc_ring *desc_ring = &rb->desc_ring;
+	enum desc_state d_state;
+	struct prb_desc desc;
+	unsigned long id;
+
+	for (;;) {
+		id = atomic_long_read(
+			&rb->desc_ring.tail_id); /* LMM(prb_first_seq:A) */
+
+		d_state = desc_read(desc_ring, id,
+				    &desc); /* LMM(prb_first_seq:B) */
+
+		/*
+		 * This loop will not be infinite because the tail is
+		 * _always_ in the committed or reusable state.
+		 */
+		if (d_state == desc_committed || d_state == desc_reusable)
+			break;
+
+		/*
+		 * Guarantee the last state load from desc_read() is before
+		 * reloading @tail_id in order to see a new tail in the case
+		 * that the descriptor has been recycled. This pairs with
+		 * desc_reserve:D.
+		 */
+		smp_rmb(); /* LMM(prb_first_seq:C) */
+
+		/*
+		 * Reload the tail ID.
+		 *
+		 * Memory barrier involvement:
+		 *
+		 * If prb_first_seq:B reads from desc_reserve:G, then
+		 * prb_first_seq:A reads from desc_push_tail:B.
+		 *
+		 * Relies on:
+		 *
+		 * MB from desc_push_tail:B to desc_reserve:G
+		 *    matching
+		 * RMB prb_first_seq:B to prb_first_seq:A
+		 */
+	}
+
+	return desc.info.seq;
+}
+
+/*
+ * Non-blocking read of a record. Updates @seq to the last committed record
+ * (which may have no data).
+ *
+ * See the description of prb_read_valid() and prb_read_valid_info()
+ * for details.
+ */
+bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
+		     struct printk_record *r, unsigned int *line_count)
+{
+	u64 tail_seq;
+	int err;
+
+	while ((err = prb_read(rb, *seq, r, line_count))) {
+		tail_seq = prb_first_seq(rb);
+
+		if (*seq < tail_seq) {
+			/*
+			 * Behind the tail. Catch up and try again. This
+			 * can happen for -ENOENT and -EINVAL cases.
+			 */
+			*seq = tail_seq;
+
+		} else if (err == -ENOENT) {
+			/* Record exists, but no data available. Skip. */
+			(*seq)++;
+
+		} else {
+			/* Non-existent/non-committed record. Must stop. */
+			return false;
+		}
+	}
+
+	return true;
+}
+
+/**
+ * prb_read_valid() - Non-blocking read of a requested record or (if gone)
+ *                    the next available record.
+ *
+ * @rb:  The ringbuffer to read from.
+ * @seq: The sequence number of the record to read.
+ * @r:   A record data buffer to store the read record to.
+ *
+ * This is the public function available to readers to read a record.
+ *
+ * The reader provides the @info, @text_buf, @dict_buf buffers of @r to be
+ * filled in. Any of the buffer pointers can be set to NULL if the reader
+ * is not interested in that data. To ensure proper initialization of @r,
+ * prb_rec_init_rd() should be used.
+ *
+ * Context: Any context.
+ * Return: true if a record was read, otherwise false.
+ *
+ * On success, the reader must check r->info.seq to see which record was
+ * actually read. This allows the reader to detect dropped records.
+ *
+ * Failure means @seq refers to a not yet written record.
+ */
+bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq,
+		    struct printk_record *r)
+{
+	return _prb_read_valid(rb, &seq, r, NULL);
+}
+
+/**
+ * prb_read_valid_info() - Non-blocking read of meta data for a requested
+ *                         record or (if gone) the next available record.
+ *
+ * @rb:         The ringbuffer to read from.
+ * @seq:        The sequence number of the record to read.
+ * @info:       A buffer to store the read record meta data to.
+ * @line_count: A buffer to store the number of lines in the record text.
+ *
+ * This is the public function available to readers to read only the
+ * meta data of a record.
+ *
+ * The reader provides the @info, @line_count buffers to be filled in.
+ * Either of the buffer pointers can be set to NULL if the reader is not
+ * interested in that data.
+ *
+ * Context: Any context.
+ * Return: true if a record's meta data was read, otherwise false.
+ *
+ * On success, the reader must check info->seq to see which record meta data
+ * was actually read. This allows the reader to detect dropped records.
+ *
+ * Failure means @seq refers to a not yet written record.
+ */
+bool prb_read_valid_info(struct printk_ringbuffer *rb, u64 seq,
+			 struct printk_info *info, unsigned int *line_count)
+{
+	struct printk_record r;
+
+	prb_rec_init_rd(&r, info, NULL, 0, NULL, 0);
+
+	return _prb_read_valid(rb, &seq, &r, line_count);
+}
+
+/**
+ * prb_next_seq() - Get the sequence number after the last available record.
+ *
+ * @rb:  The ringbuffer to get the sequence number from.
+ *
+ * This is the public function available to readers to see what the next
+ * newest sequence number available to readers will be.
+ *
+ * This provides readers a sequence number to jump to if all currently
+ * available records should be skipped.
+ *
+ * Context: Any context.
+ * Return: The sequence number of the next newest (not yet available) record
+ *         for readers.
+ */
+u64 prb_next_seq(struct printk_ringbuffer *rb)
+{
+	u64 seq = 0;
+
+	do {
+		/* Search forward from the oldest descriptor. */
+		if (!_prb_read_valid(rb, &seq, NULL, NULL))
+			return seq;
+		seq++;
+	} while (seq);
+
+	return 0;
+}
+
+/**
+ * prb_init() - Initialize a ringbuffer to use provided external buffers.
+ *
+ * @rb:       The ringbuffer to initialize.
+ * @text_buf: The data buffer for text data.
+ * @textbits: The size of @text_buf as a power-of-2 value.
+ * @dict_buf: The data buffer for dictionary data.
+ * @dictbits: The size of @dict_buf as a power-of-2 value.
+ * @descs:    The descriptor buffer for ringbuffer records.
+ * @descbits: The count of @descs items as a power-of-2 value.
+ *
+ * This is the public function available to writers to setup a ringbuffer
+ * during runtime using provided buffers.
+ *
+ * This must match the initialization of DECLARE_PRINTKRB().
+ *
+ * Context: Any context.
+ */
+void prb_init(struct printk_ringbuffer *rb,
+	      char *text_buf, unsigned int textbits,
+	      char *dict_buf, unsigned int dictbits,
+	      struct prb_desc *descs, unsigned int descbits)
+{
+	memset(descs, 0, _DESCS_COUNT(descbits) * sizeof(descs[0]));
+
+	rb->desc_ring.count_bits = descbits;
+	rb->desc_ring.descs = descs;
+	atomic_long_set(&rb->desc_ring.head_id, DESC0_ID(descbits));
+	atomic_long_set(&rb->desc_ring.tail_id, DESC0_ID(descbits));
+
+	rb->text_data_ring.size_bits = textbits;
+	rb->text_data_ring.data = text_buf;
+	atomic_long_set(&rb->text_data_ring.head_lpos, BLK0_LPOS(textbits));
+	atomic_long_set(&rb->text_data_ring.tail_lpos, BLK0_LPOS(textbits));
+
+	rb->dict_data_ring.size_bits = dictbits;
+	rb->dict_data_ring.data = dict_buf;
+	atomic_long_set(&rb->dict_data_ring.head_lpos, BLK0_LPOS(dictbits));
+	atomic_long_set(&rb->dict_data_ring.tail_lpos, BLK0_LPOS(dictbits));
+
+	atomic_long_set(&rb->fail, 0);
+
+	descs[0].info.seq = -(u64)_DESCS_COUNT(descbits);
+
+	descs[_DESCS_COUNT(descbits) - 1].info.seq = 0;
+	atomic_long_set(&(descs[_DESCS_COUNT(descbits) - 1].state_var),
+			DESC0_SV(descbits));
+	descs[_DESCS_COUNT(descbits) - 1].text_blk_lpos.begin = INVALID_LPOS;
+	descs[_DESCS_COUNT(descbits) - 1].text_blk_lpos.next = INVALID_LPOS;
+	descs[_DESCS_COUNT(descbits) - 1].dict_blk_lpos.begin = INVALID_LPOS;
+	descs[_DESCS_COUNT(descbits) - 1].dict_blk_lpos.next = INVALID_LPOS;
+}
+
+/**
+ * prb_record_text_space() - Query the full actual used ringbuffer space for
+ *                           the text data of a reserved entry.
+ *
+ * @e: The successfully reserved entry to query.
+ *
+ * This is the public function available to writers to see how much actual
+ * space is used in the ringbuffer to store the text data of the specified
+ * entry.
+ *
+ * This function is only valid if @e has been successfully reserved using
+ * prb_reserve().
+ *
+ * Context: Any context.
+ * Return: The size in bytes used by the text data of the associated record.
+ */
+unsigned int prb_record_text_space(struct prb_reserved_entry *e)
+{
+	return e->text_space;
+}
diff --git a/kernel/printk/printk_ringbuffer.h b/kernel/printk/printk_ringbuffer.h
new file mode 100644
index 000000000000..f929b3e26a11
--- /dev/null
+++ b/kernel/printk/printk_ringbuffer.h
@@ -0,0 +1,369 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+
+#ifndef _KERNEL_PRINTK_RINGBUFFER_H
+#define _KERNEL_PRINTK_RINGBUFFER_H
+
+#include <linux/atomic.h>
+
+struct printk_info {
+	u64	seq;		/* sequence number */
+	u64	ts_nsec;	/* timestamp in nanoseconds */
+	u16	text_len;	/* length of text message */
+	u16	dict_len;	/* length of dictionary message */
+	u8	facility;	/* syslog facility */
+	u8	flags:5;	/* internal record flags */
+	u8	level:3;	/* syslog level */
+	u32	caller_id;	/* thread id or processor id */
+};
+
+/*
+ * A structure providing the buffers, used by writers and readers.
+ *
+ * Writers:
+ * Using prb_rec_init_wr(), a writer sets @text_buf_size and @dict_buf_size
+ * before calling prb_reserve(). On success, prb_reserve() sets @info,
+ * @text_buf, @dict_buf to buffers reserved for that writer.
+ *
+ * Readers:
+ * Using prb_rec_init_rd(), a reader sets all fields before calling
+ * prb_read_valid(). Note that the reader provides the @info, @text_buf,
+ * @dict_buf buffers. On success, the struct pointed to by @info will be
+ * filled and the char arrays pointed to by @text_buf and @dict_buf will
+ * be filled with text and dict data.
+ */
+struct printk_record {
+	struct printk_info	*info;
+	char			*text_buf;
+	char			*dict_buf;
+	unsigned int		text_buf_size;
+	unsigned int		dict_buf_size;
+};
+
+/* Specifies the position/span of a data block. */
+struct prb_data_blk_lpos {
+	unsigned long	begin;
+	unsigned long	next;
+};
+
+/* A descriptor: the complete meta-data for a record. */
+struct prb_desc {
+	struct printk_info		info;
+	atomic_long_t			state_var;
+	struct prb_data_blk_lpos	text_blk_lpos;
+	struct prb_data_blk_lpos	dict_blk_lpos;
+};
+
+/* A ringbuffer of "ID + data" elements. */
+struct prb_data_ring {
+	unsigned int	size_bits;
+	char		*data;
+	atomic_long_t	head_lpos;
+	atomic_long_t	tail_lpos;
+};
+
+/* A ringbuffer of "struct prb_desc" elements. */
+struct prb_desc_ring {
+	unsigned int		count_bits;
+	struct prb_desc		*descs;
+	atomic_long_t		head_id;
+	atomic_long_t		tail_id;
+};
+
+/* The high level structure representing the printk ringbuffer. */
+struct printk_ringbuffer {
+	struct prb_desc_ring	desc_ring;
+	struct prb_data_ring	text_data_ring;
+	struct prb_data_ring	dict_data_ring;
+	atomic_long_t		fail;
+};
+
+/* Used by writers as a reserve/commit handle. */
+struct prb_reserved_entry {
+	struct printk_ringbuffer	*rb;
+	unsigned long			irqflags;
+	unsigned long			id;
+	unsigned int			text_space;
+};
+
+#define _DATA_SIZE(sz_bits)		(1UL << (sz_bits))
+#define _DESCS_COUNT(ct_bits)		(1U << (ct_bits))
+#define DESC_SV_BITS			(sizeof(unsigned long) * 8)
+#define DESC_COMMITTED_MASK		(1UL << (DESC_SV_BITS - 1))
+#define DESC_REUSE_MASK			(1UL << (DESC_SV_BITS - 2))
+#define DESC_FLAGS_MASK			(DESC_COMMITTED_MASK | DESC_REUSE_MASK)
+#define DESC_ID_MASK			(~DESC_FLAGS_MASK)
+#define DESC_ID(sv)			((sv) & DESC_ID_MASK)
+#define INVALID_LPOS			1
+
+#define INVALID_BLK_LPOS	\
+	{			\
+		.begin	= INVALID_LPOS,	\
+		.next	= INVALID_LPOS,	\
+	}
+
+/*
+ * Descriptor Bootstrap
+ *
+ * The descriptor array is minimally initialized to allow immediate usage
+ * by readers and writers. The requirements that the descriptor array
+ * initialization must satisfy:
+ *
+ *   Req1
+ *     The tail must point to an existing (committed or reusable) descriptor.
+ *     This is required by the implementation of prb_first_seq().
+ *
+ *   Req2
+ *     Readers must see that the ringbuffer is initially empty.
+ *
+ *   Req3
+ *     The first record reserved by a writer is assigned sequence number 0.
+ *
+ * To satisfy Req1, the tail initially points to a descriptor that is
+ * minimally initialized (having no data block, i.e. data-less with the
+ * data block's lpos @begin and @next values set to INVALID_LPOS).
+ *
+ * To satisfy Req2, the initial tail descriptor is initialized to the
+ * reusable state. Readers recognize reusable descriptors as existing
+ * records, but skip over them.
+ *
+ * To satisfy Req3, the last descriptor in the array is used as the initial
+ * head (and tail) descriptor. This allows the first record reserved by a
+ * writer (head + 1) to be the first descriptor in the array. (Only the first
+ * descriptor in the array could have a valid sequence number of 0.)
+ *
+ * The first time a descriptor is reserved, it is assigned a sequence number
+ * with the value of the array index. A "first time reserved" descriptor can
+ * be recognized because it has a sequence number of 0 but does not have an
+ * index of 0. (Only the first descriptor in the array could have a valid
+ * sequence number of 0.) After the first reservation, all future reservations
+ * (recycling) simply involve incrementing the sequence number by the array
+ * count.
+ *
+ *   Hack #1
+ *     Only the first descriptor in the array is allowed to have the sequence
+ *     number 0. In this case it is not possible to recognize if it is being
+ *     reserved the first time (set to index value) or has been reserved
+ *     previously (increment by the array count). This is handled by _always_
+ *     incrementing the sequence number by the array count when reserving the
+ *     first descriptor in the array. In order to satisfy Req3, the sequence
+ *     number of the first descriptor in the array is initialized to minus
+ *     the array count. Then, upon the first reservation, it is incremented
+ *     to 0, thus satisfying Req3.
+ *
+ *   Hack #2
+ *     prb_first_seq() can be called at any time by readers to retrieve the
+ *     sequence number of the tail descriptor. However, due to Req2 and Req3,
+ *     initially there are no records to report the sequence number of
+ *     (sequence numbers are u64 and there is nothing less than 0). To handle
+ *     this, the sequence number of the initial tail descriptor is initialized
+ *     to 0. Technically this is incorrect, because there is no record with
+ *     sequence number 0 (yet) and the tail descriptor is not the first
+ *     descriptor in the array. But it allows prb_read_valid() to correctly
+ *     report the existence of a record for _any_ given sequence number at all
+ *     times. Bootstrapping is complete when the tail is pushed the first
+ *     time, thus finally pointing to the first descriptor reserved by a
+ *     writer, which has the assigned sequence number 0.
+ */
+
+/*
+ * Initiating Logical Value Overflows
+ *
+ * Both logical position (lpos) and ID values can be mapped to array indexes
+ * but may experience overflows during the lifetime of the system. To ensure
+ * that printk_ringbuffer can handle the overflows for these types, initial
+ * values are chosen that map to the correct initial array indexes, but will
+ * result in overflows soon.
+ *
+ *   BLK0_LPOS
+ *     The initial @head_lpos and @tail_lpos for data rings. It is at index
+ *     0 and the lpos value is such that it will overflow on the first wrap.
+ *
+ *   DESC0_ID
+ *     The initial @head_id and @tail_id for the desc ring. It is at the last
+ *     index of the descriptor array (see Req3 above) and the ID value is such
+ *     that it will overflow on the second wrap.
+ */
+#define BLK0_LPOS(sz_bits)	(-(_DATA_SIZE(sz_bits)))
+#define DESC0_ID(ct_bits)	DESC_ID(-(_DESCS_COUNT(ct_bits) + 1))
+#define DESC0_SV(ct_bits)	(DESC_COMMITTED_MASK | DESC_REUSE_MASK | \
+					DESC0_ID(ct_bits))
+
+/*
+ * Declare a ringbuffer with an external text data buffer. The same as
+ * DECLARE_PRINTKRB() but allows specifying an external buffer for the
+ * text data.
+ *
+ * Note: The specified external buffer must be of the size:
+ *       2 ^ (descbits + avgtextbits)
+ */
+#define _DECLARE_PRINTKRB(name, descbits, avgtextbits, avgdictbits,	\
+			  text_buf)					\
+char _##name##_dict[1U << ((avgdictbits) + (descbits))]			\
+	__aligned(__alignof__(unsigned long));				\
+struct prb_desc _##name##_descs[_DESCS_COUNT(descbits)] = {		\
+	/* this will be the first record reserved by a writer */	\
+	[0] = {								\
+			.info = {					\
+				/*
+				 * will be incremented to 0 on
+				 * the first reservation
+				 */					\
+				.seq = -(u64)_DESCS_COUNT(descbits),	\
+				},					\
+		},							\
+	/* the initial head and tail */					\
+	[_DESCS_COUNT(descbits) - 1] = {				\
+			.info = {					\
+				/*
+				 * reports the minimal seq value
+				 * during the bootstrap phase
+				 */					\
+				.seq = 0,				\
+				},					\
+			/* reusable */					\
+			.state_var	= ATOMIC_INIT(DESC0_SV(descbits)), \
+			/* no associated data block */			\
+			.text_blk_lpos	= INVALID_BLK_LPOS,		\
+			.dict_blk_lpos	= INVALID_BLK_LPOS,		\
+		},							\
+	};								\
+struct printk_ringbuffer name = {					\
+	.desc_ring = {							\
+		.count_bits	= descbits,				\
+		.descs		= &_##name##_descs[0],			\
+		.head_id	= ATOMIC_INIT(DESC0_ID(descbits)),	\
+		.tail_id	= ATOMIC_INIT(DESC0_ID(descbits)),	\
+	},								\
+	.text_data_ring = {						\
+		.size_bits	= (avgtextbits) + (descbits),		\
+		.data		= text_buf,				\
+		.head_lpos	= ATOMIC_LONG_INIT(BLK0_LPOS(		\
+					(avgtextbits) + (descbits))),	\
+		.tail_lpos	= ATOMIC_LONG_INIT(BLK0_LPOS(		\
+					(avgtextbits) + (descbits))),	\
+	},								\
+	.dict_data_ring = {						\
+		.size_bits	= (avgtextbits) + (descbits),		\
+		.data		= &_##name##_dict[0],			\
+		.head_lpos	= ATOMIC_LONG_INIT(BLK0_LPOS(		\
+					(avgtextbits) + (descbits))),	\
+		.tail_lpos	= ATOMIC_LONG_INIT(BLK0_LPOS(		\
+					(avgtextbits) + (descbits))),	\
+	},								\
+	.fail			= ATOMIC_LONG_INIT(0),			\
+}
+
+/**
+ * DECLARE_PRINTKRB() - Declare a ringbuffer.
+ *
+ * @name:        The name of the ringbuffer variable.
+ * @descbits:    The number of descriptors as a power-of-2 value.
+ * @avgtextbits: The average text data size per record as a power-of-2 value.
+ * @avgdictbits: The average dictionary data size per record as a
+ *               power-of-2 value.
+ *
+ * This is a macro for declaring a ringbuffer and all internal structures
+ * such that it is ready for immediate use. See _DECLARE_PRINTKRB() for a
+ * variant where the text data buffer can be specified externally.
+ */
+#define DECLARE_PRINTKRB(name, descbits, avgtextbits, avgdictbits)	\
+char _##name##_text[1U << ((avgtextbits) + (descbits))]			\
+	__aligned(__alignof__(unsigned long));				\
+_DECLARE_PRINTKRB(name, descbits, avgtextbits, avgdictbits,		\
+		  &_##name##_text[0])
+
+/* Writer Interface */
+
+/**
+ * prb_rec_init_wd() - Initialize a buffer for writing records.
+ *
+ * @r:             The record to initialize.
+ * @text_buf_size: The needed text buffer size.
+ * @dict_buf_size: The needed dictionary buffer size.
+ *
+ * Initialize all the fields that a writer is interested in. If
+ * @dict_buf_size is 0, a dictionary buffer will not be reserved.
+ * @text_buf_size must be greater than 0.
+ *
+ * Note that although @dict_buf_size may be initialized to non-zero,
+ * its value must be rechecked after a successful call to prb_reserve()
+ * to verify a dictionary buffer was actually reserved. Dictionary buffer
+ * reservation is allowed to fail.
+ */
+static inline void prb_rec_init_wr(struct printk_record *r,
+				   unsigned int text_buf_size,
+				   unsigned int dict_buf_size)
+{
+	r->info = NULL;
+	r->text_buf = NULL;
+	r->dict_buf = NULL;
+	r->text_buf_size = text_buf_size;
+	r->dict_buf_size = dict_buf_size;
+}
+
+bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+		 struct printk_record *r);
+void prb_commit(struct prb_reserved_entry *e);
+
+void prb_init(struct printk_ringbuffer *rb,
+	      char *text_buf, unsigned int text_buf_size,
+	      char *dict_buf, unsigned int dict_buf_size,
+	      struct prb_desc *descs, unsigned int descs_count_bits);
+unsigned int prb_record_text_space(struct prb_reserved_entry *e);
+
+/* Reader Interface */
+
+/**
+ * prb_rec_init_rd() - Initialize a buffer for reading records.
+ *
+ * @r:             The record to initialize.
+ * @info:          A buffer to store record meta-data.
+ * @text_buf:      A buffer to store text data.
+ * @text_buf_size: The size of @text_buf.
+ * @dict_buf:      A buffer to store dictionary data.
+ * @dict_buf_size: The size of @dict_buf.
+ *
+ * Initialize all the fields that a reader is interested in. All arguments
+ * (except @r) are optional. Only record data for arguments that are
+ * non-NULL or non-zero will be read.
+ */
+static inline void prb_rec_init_rd(struct printk_record *r,
+				   struct printk_info *info,
+				   char *text_buf, unsigned int text_buf_size,
+				   char *dict_buf, unsigned int dict_buf_size)
+{
+	r->info = info;
+	r->text_buf = text_buf;
+	r->dict_buf = dict_buf;
+	r->text_buf_size = text_buf_size;
+	r->dict_buf_size = dict_buf_size;
+}
+
+/**
+ * prb_for_each_record() - Iterate over a ringbuffer.
+ *
+ * @from: The sequence number to begin with.
+ * @rb:   The ringbuffer to iterate over.
+ * @s:    A u64 to store the sequence number on each iteration.
+ * @r:    A printk_record to store the record on each iteration.
+ *
+ * This is a macro for conveniently iterating over a ringbuffer.
+ *
+ * Context: Any context.
+ */
+#define prb_for_each_record(from, rb, s, r)	\
+	for ((s) = from;			\
+	     prb_read_valid(rb, s, r);		\
+	     (s) = (r)->info->seq + 1)
+
+bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq,
+		    struct printk_record *r);
+bool prb_read_valid_info(struct printk_ringbuffer *rb, u64 seq,
+			 struct printk_info *info, unsigned int *line_count);
+
+u64 prb_first_seq(struct printk_ringbuffer *rb);
+u64 prb_next_seq(struct printk_ringbuffer *rb);
+
+unsigned int prb_count_lines(char *text, unsigned int text_size);
+
+#endif /* _KERNEL_PRINTK_RINGBUFFER_H */
-- 
2.20.1


^ permalink raw reply related	[flat|nested] 30+ messages in thread

* [PATCH v2 3/3] printk: use the lockless ringbuffer
  2020-05-01  9:40 [PATCH v2 0/3] printk: replace ringbuffer John Ogness
  2020-05-01  9:40 ` [PATCH v2 1/3] crash: add VMCOREINFO macro for anonymous structs John Ogness
  2020-05-01  9:40 ` [PATCH v2 2/3] printk: add lockless buffer John Ogness
@ 2020-05-01  9:40 ` John Ogness
  2020-05-06 14:50   ` John Ogness
  2020-05-13 12:05 ` [PATCH v2 0/3] printk: replace ringbuffer Prarit Bhargava
  2020-05-15 10:24 ` Sergey Senozhatsky
  4 siblings, 1 reply; 30+ messages in thread
From: John Ogness @ 2020-05-01  9:40 UTC (permalink / raw)
  To: Petr Mladek
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel

Replace the existing ringbuffer usage and implementation with
lockless ringbuffer usage. Even though the new ringbuffer does not
require locking, all existing locking is left in place. Therefore,
this change is purely replacing the underlining ringbuffer.

Changes that exist due to the ringbuffer replacement:

- The VMCOREINFO has been updated for the new structures.

- Dictionary data is now stored in a separate data buffer from the
  human-readable messages. The dictionary data buffer is set to the
  same size as the message buffer. Therefore, the total required
  memory for both dictionary and message data is
  2 * (2 ^ CONFIG_LOG_BUF_SHIFT) for the initial static buffers and
  2 * log_buf_len (the kernel parameter) for the dynamic buffers.

- Record meta-data is now stored in a separate array of descriptors.
  This is an additional 72 * (2 ^ (CONFIG_LOG_BUF_SHIFT - 5)) bytes
  for the static array and 72 * (log_buf_len >> 5) bytes for the
  dynamic array.

Signed-off-by: John Ogness <john.ogness@linutronix.de>
---
 include/linux/kmsg_dump.h |   2 -
 kernel/printk/printk.c    | 938 ++++++++++++++++++++------------------
 2 files changed, 492 insertions(+), 448 deletions(-)

diff --git a/include/linux/kmsg_dump.h b/include/linux/kmsg_dump.h
index 2e7a1e032c71..ae6265033e31 100644
--- a/include/linux/kmsg_dump.h
+++ b/include/linux/kmsg_dump.h
@@ -46,8 +46,6 @@ struct kmsg_dumper {
 	bool registered;
 
 	/* private state of the kmsg iterator */
-	u32 cur_idx;
-	u32 next_idx;
 	u64 cur_seq;
 	u64 next_seq;
 };
diff --git a/kernel/printk/printk.c b/kernel/printk/printk.c
index 9a9b6156270b..0d74ca748b82 100644
--- a/kernel/printk/printk.c
+++ b/kernel/printk/printk.c
@@ -56,6 +56,7 @@
 #define CREATE_TRACE_POINTS
 #include <trace/events/printk.h>
 
+#include "printk_ringbuffer.h"
 #include "console_cmdline.h"
 #include "braille.h"
 #include "internal.h"
@@ -294,30 +295,24 @@ enum con_msg_format_flags {
 static int console_msg_format = MSG_FORMAT_DEFAULT;
 
 /*
- * The printk log buffer consists of a chain of concatenated variable
- * length records. Every record starts with a record header, containing
- * the overall length of the record.
+ * The printk log buffer consists of a sequenced collection of records, each
+ * containing variable length message and dictionary text. Every record
+ * also contains its own meta-data (@info).
  *
- * The heads to the first and last entry in the buffer, as well as the
- * sequence numbers of these entries are maintained when messages are
- * stored.
+ * Every record meta-data carries the timestamp in microseconds, as well as
+ * the standard userspace syslog level and syslog facility. The usual kernel
+ * messages use LOG_KERN; userspace-injected messages always carry a matching
+ * syslog facility, by default LOG_USER. The origin of every message can be
+ * reliably determined that way.
  *
- * If the heads indicate available messages, the length in the header
- * tells the start next message. A length == 0 for the next message
- * indicates a wrap-around to the beginning of the buffer.
+ * The human readable log message of a record is available in @text, the
+ * length of the message text in @text_len. The stored message is not
+ * terminated.
  *
- * Every record carries the monotonic timestamp in microseconds, as well as
- * the standard userspace syslog level and syslog facility. The usual
- * kernel messages use LOG_KERN; userspace-injected messages always carry
- * a matching syslog facility, by default LOG_USER. The origin of every
- * message can be reliably determined that way.
- *
- * The human readable log message directly follows the message header. The
- * length of the message text is stored in the header, the stored message
- * is not terminated.
- *
- * Optionally, a message can carry a dictionary of properties (key/value pairs),
- * to provide userspace with a machine-readable message context.
+ * Optionally, a record can carry a dictionary of properties (key/value
+ * pairs), to provide userspace with a machine-readable message context. The
+ * length of the dictionary is available in @dict_len. The dictionary is not
+ * terminated.
  *
  * Examples for well-defined, commonly used property names are:
  *   DEVICE=b12:8               device identifier
@@ -331,21 +326,19 @@ static int console_msg_format = MSG_FORMAT_DEFAULT;
  * follows directly after a '=' character. Every property is terminated by
  * a '\0' character. The last property is not terminated.
  *
- * Example of a message structure:
- *   0000  ff 8f 00 00 00 00 00 00      monotonic time in nsec
- *   0008  34 00                        record is 52 bytes long
- *   000a        0b 00                  text is 11 bytes long
- *   000c              1f 00            dictionary is 23 bytes long
- *   000e                    03 00      LOG_KERN (facility) LOG_ERR (level)
- *   0010  69 74 27 73 20 61 20 6c      "it's a l"
- *         69 6e 65                     "ine"
- *   001b           44 45 56 49 43      "DEVIC"
- *         45 3d 62 38 3a 32 00 44      "E=b8:2\0D"
- *         52 49 56 45 52 3d 62 75      "RIVER=bu"
- *         67                           "g"
- *   0032     00 00 00                  padding to next message header
- *
- * The 'struct printk_log' buffer header must never be directly exported to
+ * Example of record values:
+ *   record.text_buf       = "it's a line" (unterminated)
+ *   record.dict_buf       = "DEVICE=b8:2\0DRIVER=bug" (unterminated)
+ *   record.info.seq       = 56
+ *   record.info.ts_nsec   = 36863
+ *   record.info.text_len  = 11
+ *   record.info.dict_len  = 22
+ *   record.info.facility  = 0 (LOG_KERN)
+ *   record.info.flags     = 0
+ *   record.info.level     = 3 (LOG_ERR)
+ *   record.info.caller_id = 299 (task 299)
+ *
+ * The 'struct printk_info' buffer must never be directly exported to
  * userspace, it is a kernel-private implementation detail that might
  * need to be changed in the future, when the requirements change.
  *
@@ -365,23 +358,6 @@ enum log_flags {
 	LOG_CONT	= 8,	/* text is a fragment of a continuation line */
 };
 
-struct printk_log {
-	u64 ts_nsec;		/* timestamp in nanoseconds */
-	u16 len;		/* length of entire record */
-	u16 text_len;		/* length of text buffer */
-	u16 dict_len;		/* length of dictionary buffer */
-	u8 facility;		/* syslog facility */
-	u8 flags:5;		/* internal record flags */
-	u8 level:3;		/* syslog level */
-#ifdef CONFIG_PRINTK_CALLER
-	u32 caller_id;            /* thread id or processor id */
-#endif
-}
-#ifdef CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS
-__packed __aligned(4)
-#endif
-;
-
 /*
  * The logbuf_lock protects kmsg buffer, indices, counters.  This can be taken
  * within the scheduler's rq lock. It must be released before calling
@@ -421,26 +397,16 @@ DEFINE_RAW_SPINLOCK(logbuf_lock);
 DECLARE_WAIT_QUEUE_HEAD(log_wait);
 /* the next printk record to read by syslog(READ) or /proc/kmsg */
 static u64 syslog_seq;
-static u32 syslog_idx;
 static size_t syslog_partial;
 static bool syslog_time;
 
-/* index and sequence number of the first record stored in the buffer */
-static u64 log_first_seq;
-static u32 log_first_idx;
-
-/* index and sequence number of the next record to store in the buffer */
-static u64 log_next_seq;
-static u32 log_next_idx;
-
 /* the next printk record to write to the console */
 static u64 console_seq;
-static u32 console_idx;
 static u64 exclusive_console_stop_seq;
+static unsigned long console_dropped;
 
 /* the next printk record to read after the last 'clear' command */
 static u64 clear_seq;
-static u32 clear_idx;
 
 #ifdef CONFIG_PRINTK_CALLER
 #define PREFIX_MAX		48
@@ -453,13 +419,28 @@ static u32 clear_idx;
 #define LOG_FACILITY(v)		((v) >> 3 & 0xff)
 
 /* record buffer */
-#define LOG_ALIGN __alignof__(struct printk_log)
+#define LOG_ALIGN __alignof__(unsigned long)
 #define __LOG_BUF_LEN (1 << CONFIG_LOG_BUF_SHIFT)
 #define LOG_BUF_LEN_MAX (u32)(1 << 31)
 static char __log_buf[__LOG_BUF_LEN] __aligned(LOG_ALIGN);
 static char *log_buf = __log_buf;
 static u32 log_buf_len = __LOG_BUF_LEN;
 
+/*
+ * Define the average message size. This only affects the number of
+ * descriptors that will be available. Underestimating is better than
+ * overestimating (too many available descriptors is better than not enough).
+ * The dictionary buffer will be the same size as the text buffer.
+ */
+#define PRB_AVGBITS 5	/* 32 character average length */
+
+_DECLARE_PRINTKRB(printk_rb_static, CONFIG_LOG_BUF_SHIFT - PRB_AVGBITS,
+		  PRB_AVGBITS, PRB_AVGBITS, &__log_buf[0]);
+
+static struct printk_ringbuffer printk_rb_dynamic;
+
+static struct printk_ringbuffer *prb = &printk_rb_static;
+
 /*
  * We cannot access per-CPU data (e.g. per-CPU flush irq_work) before
  * per_cpu_areas are initialised. This variable is set to true when
@@ -484,108 +465,6 @@ u32 log_buf_len_get(void)
 	return log_buf_len;
 }
 
-/* human readable text of the record */
-static char *log_text(const struct printk_log *msg)
-{
-	return (char *)msg + sizeof(struct printk_log);
-}
-
-/* optional key/value pair dictionary attached to the record */
-static char *log_dict(const struct printk_log *msg)
-{
-	return (char *)msg + sizeof(struct printk_log) + msg->text_len;
-}
-
-/* get record by index; idx must point to valid msg */
-static struct printk_log *log_from_idx(u32 idx)
-{
-	struct printk_log *msg = (struct printk_log *)(log_buf + idx);
-
-	/*
-	 * A length == 0 record is the end of buffer marker. Wrap around and
-	 * read the message at the start of the buffer.
-	 */
-	if (!msg->len)
-		return (struct printk_log *)log_buf;
-	return msg;
-}
-
-/* get next record; idx must point to valid msg */
-static u32 log_next(u32 idx)
-{
-	struct printk_log *msg = (struct printk_log *)(log_buf + idx);
-
-	/* length == 0 indicates the end of the buffer; wrap */
-	/*
-	 * A length == 0 record is the end of buffer marker. Wrap around and
-	 * read the message at the start of the buffer as *this* one, and
-	 * return the one after that.
-	 */
-	if (!msg->len) {
-		msg = (struct printk_log *)log_buf;
-		return msg->len;
-	}
-	return idx + msg->len;
-}
-
-/*
- * Check whether there is enough free space for the given message.
- *
- * The same values of first_idx and next_idx mean that the buffer
- * is either empty or full.
- *
- * If the buffer is empty, we must respect the position of the indexes.
- * They cannot be reset to the beginning of the buffer.
- */
-static int logbuf_has_space(u32 msg_size, bool empty)
-{
-	u32 free;
-
-	if (log_next_idx > log_first_idx || empty)
-		free = max(log_buf_len - log_next_idx, log_first_idx);
-	else
-		free = log_first_idx - log_next_idx;
-
-	/*
-	 * We need space also for an empty header that signalizes wrapping
-	 * of the buffer.
-	 */
-	return free >= msg_size + sizeof(struct printk_log);
-}
-
-static int log_make_free_space(u32 msg_size)
-{
-	while (log_first_seq < log_next_seq &&
-	       !logbuf_has_space(msg_size, false)) {
-		/* drop old messages until we have enough contiguous space */
-		log_first_idx = log_next(log_first_idx);
-		log_first_seq++;
-	}
-
-	if (clear_seq < log_first_seq) {
-		clear_seq = log_first_seq;
-		clear_idx = log_first_idx;
-	}
-
-	/* sequence numbers are equal, so the log buffer is empty */
-	if (logbuf_has_space(msg_size, log_first_seq == log_next_seq))
-		return 0;
-
-	return -ENOMEM;
-}
-
-/* compute the message size including the padding bytes */
-static u32 msg_used_size(u16 text_len, u16 dict_len, u32 *pad_len)
-{
-	u32 size;
-
-	size = sizeof(struct printk_log) + text_len + dict_len;
-	*pad_len = (-size) & (LOG_ALIGN - 1);
-	size += *pad_len;
-
-	return size;
-}
-
 /*
  * Define how much of the log buffer we could take at maximum. The value
  * must be greater than two. Note that only half of the buffer is available
@@ -594,22 +473,26 @@ static u32 msg_used_size(u16 text_len, u16 dict_len, u32 *pad_len)
 #define MAX_LOG_TAKE_PART 4
 static const char trunc_msg[] = "<truncated>";
 
-static u32 truncate_msg(u16 *text_len, u16 *trunc_msg_len,
-			u16 *dict_len, u32 *pad_len)
+static void truncate_msg(u16 *text_len, u16 *trunc_msg_len, u16 *dict_len)
 {
 	/*
 	 * The message should not take the whole buffer. Otherwise, it might
 	 * get removed too soon.
 	 */
 	u32 max_text_len = log_buf_len / MAX_LOG_TAKE_PART;
+
 	if (*text_len > max_text_len)
 		*text_len = max_text_len;
-	/* enable the warning message */
+
+	/* enable the warning message (if there is room) */
 	*trunc_msg_len = strlen(trunc_msg);
+	if (*text_len >= *trunc_msg_len)
+		*text_len -= *trunc_msg_len;
+	else
+		*trunc_msg_len = 0;
+
 	/* disable the "dict" completely */
 	*dict_len = 0;
-	/* compute the size again, count also the warning message */
-	return msg_used_size(*text_len + *trunc_msg_len, 0, pad_len);
 }
 
 /* insert record into the buffer, discard old ones, update heads */
@@ -618,60 +501,40 @@ static int log_store(u32 caller_id, int facility, int level,
 		     const char *dict, u16 dict_len,
 		     const char *text, u16 text_len)
 {
-	struct printk_log *msg;
-	u32 size, pad_len;
+	struct prb_reserved_entry e;
+	struct printk_record r;
 	u16 trunc_msg_len = 0;
 
-	/* number of '\0' padding bytes to next message */
-	size = msg_used_size(text_len, dict_len, &pad_len);
+	prb_rec_init_wr(&r, text_len, dict_len);
 
-	if (log_make_free_space(size)) {
+	if (!prb_reserve(&e, prb, &r)) {
 		/* truncate the message if it is too long for empty buffer */
-		size = truncate_msg(&text_len, &trunc_msg_len,
-				    &dict_len, &pad_len);
+		truncate_msg(&text_len, &trunc_msg_len, &dict_len);
+		prb_rec_init_wr(&r, text_len + trunc_msg_len, dict_len);
 		/* survive when the log buffer is too small for trunc_msg */
-		if (log_make_free_space(size))
+		if (!prb_reserve(&e, prb, &r))
 			return 0;
 	}
 
-	if (log_next_idx + size + sizeof(struct printk_log) > log_buf_len) {
-		/*
-		 * This message + an additional empty header does not fit
-		 * at the end of the buffer. Add an empty header with len == 0
-		 * to signify a wrap around.
-		 */
-		memset(log_buf + log_next_idx, 0, sizeof(struct printk_log));
-		log_next_idx = 0;
-	}
-
 	/* fill message */
-	msg = (struct printk_log *)(log_buf + log_next_idx);
-	memcpy(log_text(msg), text, text_len);
-	msg->text_len = text_len;
-	if (trunc_msg_len) {
-		memcpy(log_text(msg) + text_len, trunc_msg, trunc_msg_len);
-		msg->text_len += trunc_msg_len;
-	}
-	memcpy(log_dict(msg), dict, dict_len);
-	msg->dict_len = dict_len;
-	msg->facility = facility;
-	msg->level = level & 7;
-	msg->flags = flags & 0x1f;
+	memcpy(&r.text_buf[0], text, text_len);
+	if (trunc_msg_len)
+		memcpy(&r.text_buf[text_len], trunc_msg, trunc_msg_len);
+	if (r.dict_buf)
+		memcpy(&r.dict_buf[0], dict, dict_len);
+	r.info->facility = facility;
+	r.info->level = level & 7;
+	r.info->flags = flags & 0x1f;
 	if (ts_nsec > 0)
-		msg->ts_nsec = ts_nsec;
+		r.info->ts_nsec = ts_nsec;
 	else
-		msg->ts_nsec = local_clock();
-#ifdef CONFIG_PRINTK_CALLER
-	msg->caller_id = caller_id;
-#endif
-	memset(log_dict(msg) + dict_len, 0, pad_len);
-	msg->len = size;
+		r.info->ts_nsec = local_clock();
+	r.info->caller_id = caller_id;
 
 	/* insert message */
-	log_next_idx += msg->len;
-	log_next_seq++;
+	prb_commit(&e);
 
-	return msg->text_len;
+	return (text_len + trunc_msg_len);
 }
 
 int dmesg_restrict = IS_ENABLED(CONFIG_SECURITY_DMESG_RESTRICT);
@@ -723,13 +586,13 @@ static void append_char(char **pp, char *e, char c)
 		*(*pp)++ = c;
 }
 
-static ssize_t msg_print_ext_header(char *buf, size_t size,
-				    struct printk_log *msg, u64 seq)
+static ssize_t info_print_ext_header(char *buf, size_t size,
+				     struct printk_info *info)
 {
-	u64 ts_usec = msg->ts_nsec;
+	u64 ts_usec = info->ts_nsec;
 	char caller[20];
 #ifdef CONFIG_PRINTK_CALLER
-	u32 id = msg->caller_id;
+	u32 id = info->caller_id;
 
 	snprintf(caller, sizeof(caller), ",caller=%c%u",
 		 id & 0x80000000 ? 'C' : 'T', id & ~0x80000000);
@@ -740,8 +603,8 @@ static ssize_t msg_print_ext_header(char *buf, size_t size,
 	do_div(ts_usec, 1000);
 
 	return scnprintf(buf, size, "%u,%llu,%llu,%c%s;",
-			 (msg->facility << 3) | msg->level, seq, ts_usec,
-			 msg->flags & LOG_CONT ? 'c' : '-', caller);
+			 (info->facility << 3) | info->level, info->seq,
+			 ts_usec, info->flags & LOG_CONT ? 'c' : '-', caller);
 }
 
 static ssize_t msg_print_ext_body(char *buf, size_t size,
@@ -795,10 +658,14 @@ static ssize_t msg_print_ext_body(char *buf, size_t size,
 /* /dev/kmsg - userspace message inject/listen interface */
 struct devkmsg_user {
 	u64 seq;
-	u32 idx;
 	struct ratelimit_state rs;
 	struct mutex lock;
 	char buf[CONSOLE_EXT_LOG_MAX];
+
+	struct printk_info info;
+	char text_buf[CONSOLE_EXT_LOG_MAX];
+	char dict_buf[CONSOLE_EXT_LOG_MAX];
+	struct printk_record record;
 };
 
 static __printf(3, 4) __cold
@@ -881,7 +748,7 @@ static ssize_t devkmsg_read(struct file *file, char __user *buf,
 			    size_t count, loff_t *ppos)
 {
 	struct devkmsg_user *user = file->private_data;
-	struct printk_log *msg;
+	struct printk_record *r = &user->record;
 	size_t len;
 	ssize_t ret;
 
@@ -893,7 +760,7 @@ static ssize_t devkmsg_read(struct file *file, char __user *buf,
 		return ret;
 
 	logbuf_lock_irq();
-	while (user->seq == log_next_seq) {
+	if (!prb_read_valid(prb, user->seq, r)) {
 		if (file->f_flags & O_NONBLOCK) {
 			ret = -EAGAIN;
 			logbuf_unlock_irq();
@@ -902,30 +769,26 @@ static ssize_t devkmsg_read(struct file *file, char __user *buf,
 
 		logbuf_unlock_irq();
 		ret = wait_event_interruptible(log_wait,
-					       user->seq != log_next_seq);
+					prb_read_valid(prb, user->seq, r));
 		if (ret)
 			goto out;
 		logbuf_lock_irq();
 	}
 
-	if (user->seq < log_first_seq) {
+	if (user->seq < prb_first_seq(prb)) {
 		/* our last seen message is gone, return error and reset */
-		user->idx = log_first_idx;
-		user->seq = log_first_seq;
+		user->seq = prb_first_seq(prb);
 		ret = -EPIPE;
 		logbuf_unlock_irq();
 		goto out;
 	}
 
-	msg = log_from_idx(user->idx);
-	len = msg_print_ext_header(user->buf, sizeof(user->buf),
-				   msg, user->seq);
+	len = info_print_ext_header(user->buf, sizeof(user->buf), r->info);
 	len += msg_print_ext_body(user->buf + len, sizeof(user->buf) - len,
-				  log_dict(msg), msg->dict_len,
-				  log_text(msg), msg->text_len);
+				  &r->dict_buf[0], r->info->dict_len,
+				  &r->text_buf[0], r->info->text_len);
 
-	user->idx = log_next(user->idx);
-	user->seq++;
+	user->seq = r->info->seq + 1;
 	logbuf_unlock_irq();
 
 	if (len > count) {
@@ -957,8 +820,7 @@ static loff_t devkmsg_llseek(struct file *file, loff_t offset, int whence)
 	switch (whence) {
 	case SEEK_SET:
 		/* the first record */
-		user->idx = log_first_idx;
-		user->seq = log_first_seq;
+		user->seq = prb_first_seq(prb);
 		break;
 	case SEEK_DATA:
 		/*
@@ -966,13 +828,11 @@ static loff_t devkmsg_llseek(struct file *file, loff_t offset, int whence)
 		 * like issued by 'dmesg -c'. Reading /dev/kmsg itself
 		 * changes no global state, and does not clear anything.
 		 */
-		user->idx = clear_idx;
 		user->seq = clear_seq;
 		break;
 	case SEEK_END:
 		/* after the last record */
-		user->idx = log_next_idx;
-		user->seq = log_next_seq;
+		user->seq = prb_next_seq(prb);
 		break;
 	default:
 		ret = -EINVAL;
@@ -992,9 +852,9 @@ static __poll_t devkmsg_poll(struct file *file, poll_table *wait)
 	poll_wait(file, &log_wait, wait);
 
 	logbuf_lock_irq();
-	if (user->seq < log_next_seq) {
+	if (prb_read_valid(prb, user->seq, NULL)) {
 		/* return error when data has vanished underneath us */
-		if (user->seq < log_first_seq)
+		if (user->seq < prb_first_seq(prb))
 			ret = EPOLLIN|EPOLLRDNORM|EPOLLERR|EPOLLPRI;
 		else
 			ret = EPOLLIN|EPOLLRDNORM;
@@ -1029,9 +889,12 @@ static int devkmsg_open(struct inode *inode, struct file *file)
 
 	mutex_init(&user->lock);
 
+	prb_rec_init_rd(&user->record, &user->info,
+			&user->text_buf[0], sizeof(user->text_buf),
+			&user->dict_buf[0], sizeof(user->dict_buf));
+
 	logbuf_lock_irq();
-	user->idx = log_first_idx;
-	user->seq = log_first_seq;
+	user->seq = prb_first_seq(prb);
 	logbuf_unlock_irq();
 
 	file->private_data = user;
@@ -1072,23 +935,52 @@ const struct file_operations kmsg_fops = {
  */
 void log_buf_vmcoreinfo_setup(void)
 {
-	VMCOREINFO_SYMBOL(log_buf);
-	VMCOREINFO_SYMBOL(log_buf_len);
-	VMCOREINFO_SYMBOL(log_first_idx);
-	VMCOREINFO_SYMBOL(clear_idx);
-	VMCOREINFO_SYMBOL(log_next_idx);
+	VMCOREINFO_SYMBOL(prb);
+	VMCOREINFO_SYMBOL(printk_rb_static);
+	VMCOREINFO_SYMBOL(clear_seq);
+
 	/*
-	 * Export struct printk_log size and field offsets. User space tools can
+	 * Export struct size and field offsets. User space tools can
 	 * parse it and detect any changes to structure down the line.
 	 */
-	VMCOREINFO_STRUCT_SIZE(printk_log);
-	VMCOREINFO_OFFSET(printk_log, ts_nsec);
-	VMCOREINFO_OFFSET(printk_log, len);
-	VMCOREINFO_OFFSET(printk_log, text_len);
-	VMCOREINFO_OFFSET(printk_log, dict_len);
-#ifdef CONFIG_PRINTK_CALLER
-	VMCOREINFO_OFFSET(printk_log, caller_id);
-#endif
+
+	VMCOREINFO_STRUCT_SIZE(printk_ringbuffer);
+	VMCOREINFO_OFFSET(printk_ringbuffer, desc_ring);
+	VMCOREINFO_OFFSET(printk_ringbuffer, text_data_ring);
+	VMCOREINFO_OFFSET(printk_ringbuffer, dict_data_ring);
+	VMCOREINFO_OFFSET(printk_ringbuffer, fail);
+
+	VMCOREINFO_STRUCT_SIZE(prb_desc_ring);
+	VMCOREINFO_OFFSET(prb_desc_ring, count_bits);
+	VMCOREINFO_OFFSET(prb_desc_ring, descs);
+	VMCOREINFO_OFFSET(prb_desc_ring, head_id);
+	VMCOREINFO_OFFSET(prb_desc_ring, tail_id);
+
+	VMCOREINFO_STRUCT_SIZE(prb_desc);
+	VMCOREINFO_OFFSET(prb_desc, info);
+	VMCOREINFO_OFFSET(prb_desc, state_var);
+	VMCOREINFO_OFFSET(prb_desc, text_blk_lpos);
+	VMCOREINFO_OFFSET(prb_desc, dict_blk_lpos);
+
+	VMCOREINFO_STRUCT_SIZE(prb_data_blk_lpos);
+	VMCOREINFO_OFFSET(prb_data_blk_lpos, begin);
+	VMCOREINFO_OFFSET(prb_data_blk_lpos, next);
+
+	VMCOREINFO_STRUCT_SIZE(printk_info);
+	VMCOREINFO_OFFSET(printk_info, seq);
+	VMCOREINFO_OFFSET(printk_info, ts_nsec);
+	VMCOREINFO_OFFSET(printk_info, text_len);
+	VMCOREINFO_OFFSET(printk_info, dict_len);
+	VMCOREINFO_OFFSET(printk_info, caller_id);
+
+	VMCOREINFO_STRUCT_SIZE(prb_data_ring);
+	VMCOREINFO_OFFSET(prb_data_ring, size_bits);
+	VMCOREINFO_OFFSET(prb_data_ring, data);
+	VMCOREINFO_OFFSET(prb_data_ring, head_lpos);
+	VMCOREINFO_OFFSET(prb_data_ring, tail_lpos);
+
+	VMCOREINFO_SIZE(atomic_long_t);
+	VMCOREINFO_TYPE_OFFSET(atomic_long_t, counter);
 }
 #endif
 
@@ -1166,11 +1058,46 @@ static void __init set_percpu_data_ready(void)
 	__printk_percpu_data_ready = true;
 }
 
+static unsigned int __init add_to_rb(struct printk_ringbuffer *rb,
+				     struct printk_record *r)
+{
+	struct prb_reserved_entry e;
+	struct printk_record dest_r;
+
+	prb_rec_init_wr(&dest_r, r->info->text_len, r->info->dict_len);
+
+	if (!prb_reserve(&e, rb, &dest_r))
+		return 0;
+
+	memcpy(&dest_r.text_buf[0], &r->text_buf[0], dest_r.text_buf_size);
+	if (dest_r.dict_buf) {
+		memcpy(&dest_r.dict_buf[0], &r->dict_buf[0],
+		       dest_r.dict_buf_size);
+	}
+	dest_r.info->facility = r->info->facility;
+	dest_r.info->level = r->info->level;
+	dest_r.info->flags = r->info->flags;
+	dest_r.info->ts_nsec = r->info->ts_nsec;
+	dest_r.info->caller_id = r->info->caller_id;
+
+	prb_commit(&e);
+
+	return prb_record_text_space(&e);
+}
+
+static char setup_text_buf[CONSOLE_EXT_LOG_MAX] __initdata;
+static char setup_dict_buf[CONSOLE_EXT_LOG_MAX] __initdata;
+
 void __init setup_log_buf(int early)
 {
+	struct prb_desc *new_descs;
+	struct printk_info info;
+	struct printk_record r;
 	unsigned long flags;
+	char *new_dict_buf;
 	char *new_log_buf;
 	unsigned int free;
+	u64 seq;
 
 	/*
 	 * Some archs call setup_log_buf() multiple times - first is very
@@ -1191,17 +1118,50 @@ void __init setup_log_buf(int early)
 
 	new_log_buf = memblock_alloc(new_log_buf_len, LOG_ALIGN);
 	if (unlikely(!new_log_buf)) {
-		pr_err("log_buf_len: %lu bytes not available\n",
+		pr_err("log_buf_len: %lu text bytes not available\n",
+			new_log_buf_len);
+		return;
+	}
+
+	new_dict_buf = memblock_alloc(new_log_buf_len, LOG_ALIGN);
+	if (unlikely(!new_dict_buf)) {
+		/* dictionary failure is allowed */
+		pr_err("log_buf_len: %lu dict bytes not available\n",
 			new_log_buf_len);
+	}
+
+	new_descs = memblock_alloc((new_log_buf_len >> PRB_AVGBITS) *
+				   sizeof(struct prb_desc), LOG_ALIGN);
+	if (unlikely(!new_descs)) {
+		pr_err("log_buf_len: %lu desc bytes not available\n",
+			new_log_buf_len >> PRB_AVGBITS);
+		if (new_dict_buf)
+			memblock_free(__pa(new_dict_buf), new_log_buf_len);
+		memblock_free(__pa(new_log_buf), new_log_buf_len);
 		return;
 	}
 
+	prb_rec_init_rd(&r, &info,
+			&setup_text_buf[0], sizeof(setup_text_buf),
+			&setup_dict_buf[0], sizeof(setup_dict_buf));
+
 	logbuf_lock_irqsave(flags);
+
+	prb_init(&printk_rb_dynamic,
+		 new_log_buf, bits_per(new_log_buf_len) - 1,
+		 new_dict_buf, bits_per(new_log_buf_len) - 1,
+		 new_descs, (bits_per(new_log_buf_len) - 1) - PRB_AVGBITS);
+
 	log_buf_len = new_log_buf_len;
 	log_buf = new_log_buf;
 	new_log_buf_len = 0;
-	free = __LOG_BUF_LEN - log_next_idx;
-	memcpy(log_buf, __log_buf, __LOG_BUF_LEN);
+
+	free = __LOG_BUF_LEN;
+	prb_for_each_record(0, &printk_rb_static, seq, &r)
+		free -= add_to_rb(&printk_rb_dynamic, &r);
+
+	prb = &printk_rb_dynamic;
+
 	logbuf_unlock_irqrestore(flags);
 
 	pr_info("log_buf_len: %u bytes\n", log_buf_len);
@@ -1313,18 +1273,18 @@ static size_t print_caller(u32 id, char *buf)
 #define print_caller(id, buf) 0
 #endif
 
-static size_t print_prefix(const struct printk_log *msg, bool syslog,
-			   bool time, char *buf)
+static size_t info_print_prefix(const struct printk_info  *info, bool syslog,
+				bool time, char *buf)
 {
 	size_t len = 0;
 
 	if (syslog)
-		len = print_syslog((msg->facility << 3) | msg->level, buf);
+		len = print_syslog((info->facility << 3) | info->level, buf);
 
 	if (time)
-		len += print_time(msg->ts_nsec, buf + len);
+		len += print_time(info->ts_nsec, buf + len);
 
-	len += print_caller(msg->caller_id, buf + len);
+	len += print_caller(info->caller_id, buf + len);
 
 	if (IS_ENABLED(CONFIG_PRINTK_CALLER) || time) {
 		buf[len++] = ' ';
@@ -1334,72 +1294,150 @@ static size_t print_prefix(const struct printk_log *msg, bool syslog,
 	return len;
 }
 
-static size_t msg_print_text(const struct printk_log *msg, bool syslog,
-			     bool time, char *buf, size_t size)
+static size_t record_print_text(struct printk_record *r, bool syslog,
+				bool time)
 {
-	const char *text = log_text(msg);
-	size_t text_size = msg->text_len;
-	size_t len = 0;
+	size_t text_len = r->info->text_len;
+	size_t buf_size = r->text_buf_size;
+	char *text = r->text_buf;
 	char prefix[PREFIX_MAX];
-	const size_t prefix_len = print_prefix(msg, syslog, time, prefix);
+	bool truncated = false;
+	size_t prefix_len;
+	size_t line_len;
+	size_t len = 0;
+	char *next;
 
-	do {
-		const char *next = memchr(text, '\n', text_size);
-		size_t text_len;
+	prefix_len = info_print_prefix(r->info, syslog, time, prefix);
 
+	/*
+	 * Add the prefix for each line by shifting the rest of the text to
+	 * make room for each prefix. If the buffer is not large enough for
+	 * all the prefixes, then drop the trailing text and report the
+	 * largest length that includes full lines with their prefixes.
+	 *
+	 * @text_len: bytes of unprocessed text
+	 * @line_len: bytes of current line (newline _not_ included)
+	 * @text:     pointer to beginning of current line
+	 * @len:      number of bytes processed (size of r->text_buf done)
+	 */
+	for (;;) {
+		next = memchr(text, '\n', text_len);
 		if (next) {
-			text_len = next - text;
-			next++;
-			text_size -= next - text;
+			line_len = next - text;
 		} else {
-			text_len = text_size;
+			/*
+			 * No newline. If the text was previously truncated,
+			 * assume this line was truncated and do not include
+			 * it.
+			 */
+			if (truncated)
+				break;
+			line_len = text_len;
 		}
 
-		if (buf) {
-			if (prefix_len + text_len + 1 >= size - len)
+		/*
+		 * Ensure there is enough buffer available to shift this line
+		 * (and add a newline at the end).
+		 */
+		if (len + prefix_len + line_len + 1 > buf_size)
+			break;
+
+		/*
+		 * Ensure there is enough buffer available to shift all
+		 * remaining text (and add a newline at the end). If this
+		 * test fails, then there must be a newline (i.e.
+		 * text_len > line_len because the previous test succeeded).
+		 */
+		if (len + prefix_len + text_len + 1 > buf_size) {
+			/*
+			 * Truncate @text_len so that there is enough space
+			 * for a prefix. A newline will not be added because
+			 * the last line of the text is now truncated and
+			 * will not be included.
+			 */
+			text_len = (buf_size - len) - prefix_len;
+
+			/*
+			 * Ensure there is still a newline. Otherwise this
+			 * line may have been truncated and will not be
+			 * included.
+			 */
+			if (memchr(text, '\n', text_len) == NULL)
 				break;
 
-			memcpy(buf + len, prefix, prefix_len);
-			len += prefix_len;
-			memcpy(buf + len, text, text_len);
-			len += text_len;
-			buf[len++] = '\n';
-		} else {
-			/* SYSLOG_ACTION_* buffer size only calculation */
-			len += prefix_len + text_len + 1;
+			/* Note that the last line will not be included. */
+			truncated = true;
 		}
 
-		text = next;
-	} while (text);
+		memmove(text + prefix_len, text, text_len);
+		memcpy(text, prefix, prefix_len);
+
+		/* Advance beyond the newly added prefix and existing line. */
+		text += prefix_len + line_len;
+
+		/* The remaining text has only decreased by the line. */
+		text_len -= line_len;
+
+		len += prefix_len + line_len + 1;
+
+		if (text_len) {
+			/* Advance past the newline. */
+			text++;
+			text_len--;
+		} else {
+			/* The final line, add a newline. */
+			*text = '\n';
+			break;
+		}
+	}
 
 	return len;
 }
 
+static size_t get_record_text_size(struct printk_info *info,
+				   unsigned int line_count,
+				   bool syslog, bool time)
+{
+	char prefix[PREFIX_MAX];
+	size_t prefix_len;
+
+	prefix_len = info_print_prefix(info, syslog, time, prefix);
+
+	/*
+	 * Each line will be preceded with a prefix. The intermediate
+	 * newlines are already within the text, but a final trailing
+	 * newline will be added.
+	 */
+	return ((prefix_len * line_count) + info->text_len + 1);
+}
+
 static int syslog_print(char __user *buf, int size)
 {
+	struct printk_info info;
+	struct printk_record r;
 	char *text;
-	struct printk_log *msg;
 	int len = 0;
 
 	text = kmalloc(LOG_LINE_MAX + PREFIX_MAX, GFP_KERNEL);
 	if (!text)
 		return -ENOMEM;
 
+	prb_rec_init_rd(&r, &info, text, LOG_LINE_MAX + PREFIX_MAX, NULL, 0);
+
 	while (size > 0) {
 		size_t n;
 		size_t skip;
 
 		logbuf_lock_irq();
-		if (syslog_seq < log_first_seq) {
-			/* messages are gone, move to first one */
-			syslog_seq = log_first_seq;
-			syslog_idx = log_first_idx;
-			syslog_partial = 0;
-		}
-		if (syslog_seq == log_next_seq) {
+		if (!prb_read_valid(prb, syslog_seq, &r)) {
 			logbuf_unlock_irq();
 			break;
 		}
+		if (r.info->seq != syslog_seq) {
+			/* message is gone, move to next valid one */
+			syslog_seq = r.info->seq;
+			syslog_partial = 0;
+		}
 
 		/*
 		 * To keep reading/counting partial line consistent,
@@ -1409,13 +1447,10 @@ static int syslog_print(char __user *buf, int size)
 			syslog_time = printk_time;
 
 		skip = syslog_partial;
-		msg = log_from_idx(syslog_idx);
-		n = msg_print_text(msg, true, syslog_time, text,
-				   LOG_LINE_MAX + PREFIX_MAX);
+		n = record_print_text(&r, true, syslog_time);
 		if (n - syslog_partial <= size) {
 			/* message fits into buffer, move forward */
-			syslog_idx = log_next(syslog_idx);
-			syslog_seq++;
+			syslog_seq = r.info->seq + 1;
 			n -= syslog_partial;
 			syslog_partial = 0;
 		} else if (!len){
@@ -1446,55 +1481,49 @@ static int syslog_print(char __user *buf, int size)
 
 static int syslog_print_all(char __user *buf, int size, bool clear)
 {
+	struct printk_info info;
+	unsigned int line_count;
+	struct printk_record r;
 	char *text;
 	int len = 0;
-	u64 next_seq;
 	u64 seq;
-	u32 idx;
 	bool time;
 
 	text = kmalloc(LOG_LINE_MAX + PREFIX_MAX, GFP_KERNEL);
 	if (!text)
 		return -ENOMEM;
 
+	prb_rec_init_rd(&r, &info, text, LOG_LINE_MAX + PREFIX_MAX, NULL, 0);
+
 	time = printk_time;
 	logbuf_lock_irq();
 	/*
 	 * Find first record that fits, including all following records,
 	 * into the user-provided buffer for this dump.
 	 */
-	seq = clear_seq;
-	idx = clear_idx;
-	while (seq < log_next_seq) {
-		struct printk_log *msg = log_from_idx(idx);
-
-		len += msg_print_text(msg, true, time, NULL, 0);
-		idx = log_next(idx);
-		seq++;
+	prb_for_each_record(clear_seq, prb, seq, &r) {
+		line_count = prb_count_lines(text, r.info->text_len);
+		len += get_record_text_size(r.info, line_count, true, time);
 	}
 
 	/* move first record forward until length fits into the buffer */
-	seq = clear_seq;
-	idx = clear_idx;
-	while (len > size && seq < log_next_seq) {
-		struct printk_log *msg = log_from_idx(idx);
-
-		len -= msg_print_text(msg, true, time, NULL, 0);
-		idx = log_next(idx);
-		seq++;
+	prb_for_each_record(clear_seq, prb, seq, &r) {
+		if (len <= size)
+			break;
+		line_count = prb_count_lines(text, r.info->text_len);
+		len -= get_record_text_size(r.info, line_count, true, time);
 	}
 
-	/* last message fitting into this dump */
-	next_seq = log_next_seq;
-
 	len = 0;
-	while (len >= 0 && seq < next_seq) {
-		struct printk_log *msg = log_from_idx(idx);
-		int textlen = msg_print_text(msg, true, time, text,
-					     LOG_LINE_MAX + PREFIX_MAX);
+	prb_for_each_record(seq, prb, seq, &r) {
+		int textlen;
 
-		idx = log_next(idx);
-		seq++;
+		textlen = record_print_text(&r, true, time);
+
+		if (len + textlen > size) {
+			seq--;
+			break;
+		}
 
 		logbuf_unlock_irq();
 		if (copy_to_user(buf + len, text, textlen))
@@ -1503,17 +1532,12 @@ static int syslog_print_all(char __user *buf, int size, bool clear)
 			len += textlen;
 		logbuf_lock_irq();
 
-		if (seq < log_first_seq) {
-			/* messages are gone, move to next one */
-			seq = log_first_seq;
-			idx = log_first_idx;
-		}
+		if (len < 0)
+			break;
 	}
 
-	if (clear) {
-		clear_seq = log_next_seq;
-		clear_idx = log_next_idx;
-	}
+	if (clear)
+		clear_seq = seq;
 	logbuf_unlock_irq();
 
 	kfree(text);
@@ -1523,8 +1547,7 @@ static int syslog_print_all(char __user *buf, int size, bool clear)
 static void syslog_clear(void)
 {
 	logbuf_lock_irq();
-	clear_seq = log_next_seq;
-	clear_idx = log_next_idx;
+	clear_seq = prb_next_seq(prb);
 	logbuf_unlock_irq();
 }
 
@@ -1532,6 +1555,7 @@ int do_syslog(int type, char __user *buf, int len, int source)
 {
 	bool clear = false;
 	static int saved_console_loglevel = LOGLEVEL_DEFAULT;
+	char *text = NULL;
 	int error;
 
 	error = check_syslog_permissions(type, source);
@@ -1551,7 +1575,7 @@ int do_syslog(int type, char __user *buf, int len, int source)
 		if (!access_ok(buf, len))
 			return -EFAULT;
 		error = wait_event_interruptible(log_wait,
-						 syslog_seq != log_next_seq);
+				prb_read_valid(prb, syslog_seq, NULL));
 		if (error)
 			return error;
 		error = syslog_print(buf, len);
@@ -1599,11 +1623,15 @@ int do_syslog(int type, char __user *buf, int len, int source)
 		break;
 	/* Number of chars in the log buffer */
 	case SYSLOG_ACTION_SIZE_UNREAD:
+		if (source != SYSLOG_FROM_PROC) {
+			text = kmalloc(LOG_LINE_MAX + PREFIX_MAX, GFP_KERNEL);
+			if (!text)
+				return -ENOMEM;
+		}
 		logbuf_lock_irq();
-		if (syslog_seq < log_first_seq) {
+		if (syslog_seq < prb_first_seq(prb)) {
 			/* messages are gone, move to first one */
-			syslog_seq = log_first_seq;
-			syslog_idx = log_first_idx;
+			syslog_seq = prb_first_seq(prb);
 			syslog_partial = 0;
 		}
 		if (source == SYSLOG_FROM_PROC) {
@@ -1612,24 +1640,29 @@ int do_syslog(int type, char __user *buf, int len, int source)
 			 * for pending data, not the size; return the count of
 			 * records, not the length.
 			 */
-			error = log_next_seq - syslog_seq;
+			error = prb_next_seq(prb) - syslog_seq;
 		} else {
-			u64 seq = syslog_seq;
-			u32 idx = syslog_idx;
 			bool time = syslog_partial ? syslog_time : printk_time;
-
-			while (seq < log_next_seq) {
-				struct printk_log *msg = log_from_idx(idx);
-
-				error += msg_print_text(msg, true, time, NULL,
-							0);
+			struct printk_info info;
+			unsigned int line_count;
+			struct printk_record r;
+			u64 seq;
+
+			prb_rec_init_rd(&r, &info, text,
+					LOG_LINE_MAX + PREFIX_MAX, NULL, 0);
+
+			prb_for_each_record(syslog_seq, prb, seq, &r) {
+				line_count = prb_count_lines(text,
+							r.info->text_len);
+				error += get_record_text_size(r.info,
+							      line_count,
+							      true, time);
 				time = printk_time;
-				idx = log_next(idx);
-				seq++;
 			}
 			error -= syslog_partial;
 		}
 		logbuf_unlock_irq();
+		kfree(text);
 		break;
 	/* Size of the log buffer */
 	case SYSLOG_ACTION_SIZE_BUFFER:
@@ -1796,10 +1829,22 @@ static int console_trylock_spinning(void)
 static void call_console_drivers(const char *ext_text, size_t ext_len,
 				 const char *text, size_t len)
 {
+	static char dropped_text[64];
+	size_t dropped_len = 0;
 	struct console *con;
 
 	trace_console_rcuidle(text, len);
 
+	if (!console_drivers)
+		return;
+
+	if (console_dropped) {
+		dropped_len = sprintf(dropped_text,
+				      "** %lu printk messages dropped **\n",
+				      console_dropped);
+		console_dropped = 0;
+	}
+
 	for_each_console(con) {
 		if (exclusive_console && con != exclusive_console)
 			continue;
@@ -1812,8 +1857,11 @@ static void call_console_drivers(const char *ext_text, size_t ext_len,
 			continue;
 		if (con->flags & CON_EXTENDED)
 			con->write(con, ext_text, ext_len);
-		else
+		else {
+			if (dropped_len)
+				con->write(con, dropped_text, dropped_len);
 			con->write(con, text, len);
+		}
 	}
 }
 
@@ -1983,7 +2031,6 @@ asmlinkage int vprintk_emit(int facility, int level,
 	int printed_len;
 	bool in_sched = false, pending_output;
 	unsigned long flags;
-	u64 curr_log_seq;
 
 	/* Suppress unimportant messages after panic happens */
 	if (unlikely(suppress_printk))
@@ -1999,9 +2046,9 @@ asmlinkage int vprintk_emit(int facility, int level,
 
 	/* This stops the holder of console_sem just where we want him */
 	logbuf_lock_irqsave(flags);
-	curr_log_seq = log_next_seq;
+	pending_output = !prb_read_valid(prb, console_seq, NULL);
 	printed_len = vprintk_store(facility, level, dict, dictlen, fmt, args);
-	pending_output = (curr_log_seq != log_next_seq);
+	pending_output &= prb_read_valid(prb, console_seq, NULL);
 	logbuf_unlock_irqrestore(flags);
 
 	/* If called from the scheduler, we can not call up(). */
@@ -2091,21 +2138,24 @@ EXPORT_SYMBOL(printk);
 #define PREFIX_MAX		0
 #define printk_time		false
 
+#define prb_read_valid(rb, seq, r)	false
+#define prb_first_seq(rb)		0
+
 static u64 syslog_seq;
-static u32 syslog_idx;
 static u64 console_seq;
-static u32 console_idx;
 static u64 exclusive_console_stop_seq;
-static u64 log_first_seq;
-static u32 log_first_idx;
-static u64 log_next_seq;
-static char *log_text(const struct printk_log *msg) { return NULL; }
-static char *log_dict(const struct printk_log *msg) { return NULL; }
-static struct printk_log *log_from_idx(u32 idx) { return NULL; }
-static u32 log_next(u32 idx) { return 0; }
-static ssize_t msg_print_ext_header(char *buf, size_t size,
-				    struct printk_log *msg,
-				    u64 seq) { return 0; }
+static unsigned long console_dropped;
+
+static size_t record_print_text(const struct printk_record *r,
+				bool syslog, bool time)
+{
+	return 0;
+}
+static ssize_t info_print_ext_header(char *buf, size_t size,
+				     struct printk_info *info)
+{
+	return 0;
+}
 static ssize_t msg_print_ext_body(char *buf, size_t size,
 				  char *dict, size_t dict_len,
 				  char *text, size_t text_len) { return 0; }
@@ -2113,8 +2163,6 @@ static void console_lock_spinning_enable(void) { }
 static int console_lock_spinning_disable_and_check(void) { return 0; }
 static void call_console_drivers(const char *ext_text, size_t ext_len,
 				 const char *text, size_t len) {}
-static size_t msg_print_text(const struct printk_log *msg, bool syslog,
-			     bool time, char *buf, size_t size) { return 0; }
 static bool suppress_message_printing(int level) { return false; }
 
 #endif /* CONFIG_PRINTK */
@@ -2395,12 +2443,17 @@ void console_unlock(void)
 	static char text[LOG_LINE_MAX + PREFIX_MAX];
 	unsigned long flags;
 	bool do_cond_resched, retry;
+	struct printk_info info;
+	struct printk_record r;
 
 	if (console_suspended) {
 		up_console_sem();
 		return;
 	}
 
+	prb_rec_init_rd(&r, &info, text, sizeof(text), ext_text,
+			sizeof(ext_text));
+
 	/*
 	 * Console drivers are called with interrupts disabled, so
 	 * @console_may_schedule should be cleared before; however, we may
@@ -2431,35 +2484,26 @@ void console_unlock(void)
 	}
 
 	for (;;) {
-		struct printk_log *msg;
 		size_t ext_len = 0;
 		size_t len;
 
 		printk_safe_enter_irqsave(flags);
 		raw_spin_lock(&logbuf_lock);
-		if (console_seq < log_first_seq) {
-			len = sprintf(text,
-				      "** %llu printk messages dropped **\n",
-				      log_first_seq - console_seq);
-
-			/* messages are gone, move to first one */
-			console_seq = log_first_seq;
-			console_idx = log_first_idx;
-		} else {
-			len = 0;
-		}
 skip:
-		if (console_seq == log_next_seq)
+		if (!prb_read_valid(prb, console_seq, &r))
 			break;
 
-		msg = log_from_idx(console_idx);
-		if (suppress_message_printing(msg->level)) {
+		if (console_seq != r.info->seq) {
+			console_dropped += r.info->seq - console_seq;
+			console_seq = r.info->seq;
+		}
+
+		if (suppress_message_printing(r.info->level)) {
 			/*
 			 * Skip record we have buffered and already printed
 			 * directly to the console when we received it, and
 			 * record that has level above the console loglevel.
 			 */
-			console_idx = log_next(console_idx);
 			console_seq++;
 			goto skip;
 		}
@@ -2470,19 +2514,20 @@ void console_unlock(void)
 			exclusive_console = NULL;
 		}
 
-		len += msg_print_text(msg,
+		len = record_print_text(&r,
 				console_msg_format & MSG_FORMAT_SYSLOG,
-				printk_time, text + len, sizeof(text) - len);
+				printk_time);
 		if (nr_ext_console_drivers) {
-			ext_len = msg_print_ext_header(ext_text,
+			ext_len = info_print_ext_header(ext_text,
 						sizeof(ext_text),
-						msg, console_seq);
+						r.info);
 			ext_len += msg_print_ext_body(ext_text + ext_len,
 						sizeof(ext_text) - ext_len,
-						log_dict(msg), msg->dict_len,
-						log_text(msg), msg->text_len);
+						&r.dict_buf[0],
+						r.info->dict_len,
+						&r.text_buf[0],
+						r.info->text_len);
 		}
-		console_idx = log_next(console_idx);
 		console_seq++;
 		raw_spin_unlock(&logbuf_lock);
 
@@ -2522,7 +2567,7 @@ void console_unlock(void)
 	 * flush, no worries.
 	 */
 	raw_spin_lock(&logbuf_lock);
-	retry = console_seq != log_next_seq;
+	retry = prb_read_valid(prb, console_seq, NULL);
 	raw_spin_unlock(&logbuf_lock);
 	printk_safe_exit_irqrestore(flags);
 
@@ -2591,8 +2636,7 @@ void console_flush_on_panic(enum con_flush_mode mode)
 		unsigned long flags;
 
 		logbuf_lock_irqsave(flags);
-		console_seq = log_first_seq;
-		console_idx = log_first_idx;
+		console_seq = prb_first_seq(prb);
 		logbuf_unlock_irqrestore(flags);
 	}
 	console_unlock();
@@ -2805,7 +2849,6 @@ void register_console(struct console *newcon)
 		exclusive_console = newcon;
 		exclusive_console_stop_seq = console_seq;
 		console_seq = syslog_seq;
-		console_idx = syslog_idx;
 		logbuf_unlock_irqrestore(flags);
 	}
 	console_unlock();
@@ -3170,9 +3213,7 @@ void kmsg_dump(enum kmsg_dump_reason reason)
 
 		logbuf_lock_irqsave(flags);
 		dumper->cur_seq = clear_seq;
-		dumper->cur_idx = clear_idx;
-		dumper->next_seq = log_next_seq;
-		dumper->next_idx = log_next_idx;
+		dumper->next_seq = prb_next_seq(prb);
 		logbuf_unlock_irqrestore(flags);
 
 		/* invoke dumper which will iterate over records */
@@ -3206,28 +3247,33 @@ void kmsg_dump(enum kmsg_dump_reason reason)
 bool kmsg_dump_get_line_nolock(struct kmsg_dumper *dumper, bool syslog,
 			       char *line, size_t size, size_t *len)
 {
-	struct printk_log *msg;
+	struct printk_info info;
+	unsigned int line_count;
+	struct printk_record r;
 	size_t l = 0;
 	bool ret = false;
 
+	prb_rec_init_rd(&r, &info, line, size, NULL, 0);
+
 	if (!dumper->active)
 		goto out;
 
-	if (dumper->cur_seq < log_first_seq) {
-		/* messages are gone, move to first available one */
-		dumper->cur_seq = log_first_seq;
-		dumper->cur_idx = log_first_idx;
-	}
-
-	/* last entry */
-	if (dumper->cur_seq >= log_next_seq)
-		goto out;
+	/* Read text or count text lines? */
+	if (line) {
+		if (!prb_read_valid(prb, dumper->cur_seq, &r))
+			goto out;
+		l = record_print_text(&r, syslog, printk_time);
+	} else {
+		if (!prb_read_valid_info(prb, dumper->cur_seq,
+					 &info, &line_count)) {
+			goto out;
+		}
+		l = get_record_text_size(&info, line_count, syslog,
+					 printk_time);
 
-	msg = log_from_idx(dumper->cur_idx);
-	l = msg_print_text(msg, syslog, printk_time, line, size);
+	}
 
-	dumper->cur_idx = log_next(dumper->cur_idx);
-	dumper->cur_seq++;
+	dumper->cur_seq = r.info->seq + 1;
 	ret = true;
 out:
 	if (len)
@@ -3288,23 +3334,25 @@ EXPORT_SYMBOL_GPL(kmsg_dump_get_line);
 bool kmsg_dump_get_buffer(struct kmsg_dumper *dumper, bool syslog,
 			  char *buf, size_t size, size_t *len)
 {
+	struct printk_info info;
+	unsigned int line_count;
+	struct printk_record r;
 	unsigned long flags;
 	u64 seq;
-	u32 idx;
 	u64 next_seq;
-	u32 next_idx;
 	size_t l = 0;
 	bool ret = false;
 	bool time = printk_time;
 
-	if (!dumper->active)
+	prb_rec_init_rd(&r, &info, buf, size, NULL, 0);
+
+	if (!dumper->active || !buf || !size)
 		goto out;
 
 	logbuf_lock_irqsave(flags);
-	if (dumper->cur_seq < log_first_seq) {
+	if (dumper->cur_seq < prb_first_seq(prb)) {
 		/* messages are gone, move to first available one */
-		dumper->cur_seq = log_first_seq;
-		dumper->cur_idx = log_first_idx;
+		dumper->cur_seq = prb_first_seq(prb);
 	}
 
 	/* last entry */
@@ -3315,41 +3363,41 @@ bool kmsg_dump_get_buffer(struct kmsg_dumper *dumper, bool syslog,
 
 	/* calculate length of entire buffer */
 	seq = dumper->cur_seq;
-	idx = dumper->cur_idx;
-	while (seq < dumper->next_seq) {
-		struct printk_log *msg = log_from_idx(idx);
-
-		l += msg_print_text(msg, true, time, NULL, 0);
-		idx = log_next(idx);
-		seq++;
+	while (prb_read_valid_info(prb, seq, &info, &line_count)) {
+		if (r.info->seq >= dumper->next_seq)
+			break;
+		l += get_record_text_size(&info, line_count, true, time);
+		seq = r.info->seq + 1;
 	}
 
 	/* move first record forward until length fits into the buffer */
 	seq = dumper->cur_seq;
-	idx = dumper->cur_idx;
-	while (l >= size && seq < dumper->next_seq) {
-		struct printk_log *msg = log_from_idx(idx);
-
-		l -= msg_print_text(msg, true, time, NULL, 0);
-		idx = log_next(idx);
-		seq++;
+	while (l >= size && prb_read_valid_info(prb, seq,
+						&info, &line_count)) {
+		if (r.info->seq >= dumper->next_seq)
+			break;
+		l -= get_record_text_size(&info, line_count, true, time);
+		seq = r.info->seq + 1;
 	}
 
 	/* last message in next interation */
 	next_seq = seq;
-	next_idx = idx;
 
+	/* actually read text into the buffer now */
 	l = 0;
-	while (seq < dumper->next_seq) {
-		struct printk_log *msg = log_from_idx(idx);
+	while (prb_read_valid(prb, seq, &r)) {
+		if (r.info->seq >= dumper->next_seq)
+			break;
+
+		l += record_print_text(&r, syslog, time);
+
+		/* adjust record to store to remaining buffer space */
+		prb_rec_init_rd(&r, &info, buf + l, size - l, NULL, 0);
 
-		l += msg_print_text(msg, syslog, time, buf + l, size - l);
-		idx = log_next(idx);
-		seq++;
+		seq = r.info->seq + 1;
 	}
 
 	dumper->next_seq = next_seq;
-	dumper->next_idx = next_idx;
 	ret = true;
 	logbuf_unlock_irqrestore(flags);
 out:
@@ -3372,9 +3420,7 @@ EXPORT_SYMBOL_GPL(kmsg_dump_get_buffer);
 void kmsg_dump_rewind_nolock(struct kmsg_dumper *dumper)
 {
 	dumper->cur_seq = clear_seq;
-	dumper->cur_idx = clear_idx;
-	dumper->next_seq = log_next_seq;
-	dumper->next_idx = log_next_idx;
+	dumper->next_seq = prb_next_seq(prb);
 }
 
 /**
-- 
2.20.1


^ permalink raw reply related	[flat|nested] 30+ messages in thread

* Re: [PATCH v2 3/3] printk: use the lockless ringbuffer
  2020-05-01  9:40 ` [PATCH v2 3/3] printk: use the lockless ringbuffer John Ogness
@ 2020-05-06 14:50   ` John Ogness
  0 siblings, 0 replies; 30+ messages in thread
From: John Ogness @ 2020-05-06 14:50 UTC (permalink / raw)
  To: Petr Mladek
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel

Hi,

I discovered a bug while preparing my next series, which also made me
realize I had never tested the extended mode feature of netconsole. :-/
The only other user of extended output is /dev/kmsg, and it is doing it
correctly.

Explanation and patch below.

On 2020-05-01, John Ogness <john.ogness@linutronix.de> wrote:
> diff --git a/kernel/printk/printk.c b/kernel/printk/printk.c
> index 9a9b6156270b..0d74ca748b82 100644
> --- a/kernel/printk/printk.c
> +++ b/kernel/printk/printk.c
> @@ -2395,12 +2443,17 @@ void console_unlock(void)
>  	static char text[LOG_LINE_MAX + PREFIX_MAX];
>  	unsigned long flags;
>  	bool do_cond_resched, retry;
> +	struct printk_info info;
> +	struct printk_record r;
>  
>  	if (console_suspended) {
>  		up_console_sem();
>  		return;
>  	}
>  
> +	prb_rec_init_rd(&r, &info, text, sizeof(text), ext_text,
> +			sizeof(ext_text));
> +

Using @ext_text for the dictionary buffer. But extended mode will need
that buffer for generating extended output!

>  	/*
>  	 * Console drivers are called with interrupts disabled, so
>  	 * @console_may_schedule should be cleared before; however, we may
> @@ -2470,19 +2514,20 @@ void console_unlock(void)
>  			exclusive_console = NULL;
>  		}
>  
> -		len += msg_print_text(msg,
> +		len = record_print_text(&r,
>  				console_msg_format & MSG_FORMAT_SYSLOG,
> -				printk_time, text + len, sizeof(text) - len);
> +				printk_time);
>  		if (nr_ext_console_drivers) {
> -			ext_len = msg_print_ext_header(ext_text,
> +			ext_len = info_print_ext_header(ext_text,
>  						sizeof(ext_text),
> -						msg, console_seq);
> +						r.info);
>  			ext_len += msg_print_ext_body(ext_text + ext_len,
>  						sizeof(ext_text) - ext_len,
> -						log_dict(msg), msg->dict_len,
> -						log_text(msg), msg->text_len);
> +						&r.dict_buf[0],
> +						r.info->dict_len,
> +						&r.text_buf[0],
> +						r.info->text_len);

2 problems here:

First, record_print_text() works by modifying the record text buffer
"in-place". So when msg_print_ext_body() creates the extended output, it
will be using a modified text buffer (i.e. one with prefix added).

Second, info_print_ext_header() and msg_print_ext_body() are clobbering
the dictionary buffer because @ext_text _is_ @r.dict_buf.

This can be resolved by giving the record a separate dictionary buffer
and calling record_print_text() after preparing any extended console
text. Patch below...

John Ogness

diff --git a/kernel/printk/printk.c b/kernel/printk/printk.c
index 0d74ca748b82..cc79ad67e6e3 100644
--- a/kernel/printk/printk.c
+++ b/kernel/printk/printk.c
@@ -2441,6 +2441,7 @@ void console_unlock(void)
 {
 	static char ext_text[CONSOLE_EXT_LOG_MAX];
 	static char text[LOG_LINE_MAX + PREFIX_MAX];
+	static char dict[LOG_LINE_MAX];
 	unsigned long flags;
 	bool do_cond_resched, retry;
 	struct printk_info info;
@@ -2451,8 +2452,7 @@ void console_unlock(void)
 		return;
 	}
 
-	prb_rec_init_rd(&r, &info, text, sizeof(text), ext_text,
-			sizeof(ext_text));
+	prb_rec_init_rd(&r, &info, text, sizeof(text), dict, sizeof(dict));
 
 	/*
 	 * Console drivers are called with interrupts disabled, so
@@ -2514,9 +2514,10 @@ void console_unlock(void)
 			exclusive_console = NULL;
 		}
 
-		len = record_print_text(&r,
-				console_msg_format & MSG_FORMAT_SYSLOG,
-				printk_time);
+		/*
+		 * Handle extended console text first because later
+		 * record_print_text() will modify the record buffer in-place.
+		 */
 		if (nr_ext_console_drivers) {
 			ext_len = info_print_ext_header(ext_text,
 						sizeof(ext_text),
@@ -2528,6 +2529,9 @@ void console_unlock(void)
 						&r.text_buf[0],
 						r.info->text_len);
 		}
+		len = record_print_text(&r,
+				console_msg_format & MSG_FORMAT_SYSLOG,
+				printk_time);
 		console_seq++;
 		raw_spin_unlock(&logbuf_lock);
 
-- 
2.20.1

^ permalink raw reply related	[flat|nested] 30+ messages in thread

* Re: [PATCH v2 0/3] printk: replace ringbuffer
  2020-05-01  9:40 [PATCH v2 0/3] printk: replace ringbuffer John Ogness
                   ` (2 preceding siblings ...)
  2020-05-01  9:40 ` [PATCH v2 3/3] printk: use the lockless ringbuffer John Ogness
@ 2020-05-13 12:05 ` Prarit Bhargava
  2020-05-15 10:24 ` Sergey Senozhatsky
  4 siblings, 0 replies; 30+ messages in thread
From: Prarit Bhargava @ 2020-05-13 12:05 UTC (permalink / raw)
  To: John Ogness, Petr Mladek
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel



On 5/1/20 5:40 AM, John Ogness wrote:
> Hello,
> 
> Here is a v2 for the first series to rework the printk subsystem. The
> v1 and history are here [0]. This first series only replaces the
> existing ringbuffer implementation. No locking is removed. No
> semantics/behavior of printk are changed.
> 
> The VMCOREINFO is updated. RFC patches for the external tools
> crash(8) [1] and makedumpfile(8) [2] have been submitted that allow
> the new ringbuffer to be correctly read.
> 
> This series is in line with the agreements [3] made at the meeting
> during LPC2019 in Lisbon, with 1 exception: support for dictionaries
> will not be discontinued [4]. Dictionaries are stored in a separate
> buffer so that they cannot interfere with the human-readable buffer.
> 

Hey John,

I tested this on two 128-cpu ppc64le power8 boxes, and a 64 core x86_64 box
using your prb-test testsuite with the latest upstream kernel.  The tests ran
successfully for ~16 hours without any problems.

P.


^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: [PATCH v2 0/3] printk: replace ringbuffer
  2020-05-01  9:40 [PATCH v2 0/3] printk: replace ringbuffer John Ogness
                   ` (3 preceding siblings ...)
  2020-05-13 12:05 ` [PATCH v2 0/3] printk: replace ringbuffer Prarit Bhargava
@ 2020-05-15 10:24 ` Sergey Senozhatsky
  4 siblings, 0 replies; 30+ messages in thread
From: Sergey Senozhatsky @ 2020-05-15 10:24 UTC (permalink / raw)
  To: John Ogness
  Cc: Petr Mladek, Peter Zijlstra, Sergey Senozhatsky,
	Sergey Senozhatsky, Steven Rostedt, Linus Torvalds,
	Greg Kroah-Hartman, Andrea Parri, Thomas Gleixner, kexec,
	linux-kernel

On (20/05/01 11:46), John Ogness wrote:
> Hello,
> 
> Here is a v2 for the first series to rework the printk subsystem. The
> v1 and history are here [0]. This first series only replaces the
> existing ringbuffer implementation. No locking is removed. No
> semantics/behavior of printk are changed.
> 
> The VMCOREINFO is updated. RFC patches for the external tools
> crash(8) [1] and makedumpfile(8) [2] have been submitted that allow
> the new ringbuffer to be correctly read.
> 
> This series is in line with the agreements [3] made at the meeting
> during LPC2019 in Lisbon, with 1 exception: support for dictionaries
> will not be discontinued [4]. Dictionaries are stored in a separate
> buffer so that they cannot interfere with the human-readable buffer.

I'm willing to bless this. The code looks good to me, nice job guys.

Acked-by: Sergey Senozhatsky <sergey.senozhatsky@gmail.com>

	-ss

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: [PATCH v2 2/3] printk: add lockless buffer
       [not found]   ` <87v9ktcs3q.fsf@vostro.fn.ogness.net>
@ 2020-05-18 17:22     ` Linus Torvalds
  2020-05-19 20:34       ` John Ogness
  0 siblings, 1 reply; 30+ messages in thread
From: Linus Torvalds @ 2020-05-18 17:22 UTC (permalink / raw)
  To: John Ogness
  Cc: Petr Mladek, Peter Zijlstra, Sergey Senozhatsky,
	Sergey Senozhatsky, Steven Rostedt, Greg Kroah-Hartman,
	Andrea Parri, Thomas Gleixner, kexec, Linux Kernel Mailing List,
	Paul E. McKenney

On Mon, May 18, 2020 at 6:04 AM John Ogness <john.ogness@linutronix.de> wrote:
>
> The cmpxchg() needs to be moved out of the while condition so that a
> continue can be used as intended. Patch below.

This seems pointless and wrong (patch edited to remove the '-' lines
so that you see the end result):

>                 smp_mb(); /* LMM(data_push_tail:C) */
>
> +               if (atomic_long_try_cmpxchg_relaxed(&data_ring->tail_lpos,
> +                               &tail_lpos,
> +                               next_lpos)) { /* LMM(data_push_tail:D) */
> +                       break;
> +               }

Doing an "smp_mb()" followed by a "cmpxchg_relaxed" seems all kinds of
odd and pointless, and is very much non-optimal on x86 for example.,

Just remove the smp_mb(), and use the non-relaxed form of cmpxchg.
It's defined to be fully ordered if it succeeds (and if the cmpxchg
doesn't succeed, it's a no-op and the memory barrier shouldn't make
any difference).

Otherwise you'll do two memory ordering operations on x86 (and
probably some other architectures), since the cmpxchg is always
ordered on x86 and there exists no "relaxed" form of it.

                  Linus

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: [PATCH v2 2/3] printk: add lockless buffer
  2020-05-18 17:22     ` Linus Torvalds
@ 2020-05-19 20:34       ` John Ogness
  0 siblings, 0 replies; 30+ messages in thread
From: John Ogness @ 2020-05-19 20:34 UTC (permalink / raw)
  To: Linus Torvalds
  Cc: Petr Mladek, Peter Zijlstra, Sergey Senozhatsky,
	Sergey Senozhatsky, Steven Rostedt, Greg Kroah-Hartman,
	Andrea Parri, Thomas Gleixner, kexec, Linux Kernel Mailing List,
	Paul E. McKenney

On 2020-05-18, Linus Torvalds <torvalds@linux-foundation.org> wrote:
>>                 smp_mb(); /* LMM(data_push_tail:C) */
>>
>> +               if (atomic_long_try_cmpxchg_relaxed(&data_ring->tail_lpos,
>> +                               &tail_lpos,
>> +                               next_lpos)) { /* LMM(data_push_tail:D) */
>> +                       break;
>> +               }
>
> Doing an "smp_mb()" followed by a "cmpxchg_relaxed" seems all kinds of
> odd and pointless, and is very much non-optimal on x86 for example.,
>
> Just remove the smp_mb(), and use the non-relaxed form of cmpxchg.
> It's defined to be fully ordered if it succeeds (and if the cmpxchg
> doesn't succeed, it's a no-op and the memory barrier shouldn't make
> any difference).
>
> Otherwise you'll do two memory ordering operations on x86 (and
> probably some other architectures), since the cmpxchg is always
> ordered on x86 and there exists no "relaxed" form of it.

ACK.

All three smp_mb()'s and both smp_wmb()'s sit directly next to
cmpxchg_relaxed() calls. Having explicit memory barriers was helpful for
identifying, proving, and testing a minimal set of pairs (on arm64), but
all will be folded into full cmpxchg()'s for the next version.

John Ogness

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: [PATCH v2 1/3] crash: add VMCOREINFO macro for anonymous structs
  2020-05-01  9:40 ` [PATCH v2 1/3] crash: add VMCOREINFO macro for anonymous structs John Ogness
@ 2020-06-03 10:16   ` Petr Mladek
  0 siblings, 0 replies; 30+ messages in thread
From: Petr Mladek @ 2020-06-03 10:16 UTC (permalink / raw)
  To: John Ogness
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel

On Fri 2020-05-01 11:46:08, John Ogness wrote:
> Some structs are not named and are only available via their typedef.
> Add a VMCOREINFO macro to export field offsets for such structs.

Honestly, I did not get the meaning until I looked at the sample
usage added by the 2nd patch.

The term "anonymous structures" has another meaning in C++. It is
used for structures without any name that are defined inside a named
structure.

Something like this might be better:

"crash: Add VMCOREINFO macro to define offset in a struct declared by typedef

 The existing macro VMCOREINFO_OFFSET() can't be used for structures
 declared via typedef because "struct" is not part of type definition.

 Create another macro for this purpose."


Anyway, thanks a lot for the prototype of crash implementation.
I am happy that it is possible.

Best Regards,
Petr

PS: It might take few more days until I send some feedback for the
other patches. They are a bit more complex ;-)

^ permalink raw reply	[flat|nested] 30+ messages in thread

* blk->id read race: was: [PATCH v2 2/3] printk: add lockless buffer
  2020-05-01  9:40 ` [PATCH v2 2/3] printk: add lockless buffer John Ogness
       [not found]   ` <87v9ktcs3q.fsf@vostro.fn.ogness.net>
@ 2020-06-09  7:10   ` Petr Mladek
  2020-06-09 14:18     ` John Ogness
  2020-06-09  9:31   ` redundant check in make_data_reusable(): was " Petr Mladek
                     ` (3 subsequent siblings)
  5 siblings, 1 reply; 30+ messages in thread
From: Petr Mladek @ 2020-06-09  7:10 UTC (permalink / raw)
  To: John Ogness
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel

On Fri 2020-05-01 11:46:09, John Ogness wrote:
> Introduce a multi-reader multi-writer lockless ringbuffer for storing
> the kernel log messages. Readers and writers may use their API from
> any context (including scheduler and NMI). This ringbuffer will make
> it possible to decouple printk() callers from any context, locking,
> or console constraints. It also makes it possible for readers to have
> full access to the ringbuffer contents at any time and context (for
> example from any panic situation).
> 
> --- /dev/null
> +++ b/kernel/printk/printk_ringbuffer.c
> +/*
> + * Given a data ring (text or dict), put the associated descriptor of each
> + * data block from @lpos_begin until @lpos_end into the reusable state.
> + *
> + * If there is any problem making the associated descriptor reusable, either
> + * the descriptor has not yet been committed or another writer task has
> + * already pushed the tail lpos past the problematic data block. Regardless,
> + * on error the caller can re-load the tail lpos to determine the situation.
> + */
> +static bool data_make_reusable(struct printk_ringbuffer *rb,
> +			       struct prb_data_ring *data_ring,
> +			       unsigned long lpos_begin,
> +			       unsigned long lpos_end,
> +			       unsigned long *lpos_out)
> +{
> +	struct prb_desc_ring *desc_ring = &rb->desc_ring;
> +	struct prb_data_blk_lpos *blk_lpos;
> +	struct prb_data_block *blk;
> +	unsigned long tail_lpos;
> +	enum desc_state d_state;
> +	struct prb_desc desc;
> +	unsigned long id;
> +
> +	/*
> +	 * Using the provided @data_ring, point @blk_lpos to the correct
> +	 * blk_lpos within the local copy of the descriptor.
> +	 */
> +	if (data_ring == &rb->text_data_ring)
> +		blk_lpos = &desc.text_blk_lpos;
> +	else
> +		blk_lpos = &desc.dict_blk_lpos;
> +
> +	/* Loop until @lpos_begin has advanced to or beyond @lpos_end. */
> +	while ((lpos_end - lpos_begin) - 1 < DATA_SIZE(data_ring)) {
> +		blk = to_block(data_ring, lpos_begin);
> +		id = READ_ONCE(blk->id); /* LMM(data_make_reusable:A) */

This would deserve some comment:

1. Compiler could not optimize out the read because there is a data
   dependency on lpos_begin.

2. Compiler could not postpone the read because it is followed by
   smp_rmb().

So, is READ_ONCE() realy needed?

Well, blk->id clearly can be modified in parallel so we need to be
careful. There is smp_rmb() right below. Do we needed smp_rmb() also
before?

What about the following scenario?:


CPU0						CPU1

						data_alloc()
						  data_push_tail()

						blk = to_block(data_ring, begin_lpos)
						WRITE_ONCE(blk->id, id); /* LMM(data_alloc:B) */

desc_push_tail()
  data_push_tail()

    tail_lpos = data_ring->tail_lpos;
    // see data_ring->tail_lpos already updated by CPU1

    data_make_reusable()

      // lpos_begin = tail_lpos via parameter
      blk = to_block(data_ring, lpos_begin);
      id = blk->id

Now: CPU0 might see outdated blk->id before CPU1 wrote new value
     because there is no read barrier betwen reading tail_lpos
     and blk->id here.

     The outdated id would cause desc_miss. CPU0 would return back
     to data_push_tail(). It will try to re-read data_ring->tail_lpos.
     But it will be the same as before because it already read the
     updated value.

     As a result, data_alloc() would fail.

IMHO, we need smp_rmb() between data_ring->tail_lpos read and
the related blk->id read. It should be either in data_push_tail()
or in data_make_reusable().

Best Regards,
Petr

PS: I am still in the middle of the review. I think that it is better
to discuss each race separately.

> +		/*
> +		 * Guarantee the block ID is loaded before checking the tail
> +		 * lpos. The loaded block ID can only be considered valid if
> +		 * the tail lpos has not overtaken @lpos_begin. This pairs
> +		 * with data_alloc:A.
> +		 *
> +		 * Memory barrier involvement:
> +		 *
> +		 * If data_make_reusable:A reads from data_alloc:B, then
> +		 * data_make_reusable:C reads from data_push_tail:D.
> +		 *
> +		 * Relies on:
> +		 *
> +		 * MB from data_push_tail:D to data_alloc:B
> +		 *    matching
> +		 * RMB from data_make_reusable:A to data_make_reusable:C
> +		 *
> +		 * Note: data_push_tail:D and data_alloc:B can be different
> +		 *       CPUs. However, the data_alloc:B CPU (which performs
> +		 *       the full memory barrier) must have previously seen
> +		 *       data_push_tail:D.
> +		 */
> +		smp_rmb(); /* LMM(data_make_reusable:B) */
> +
> +		tail_lpos = atomic_long_read(&data_ring->tail_lpos
> +					); /* LMM(data_make_reusable:C) */
> +
> +		/*
> +		 * If @lpos_begin has fallen behind the tail lpos, the read
> +		 * block ID cannot be trusted. Fast forward @lpos_begin to the
> +		 * tail lpos and try again.
> +		 */
> +		if (lpos_begin - tail_lpos >= DATA_SIZE(data_ring)) {
> +			lpos_begin = tail_lpos;
> +			continue;
> +		}
> +
> +		d_state = desc_read(desc_ring, id,
> +				    &desc); /* LMM(data_make_reusable:D) */
> +
> +		switch (d_state) {
> +		case desc_miss:
> +			return false;
> +		case desc_reserved:
> +			return false;
> +		case desc_committed:
> +			/*
> +			 * This data block is invalid if the descriptor
> +			 * does not point back to it.
> +			 */
> +			if (blk_lpos->begin != lpos_begin)
> +				return false;
> +			desc_make_reusable(desc_ring, id);
> +			break;
> +		case desc_reusable:
> +			/*
> +			 * This data block is invalid if the descriptor
> +			 * does not point back to it.
> +			 */
> +			if (blk_lpos->begin != lpos_begin)
> +				return false;
> +			break;
> +		}
> +
> +		/* Advance @lpos_begin to the next data block. */
> +		lpos_begin = blk_lpos->next;
> +	}
> +
> +	*lpos_out = lpos_begin;
> +	return true;
> +}
> +
> +/*
> + * Advance the data ring tail to at least @lpos. This function puts
> + * descriptors into the reusable state if the tail is pushed beyond
> + * their associated data block.
> + */
> +static bool data_push_tail(struct printk_ringbuffer *rb,
> +			   struct prb_data_ring *data_ring,
> +			   unsigned long lpos)
> +{
> +	unsigned long tail_lpos;
> +	unsigned long next_lpos;
> +
> +	/* If @lpos is not valid, there is nothing to do. */
> +	if (lpos == INVALID_LPOS)
> +		return true;
> +
> +	tail_lpos = atomic_long_read(&data_ring->tail_lpos);
> +
> +	do {
> +		/* Done, if the tail lpos is already at or beyond @lpos. */
> +		if ((lpos - tail_lpos) - 1 >= DATA_SIZE(data_ring))
> +			break;
> +
> +		/*
> +		 * Make all descriptors reusable that are associated with
> +		 * data blocks before @lpos.
> +		 */
> +		if (!data_make_reusable(rb, data_ring, tail_lpos, lpos,
> +					&next_lpos)) {
> +			/*
> +			 * Guarantee the descriptor state loaded in
> +			 * data_make_reusable() is performed before reloading
> +			 * the tail lpos. The failed data_make_reusable() may
> +			 * be due to a newly recycled descriptor causing
> +			 * the tail lpos to have been previously pushed. This
> +			 * pairs with desc_reserve:D.
> +			 *
> +			 * Memory barrier involvement:
> +			 *
> +			 * If data_make_reusable:D reads from desc_reserve:G,
> +			 * then data_push_tail:B reads from data_push_tail:D.
> +			 *
> +			 * Relies on:
> +			 *
> +			 * MB from data_push_tail:D to desc_reserve:G
> +			 *    matching
> +			 * RMB from data_make_reusable:D to data_push_tail:B
> +			 *
> +			 * Note: data_push_tail:D and desc_reserve:G can be
> +			 *       different CPUs. However, the desc_reserve:G
> +			 *       CPU (which performs the full memory barrier)
> +			 *       must have previously seen data_push_tail:D.
> +			 */
> +			smp_rmb(); /* LMM(data_push_tail:A) */
> +
> +			next_lpos = atomic_long_read(&data_ring->tail_lpos
> +						); /* LMM(data_push_tail:B) */
> +			if (next_lpos == tail_lpos)
> +				return false;
> +
> +			/* Another task pushed the tail. Try again. */
> +			tail_lpos = next_lpos;
> +			continue;
> +		}
> +
> +		/*
> +		 * Guarantee any descriptor states that have transitioned to
> +		 * reusable are stored before pushing the tail lpos. This
> +		 * allows readers to identify if data has expired while
> +		 * reading the descriptor. This pairs with desc_read:D.
> +		 */
> +		smp_mb(); /* LMM(data_push_tail:C) */
> +
> +	} while (!atomic_long_try_cmpxchg_relaxed(&data_ring->tail_lpos,
> +			&tail_lpos, next_lpos)); /* LMM(data_push_tail:D) */
> +
> +	return true;
> +}
> +

^ permalink raw reply	[flat|nested] 30+ messages in thread

* redundant check in make_data_reusable(): was [PATCH v2 2/3] printk: add lockless buffer
  2020-05-01  9:40 ` [PATCH v2 2/3] printk: add lockless buffer John Ogness
       [not found]   ` <87v9ktcs3q.fsf@vostro.fn.ogness.net>
  2020-06-09  7:10   ` blk->id read race: was: " Petr Mladek
@ 2020-06-09  9:31   ` Petr Mladek
  2020-06-09 14:48     ` John Ogness
  2020-06-09  9:48   ` Full barrier in data_push_tail(): " Petr Mladek
                     ` (2 subsequent siblings)
  5 siblings, 1 reply; 30+ messages in thread
From: Petr Mladek @ 2020-06-09  9:31 UTC (permalink / raw)
  To: John Ogness
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel

On Fri 2020-05-01 11:46:09, John Ogness wrote:
> Introduce a multi-reader multi-writer lockless ringbuffer for storing
> the kernel log messages. Readers and writers may use their API from
> any context (including scheduler and NMI). This ringbuffer will make
> it possible to decouple printk() callers from any context, locking,
> or console constraints. It also makes it possible for readers to have
> full access to the ringbuffer contents at any time and context (for
> example from any panic situation).
> 
> diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
> new file mode 100644
> index 000000000000..e0a66468d4f3
> --- /dev/null
> +++ b/kernel/printk/printk_ringbuffer.c
> +/*
> + * Given a data ring (text or dict), put the associated descriptor of each
> + * data block from @lpos_begin until @lpos_end into the reusable state.
> + *
> + * If there is any problem making the associated descriptor reusable, either
> + * the descriptor has not yet been committed or another writer task has
> + * already pushed the tail lpos past the problematic data block. Regardless,
> + * on error the caller can re-load the tail lpos to determine the situation.
> + */
> +static bool data_make_reusable(struct printk_ringbuffer *rb,
> +			       struct prb_data_ring *data_ring,
> +			       unsigned long lpos_begin,
> +			       unsigned long lpos_end,
> +			       unsigned long *lpos_out)
> +{
> +	struct prb_desc_ring *desc_ring = &rb->desc_ring;
> +	struct prb_data_blk_lpos *blk_lpos;
> +	struct prb_data_block *blk;
> +	unsigned long tail_lpos;
> +	enum desc_state d_state;
> +	struct prb_desc desc;
> +	unsigned long id;
> +
> +	/*
> +	 * Using the provided @data_ring, point @blk_lpos to the correct
> +	 * blk_lpos within the local copy of the descriptor.
> +	 */
> +	if (data_ring == &rb->text_data_ring)
> +		blk_lpos = &desc.text_blk_lpos;
> +	else
> +		blk_lpos = &desc.dict_blk_lpos;
> +
> +	/* Loop until @lpos_begin has advanced to or beyond @lpos_end. */
> +	while ((lpos_end - lpos_begin) - 1 < DATA_SIZE(data_ring)) {
> +		blk = to_block(data_ring, lpos_begin);
> +		id = READ_ONCE(blk->id); /* LMM(data_make_reusable:A) */
> +
> +		/*
> +		 * Guarantee the block ID is loaded before checking the tail
> +		 * lpos. The loaded block ID can only be considered valid if
> +		 * the tail lpos has not overtaken @lpos_begin. This pairs
> +		 * with data_alloc:A.
> +		 *
> +		 * Memory barrier involvement:
> +		 *
> +		 * If data_make_reusable:A reads from data_alloc:B, then
> +		 * data_make_reusable:C reads from data_push_tail:D.
> +		 *
> +		 * Relies on:
> +		 *
> +		 * MB from data_push_tail:D to data_alloc:B
> +		 *    matching
> +		 * RMB from data_make_reusable:A to data_make_reusable:C
> +		 *
> +		 * Note: data_push_tail:D and data_alloc:B can be different
> +		 *       CPUs. However, the data_alloc:B CPU (which performs
> +		 *       the full memory barrier) must have previously seen
> +		 *       data_push_tail:D.
> +		 */
> +		smp_rmb(); /* LMM(data_make_reusable:B) */
> +
> +		tail_lpos = atomic_long_read(&data_ring->tail_lpos
> +					); /* LMM(data_make_reusable:C) */
> +
> +		/*
> +		 * If @lpos_begin has fallen behind the tail lpos, the read
> +		 * block ID cannot be trusted. Fast forward @lpos_begin to the
> +		 * tail lpos and try again.
> +		 */
> +		if (lpos_begin - tail_lpos >= DATA_SIZE(data_ring)) {
> +			lpos_begin = tail_lpos;
> +			continue;
> +		}

I am sorry if we have had this discussion already in past. Well, it
would just prove that it really needs a comment why this check is
necessary. I looks redundant or just an optimization to me.

IMHO, the following check of the descriptor state should be enough:

1. This function returns "false" when the descriptor is in desc_misc or
   desc_reusable state or when it does not point back to the data.

   In this case, data_push_tail() does similar check. It loads
   data_ring->tail_lpos as well. It returns false when the tail did
   not move. Otherwise, it calls data_make_reusable() again and has
   the same effect as the continue here.


2. This function returns true only when the descriptor is in committed
   or reusable state and points back to the same data block. In this,
   case it really contains consistent data.

   OK, it is possible that the block is already reused when the
   descriptor is in reusable state. But it is possible only when
   data_ring->tail_lpos moved. In this case, cmpxchg in
   data_push_tail() would fail and we would go back as well.

Do I miss anything, please?

See also a nit below.

> +
> +		d_state = desc_read(desc_ring, id,
> +				    &desc); /* LMM(data_make_reusable:D) */
> +
> +		switch (d_state) {
> +		case desc_miss:
> +			return false;
> +		case desc_reserved:
> +			return false;
> +		case desc_committed:
> +			/*
> +			 * This data block is invalid if the descriptor
> +			 * does not point back to it.
> +			 */
> +			if (blk_lpos->begin != lpos_begin)
> +				return false;
> +			desc_make_reusable(desc_ring, id);
> +			break;
> +		case desc_reusable:
> +			/*
> +			 * This data block is invalid if the descriptor
> +			 * does not point back to it.
> +			 */
> +			if (blk_lpos->begin != lpos_begin)
> +				return false;
> +			break;
> +		}
> +
> +		/* Advance @lpos_begin to the next data block. */
> +		lpos_begin = blk_lpos->next;
> +	}
> +
> +	*lpos_out = lpos_begin;
> +	return true;
> +}
> +
> +/*
> + * Advance the data ring tail to at least @lpos. This function puts
> + * descriptors into the reusable state if the tail is pushed beyond
> + * their associated data block.
> + */
> +static bool data_push_tail(struct printk_ringbuffer *rb,
> +			   struct prb_data_ring *data_ring,
> +			   unsigned long lpos)
> +{
> +	unsigned long tail_lpos;
> +	unsigned long next_lpos;
> +
> +	/* If @lpos is not valid, there is nothing to do. */
> +	if (lpos == INVALID_LPOS)
> +		return true;
> +
> +	tail_lpos = atomic_long_read(&data_ring->tail_lpos);
> +
> +	do {
> +		/* Done, if the tail lpos is already at or beyond @lpos. */
> +		if ((lpos - tail_lpos) - 1 >= DATA_SIZE(data_ring))
> +			break;
> +
> +		/*
> +		 * Make all descriptors reusable that are associated with
> +		 * data blocks before @lpos.
> +		 */
> +		if (!data_make_reusable(rb, data_ring, tail_lpos, lpos,
> +					&next_lpos)) {
> +			/*
> +			 * Guarantee the descriptor state loaded in
> +			 * data_make_reusable() is performed before reloading
> +			 * the tail lpos. The failed data_make_reusable() may
> +			 * be due to a newly recycled descriptor causing
> +			 * the tail lpos to have been previously pushed. This
> +			 * pairs with desc_reserve:D.
> +			 *
> +			 * Memory barrier involvement:
> +			 *
> +			 * If data_make_reusable:D reads from desc_reserve:G,
> +			 * then data_push_tail:B reads from data_push_tail:D.
> +			 *
> +			 * Relies on:
> +			 *
> +			 * MB from data_push_tail:D to desc_reserve:G
> +			 *    matching
> +			 * RMB from data_make_reusable:D to data_push_tail:B
> +			 *
> +			 * Note: data_push_tail:D and desc_reserve:G can be
> +			 *       different CPUs. However, the desc_reserve:G
> +			 *       CPU (which performs the full memory barrier)
> +			 *       must have previously seen data_push_tail:D.
> +			 */
> +			smp_rmb(); /* LMM(data_push_tail:A) */
> +
> +			next_lpos = atomic_long_read(&data_ring->tail_lpos
> +						); /* LMM(data_push_tail:B) */

I have been a bit confused whether next_lpos variable is used
correctly everywhere. The primary purpose is to use a value provided
by data_make_reusable() in the cmpxchg below.

But it is (mis)used here to temporary store the current tail_lpos
value. Could we please use another variable here? IMHO, the following
code would be more clear:

	tail_lpos_old = tail_lpos;
	tail_lpos = atomic_long_read(&data_ring->tail_lpos); /* LMM(data_push_tail:B) */

	if (tail_lpos == tail_lpos_old)
		return false;

> +			if (next_lpos == tail_lpos)
> +				return false;
> +
> +			/* Another task pushed the tail. Try again. */
> +			tail_lpos = next_lpos;
> +			continue;
> +		}
> +
> +		/*
> +		 * Guarantee any descriptor states that have transitioned to
> +		 * reusable are stored before pushing the tail lpos. This
> +		 * allows readers to identify if data has expired while
> +		 * reading the descriptor. This pairs with desc_read:D.
> +		 */
> +		smp_mb(); /* LMM(data_push_tail:C) */
> +
> +	} while (!atomic_long_try_cmpxchg_relaxed(&data_ring->tail_lpos,
> +			&tail_lpos, next_lpos)); /* LMM(data_push_tail:D) */
> +
> +	return true;
> +}

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Full barrier in data_push_tail(): was [PATCH v2 2/3] printk: add lockless buffer
  2020-05-01  9:40 ` [PATCH v2 2/3] printk: add lockless buffer John Ogness
                     ` (2 preceding siblings ...)
  2020-06-09  9:31   ` redundant check in make_data_reusable(): was " Petr Mladek
@ 2020-06-09  9:48   ` Petr Mladek
  2020-06-09 15:03     ` John Ogness
  2020-06-09 11:37   ` Barrier before pushing desc_ring tail: " Petr Mladek
  2020-06-09 14:38   ` data_ring head_lpos and tail_lpos synchronization: " Petr Mladek
  5 siblings, 1 reply; 30+ messages in thread
From: Petr Mladek @ 2020-06-09  9:48 UTC (permalink / raw)
  To: John Ogness
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel

On Fri 2020-05-01 11:46:09, John Ogness wrote:
> Introduce a multi-reader multi-writer lockless ringbuffer for storing
> the kernel log messages. Readers and writers may use their API from
> any context (including scheduler and NMI). This ringbuffer will make
> it possible to decouple printk() callers from any context, locking,
> or console constraints. It also makes it possible for readers to have
> full access to the ringbuffer contents at any time and context (for
> example from any panic situation).
> 
> diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
> new file mode 100644
> index 000000000000..e0a66468d4f3
> --- /dev/null
> +++ b/kernel/printk/printk_ringbuffer.c
> +/*
> + * Advance the data ring tail to at least @lpos. This function puts
> + * descriptors into the reusable state if the tail is pushed beyond
> + * their associated data block.
> + */
> +static bool data_push_tail(struct printk_ringbuffer *rb,
> +			   struct prb_data_ring *data_ring,
> +			   unsigned long lpos)
> +{
> +	unsigned long tail_lpos;
> +	unsigned long next_lpos;
> +
> +	/* If @lpos is not valid, there is nothing to do. */
> +	if (lpos == INVALID_LPOS)
> +		return true;
> +
> +	tail_lpos = atomic_long_read(&data_ring->tail_lpos);
> +
> +	do {
> +		/* Done, if the tail lpos is already at or beyond @lpos. */
> +		if ((lpos - tail_lpos) - 1 >= DATA_SIZE(data_ring))
> +			break;
> +
> +		/*
> +		 * Make all descriptors reusable that are associated with
> +		 * data blocks before @lpos.
> +		 */
> +		if (!data_make_reusable(rb, data_ring, tail_lpos, lpos,
> +					&next_lpos)) {
> +			/*
> +			 * Guarantee the descriptor state loaded in
> +			 * data_make_reusable() is performed before reloading
> +			 * the tail lpos. The failed data_make_reusable() may
> +			 * be due to a newly recycled descriptor causing
> +			 * the tail lpos to have been previously pushed. This
> +			 * pairs with desc_reserve:D.
> +			 *
> +			 * Memory barrier involvement:
> +			 *
> +			 * If data_make_reusable:D reads from desc_reserve:G,
> +			 * then data_push_tail:B reads from data_push_tail:D.
> +			 *
> +			 * Relies on:
> +			 *
> +			 * MB from data_push_tail:D to desc_reserve:G
> +			 *    matching
> +			 * RMB from data_make_reusable:D to data_push_tail:B
> +			 *
> +			 * Note: data_push_tail:D and desc_reserve:G can be
> +			 *       different CPUs. However, the desc_reserve:G
> +			 *       CPU (which performs the full memory barrier)
> +			 *       must have previously seen data_push_tail:D.
> +			 */
> +			smp_rmb(); /* LMM(data_push_tail:A) */
> +
> +			next_lpos = atomic_long_read(&data_ring->tail_lpos
> +						); /* LMM(data_push_tail:B) */
> +			if (next_lpos == tail_lpos)
> +				return false;
> +
> +			/* Another task pushed the tail. Try again. */
> +			tail_lpos = next_lpos;
> +			continue;
> +		}
> +
> +		/*
> +		 * Guarantee any descriptor states that have transitioned to
> +		 * reusable are stored before pushing the tail lpos. This
> +		 * allows readers to identify if data has expired while
> +		 * reading the descriptor. This pairs with desc_read:D.
> +		 */
> +		smp_mb(); /* LMM(data_push_tail:C) */

The comment does not explain why we need a full barrier here. It talks
about writing descriptor states. It suggests that write barrier should
be enough.

I guess that this is related to the discussion that we had last time,
and the litmus test mentioned in
see https://lore.kernel.org/r/87h7zcjkxy.fsf@linutronix.de

I would add something like:

		* Full barrier is necessary because the descriptors
		* might have been made reusable also by other CPUs.

For people like me, it would be great to add also link to a more
detailed explanation, for example, the litmus tests, or something
even more human readable ;-) I think that it is a "rather" common
problem. I wonder whether it is already documented somewhere.

> +	} while (!atomic_long_try_cmpxchg_relaxed(&data_ring->tail_lpos,
> +			&tail_lpos, next_lpos)); /* LMM(data_push_tail:D) */
> +
> +	return true;
> +}
> +

Best Regards,
Petr

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Barrier before pushing desc_ring tail: was [PATCH v2 2/3] printk: add lockless buffer
  2020-05-01  9:40 ` [PATCH v2 2/3] printk: add lockless buffer John Ogness
                     ` (3 preceding siblings ...)
  2020-06-09  9:48   ` Full barrier in data_push_tail(): " Petr Mladek
@ 2020-06-09 11:37   ` Petr Mladek
  2020-06-09 15:56     ` John Ogness
  2020-06-09 14:38   ` data_ring head_lpos and tail_lpos synchronization: " Petr Mladek
  5 siblings, 1 reply; 30+ messages in thread
From: Petr Mladek @ 2020-06-09 11:37 UTC (permalink / raw)
  To: John Ogness
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel

On Fri 2020-05-01 11:46:09, John Ogness wrote:
> Introduce a multi-reader multi-writer lockless ringbuffer for storing
> the kernel log messages. Readers and writers may use their API from
> any context (including scheduler and NMI). This ringbuffer will make
> it possible to decouple printk() callers from any context, locking,
> or console constraints. It also makes it possible for readers to have
> full access to the ringbuffer contents at any time and context (for
> example from any panic situation).
> 
> --- /dev/null
> +++ b/kernel/printk/printk_ringbuffer.c
> +/*
> + * Advance the desc ring tail. This function advances the tail by one
> + * descriptor, thus invalidating the oldest descriptor. Before advancing
> + * the tail, the tail descriptor is made reusable and all data blocks up to
> + * and including the descriptor's data block are invalidated (i.e. the data
> + * ring tail is pushed past the data block of the descriptor being made
> + * reusable).
> + */
> +static bool desc_push_tail(struct printk_ringbuffer *rb,
> +			   unsigned long tail_id)
> +{
> +	struct prb_desc_ring *desc_ring = &rb->desc_ring;
> +	enum desc_state d_state;
> +	struct prb_desc desc;
> +
> +	d_state = desc_read(desc_ring, tail_id, &desc);
> +
> +	switch (d_state) {
> +	case desc_miss:
> +		/*
> +		 * If the ID is exactly 1 wrap behind the expected, it is
> +		 * in the process of being reserved by another writer and
> +		 * must be considered reserved.
> +		 */
> +		if (DESC_ID(atomic_long_read(&desc.state_var)) ==
> +		    DESC_ID_PREV_WRAP(desc_ring, tail_id)) {
> +			return false;
> +		}
> +
> +		/*
> +		 * The ID has changed. Another writer must have pushed the
> +		 * tail and recycled the descriptor already. Success is
> +		 * returned because the caller is only interested in the
> +		 * specified tail being pushed, which it was.
> +		 */
> +		return true;
> +	case desc_reserved:
> +		return false;
> +	case desc_committed:
> +		desc_make_reusable(desc_ring, tail_id);
> +		break;
> +	case desc_reusable:
> +		break;
> +	}
> +
> +	/*
> +	 * Data blocks must be invalidated before their associated
> +	 * descriptor can be made available for recycling. Invalidating
> +	 * them later is not possible because there is no way to trust
> +	 * data blocks once their associated descriptor is gone.
> +	 */
> +
> +	if (!data_push_tail(rb, &rb->text_data_ring, desc.text_blk_lpos.next))
> +		return false;
> +	if (!data_push_tail(rb, &rb->dict_data_ring, desc.dict_blk_lpos.next))
> +		return false;
> +
> +	/*
> +	 * Check the next descriptor after @tail_id before pushing the tail
> +	 * to it because the tail must always be in a committed or reusable
> +	 * state. The implementation of prb_first_seq() relies on this.
> +	 *
> +	 * A successful read implies that the next descriptor is less than or
> +	 * equal to @head_id so there is no risk of pushing the tail past the
> +	 * head.
> +	 */
> +	d_state = desc_read(desc_ring, DESC_ID(tail_id + 1),
> +			    &desc); /* LMM(desc_push_tail:A) */
> +	if (d_state == desc_committed || d_state == desc_reusable) {
> +		/*
> +		 * Any CPU that loads the new tail ID, must see that the
> +		 * descriptor at @tail_id is in the reusable state. See the
> +		 * read memory barrier part of desc_reserve:D for details.
> +		 */

I was quite confused by the above comment. Does it mean that we need
a barrier here? Or does it explain why the cmpxchg has its own
LMM marker?

I think that we actually need a full barrier here to make sure that
all CPUs see the changes made by data_push_tail() before we
allow to rewrite the descriptor. The changes in data_push_tail() might
be done on different CPUs.

It is similar like the full barrier in data_push_tail() before changing
data_ring->tail_lpos.

Best Regards,
Petr

> +		atomic_long_cmpxchg_relaxed(&desc_ring->tail_id, tail_id,
> +			DESC_ID(tail_id + 1)); /* LMM(desc_push_tail:B) */
> +	} else {
> +		/*
> +		 * Guarantee the last state load from desc_read() is before
> +		 * reloading @tail_id in order to see a new tail ID in the
> +		 * case that the descriptor has been recycled. This pairs
> +		 * with desc_reserve:D.
> +		 *
> +		 * Memory barrier involvement:
> +		 *
> +		 * If desc_push_tail:A reads from desc_reserve:G, then
> +		 * desc_push_tail:D reads from desc_push_tail:B.
> +		 *
> +		 * Relies on:
> +		 *
> +		 * MB from desc_push_tail:B to desc_reserve:G
> +		 *    matching
> +		 * RMB from desc_push_tail:A to desc_push_tail:D
> +		 *
> +		 * Note: desc_push_tail:B and desc_reserve:G can be different
> +		 *       CPUs. However, the desc_reserve:G CPU (which performs
> +		 *       the full memory barrier) must have previously seen
> +		 *       desc_push_tail:B.
> +		 */
> +		smp_rmb(); /* LMM(desc_push_tail:C) */
> +
> +		/*
> +		 * Re-check the tail ID. The descriptor following @tail_id is
> +		 * not in an allowed tail state. But if the tail has since
> +		 * been moved by another task, then it does not matter.
> +		 */
> +		if (atomic_long_read(&desc_ring->tail_id) ==
> +					tail_id) { /* LMM(desc_push_tail:D) */
> +			return false;
> +		}
> +	}
> +
> +	return true;
> +}
> +
> +/* Reserve a new descriptor, invalidating the oldest if necessary. */
> +static bool desc_reserve(struct printk_ringbuffer *rb, unsigned long *id_out)
> +{
> +	struct prb_desc_ring *desc_ring = &rb->desc_ring;
> +	unsigned long prev_state_val;
> +	unsigned long id_prev_wrap;
> +	struct prb_desc *desc;
> +	unsigned long head_id;
> +	unsigned long id;
> +
> +	head_id = atomic_long_read(&desc_ring->head_id
> +						); /* LMM(desc_reserve:A) */
> +
> +	do {
> +		desc = to_desc(desc_ring, head_id);
> +
> +		id = DESC_ID(head_id + 1);
> +		id_prev_wrap = DESC_ID_PREV_WRAP(desc_ring, id);
> +
> +		/*
> +		 * Guarantee the head ID is read before reading the tail ID.
> +		 * Since the tail ID is updated before the head ID, this
> +		 * guarantees that @id_prev_wrap is never ahead of the tail
> +		 * ID. This pairs with desc_reserve:D.
> +		 *
> +		 * Memory barrier involvement:
> +		 *
> +		 * If desc_reserve:A reads from desc_reserve:E, then
> +		 * desc_reserve:C reads from desc_push_tail:B.
> +		 *
> +		 * Relies on:
> +		 *
> +		 * MB from desc_push_tail:B to desc_reserve:E
> +		 *    matching
> +		 * RMB from desc_reserve:A to desc_reserve:C
> +		 *
> +		 * Note: desc_push_tail:B and desc_reserve:E can be different
> +		 *       CPUs. However, the desc_reserve:E CPU (which performs
> +		 *       the full memory barrier) must have previously seen
> +		 *       desc_push_tail:B.
> +		 */
> +		smp_rmb(); /* LMM(desc_reserve:B) */
> +
> +		if (id_prev_wrap == atomic_long_read(&desc_ring->tail_id
> +						)) { /* LMM(desc_reserve:C) */
> +			/*
> +			 * Make space for the new descriptor by
> +			 * advancing the tail.
> +			 */
> +			if (!desc_push_tail(rb, id_prev_wrap))
> +				return false;
> +		}
> +
> +		/*
> +		 * Guarantee the tail ID is read before validating the
> +		 * recycled descriptor state. A read memory barrier is
> +		 * sufficient for this. This pairs with data_push_tail:C.
> +		 *
> +		 * Memory barrier involvement:
> +		 *
> +		 * If desc_reserve:C reads from desc_push_tail:B, then
> +		 * desc_reserve:F reads from desc_make_reusable:A.
> +		 *
> +		 * Relies on:
> +		 *
> +		 * MB from desc_make_reusable:A to desc_push_tail:B
> +		 *    matching
> +		 * RMB from desc_reserve:C to desc_reserve:F
> +		 *
> +		 * Note: desc_make_reusable:A, desc_push_tail:B, and
> +		 *       data_push_tail:C can all be different CPUs. However,
> +		 *       the desc_push_tail:B CPU must have previously seen
> +		 *       data_push_tail:D and the data_push_tail:D CPU (which
> +		 *       performs the full memory barrier) must have
> +		 *       previously seen desc_make_reusable:A.
> +		 *
> +		 * Guarantee any data ring tail changes are stored before
> +		 * recycling the descriptor. Data ring tail changes can happen
> +		 * via desc_push_tail()->data_push_tail(). A full memory
> +		 * barrier is needed since another task may have pushed the
> +		 * data ring tails. This pairs with data_push_tail:A.
> +		 *
> +		 * Guarantee a new tail ID is stored before recycling the
> +		 * descriptor. A full memory barrier is needed since another
> +		 * task may have pushed the tail ID. This pairs with
> +		 * desc_push_tail:C and prb_first_seq:C.
> +		 *
> +		 * Guarantee the tail ID is stored before storing the head ID.
> +		 * This pairs with desc_reserve:B.
> +		 */
> +		smp_mb(); /* LMM(desc_reserve:D) */
> +
> +	} while (!atomic_long_try_cmpxchg_relaxed(&desc_ring->head_id,
> +				&head_id, id)); /* LMM(desc_reserve:E) */
> +
> +	desc = to_desc(desc_ring, id);
> +
> +	/*
> +	 * If the descriptor has been recycled, verify the old state val.
> +	 * See "ABA Issues" about why this verification is performed.
> +	 */
> +	prev_state_val = atomic_long_read(&desc->state_var
> +						); /* LMM(desc_reserve:F) */
> +	if (prev_state_val && prev_state_val != (id_prev_wrap |
> +						 DESC_COMMITTED_MASK |
> +						 DESC_REUSE_MASK)) {
> +		WARN_ON_ONCE(1);
> +		return false;
> +	}
> +
> +	/*
> +	 * Assign the descriptor a new ID and set its state to reserved.
> +	 * See "ABA Issues" about why cmpxchg() instead of set() is used.
> +	 */
> +	if (!atomic_long_try_cmpxchg_relaxed(&desc->state_var,
> +			&prev_state_val, id | 0)) { /* LMM(desc_reserve:G) */
> +		WARN_ON_ONCE(1);
> +		return false;
> +	}
> +
> +	/*
> +	 * Guarantee the new descriptor ID and state is stored before making
> +	 * any other changes. This pairs with desc_read:D.
> +	 */
> +	smp_wmb(); /* LMM(desc_reserve:H) */
> +
> +	/* Now data in @desc can be modified: LMM(desc_reserve:I) */
> +
> +	*id_out = id;
> +	return true;
> +}

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: blk->id read race: was: [PATCH v2 2/3] printk: add lockless buffer
  2020-06-09  7:10   ` blk->id read race: was: " Petr Mladek
@ 2020-06-09 14:18     ` John Ogness
  2020-06-10  8:42       ` Petr Mladek
  0 siblings, 1 reply; 30+ messages in thread
From: John Ogness @ 2020-06-09 14:18 UTC (permalink / raw)
  To: Petr Mladek
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel, Paul McKenney

On 2020-06-09, Petr Mladek <pmladek@suse.com> wrote:
>> --- /dev/null
>> +++ b/kernel/printk/printk_ringbuffer.c
>> +/*
>> + * Given a data ring (text or dict), put the associated descriptor of each
>> + * data block from @lpos_begin until @lpos_end into the reusable state.
>> + *
>> + * If there is any problem making the associated descriptor reusable, either
>> + * the descriptor has not yet been committed or another writer task has
>> + * already pushed the tail lpos past the problematic data block. Regardless,
>> + * on error the caller can re-load the tail lpos to determine the situation.
>> + */
>> +static bool data_make_reusable(struct printk_ringbuffer *rb,
>> +			       struct prb_data_ring *data_ring,
>> +			       unsigned long lpos_begin,
>> +			       unsigned long lpos_end,
>> +			       unsigned long *lpos_out)
>> +{
>> +	struct prb_desc_ring *desc_ring = &rb->desc_ring;
>> +	struct prb_data_blk_lpos *blk_lpos;
>> +	struct prb_data_block *blk;
>> +	unsigned long tail_lpos;
>> +	enum desc_state d_state;
>> +	struct prb_desc desc;
>> +	unsigned long id;
>> +
>> +	/*
>> +	 * Using the provided @data_ring, point @blk_lpos to the correct
>> +	 * blk_lpos within the local copy of the descriptor.
>> +	 */
>> +	if (data_ring == &rb->text_data_ring)
>> +		blk_lpos = &desc.text_blk_lpos;
>> +	else
>> +		blk_lpos = &desc.dict_blk_lpos;
>> +
>> +	/* Loop until @lpos_begin has advanced to or beyond @lpos_end. */
>> +	while ((lpos_end - lpos_begin) - 1 < DATA_SIZE(data_ring)) {
>> +		blk = to_block(data_ring, lpos_begin);
>> +		id = READ_ONCE(blk->id); /* LMM(data_make_reusable:A) */
>
> This would deserve some comment:
>
> 1. Compiler could not optimize out the read because there is a data
>    dependency on lpos_begin.
>
> 2. Compiler could not postpone the read because it is followed by
>    smp_rmb().
>
> So, is READ_ONCE() realy needed?

I agree that it is not needed. Both the READ_ONCE() and its countering
WRITE_ONCE() (data_alloc:B) only document the lockless shared access. I
will remove both for the next version.

Do we still need a comment? Is it not obvious that there is a data
dependency on @lpos_begin?

        blk = to_block(data_ring, lpos_begin);
        id = blk->id;

> Well, blk->id clearly can be modified in parallel so we need to be
> careful. There is smp_rmb() right below. Do we needed smp_rmb() also
> before?
>
> What about the following scenario?:
>
>
> CPU0						CPU1
>
> 						data_alloc()
> 						  data_push_tail()
>
> 						blk = to_block(data_ring, begin_lpos)
> 						WRITE_ONCE(blk->id, id); /* LMM(data_alloc:B) */
>
> desc_push_tail()
>   data_push_tail()
>
>     tail_lpos = data_ring->tail_lpos;
>     // see data_ring->tail_lpos already updated by CPU1
>
>     data_make_reusable()
>
>       // lpos_begin = tail_lpos via parameter
>       blk = to_block(data_ring, lpos_begin);
>       id = blk->id
>
> Now: CPU0 might see outdated blk->id before CPU1 wrote new value
>      because there is no read barrier betwen reading tail_lpos
>      and blk->id here.

In your example, CPU1 is pushing the tail and then setting the block ID
for the _newly_ allocated block, that is located is _before_ the new
tail. If CPU0 sees the new tail already, it is still reading a valid
block ID, which is _not_ from the block that CPU1 is in the process of
writing.

John Ogness

^ permalink raw reply	[flat|nested] 30+ messages in thread

* data_ring head_lpos and tail_lpos synchronization: was [PATCH v2 2/3] printk: add lockless buffer
  2020-05-01  9:40 ` [PATCH v2 2/3] printk: add lockless buffer John Ogness
                     ` (4 preceding siblings ...)
  2020-06-09 11:37   ` Barrier before pushing desc_ring tail: " Petr Mladek
@ 2020-06-09 14:38   ` Petr Mladek
  2020-06-10  7:53     ` John Ogness
  5 siblings, 1 reply; 30+ messages in thread
From: Petr Mladek @ 2020-06-09 14:38 UTC (permalink / raw)
  To: John Ogness
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel

On Fri 2020-05-01 11:46:09, John Ogness wrote:
> Introduce a multi-reader multi-writer lockless ringbuffer for storing
> the kernel log messages. Readers and writers may use their API from
> any context (including scheduler and NMI). This ringbuffer will make
> it possible to decouple printk() callers from any context, locking,
> or console constraints. It also makes it possible for readers to have
> full access to the ringbuffer contents at any time and context (for
> example from any panic situation).
> 
> --- /dev/null
> +++ b/kernel/printk/printk_ringbuffer.c
> +/*
> + * Advance the data ring tail to at least @lpos. This function puts
> + * descriptors into the reusable state if the tail is pushed beyond
> + * their associated data block.
> + */
> +static bool data_push_tail(struct printk_ringbuffer *rb,
> +			   struct prb_data_ring *data_ring,
> +			   unsigned long lpos)
> +{
> +	unsigned long tail_lpos;
> +	unsigned long next_lpos;
> +
> +	/* If @lpos is not valid, there is nothing to do. */
> +	if (lpos == INVALID_LPOS)
> +		return true;
> +
> +	tail_lpos = atomic_long_read(&data_ring->tail_lpos);

Hmm, I wonder whether data_ring->tail_lpos and data_ring->head_lpos
are synchronized enough between each other.

I feel that there should be read barrier here. But it seems that more
barriers are missing. For example, let's have:


CPU0				CPU1

data_alloc()
  begin_lpos = atomic_read(data_ring->head_lpos);

				data_alloc()
				  data_push_tail()
				    cmpxchg(data_ring->tail_lpos);
				    // A: no barrier
				  cmpxchg(data_ring->head_lpos);

  data_push_tail()
    // B: no barrier
    tail_lpos = atomic_read(data_ring->tail_lpos);

Problem 1:

   CPU0 might see random ordering of data_ring->tail_lpos and
   head_lpos values modified by CPU1. There are missing both
   write and read barriers.


Problem 2:

   There might be still a chance because CPU0 does:

     if (!data_make_reusable())
       smp_rmb()
       tail_lpos = atomic_read(data_ring->tail_lpos);

   But CPU0 might still see old data_ring->tail because CPU1 did not
   do write barrier.


My proposal:

1. There should be full memory barrier on the location A before
   updating data_ring->head_lpos. It will be the same as the full
   barriers needed before updating data_ring->tail_lpos.

   data_ring->tail_lpos might have been pushed by another CPU.
   We need to make sure that all CPUs see all needed changes
   before we data_alloc() pushes head_lpos.


2. There should be read memory barrier in the location B.

   It is not strictly necessary because data_push_tail() tries
   to re-read data_ring->tail_lpos after a read barrier. But
   the re-read is just a fallback.

   The read barrier before the first read should be there to
   keep "clean" design ;-) Or there should be at least some
   comment about why the barrier is not there.

Best Regards,
Petr

> +
> +	do {
> +		/* Done, if the tail lpos is already at or beyond @lpos. */
> +		if ((lpos - tail_lpos) - 1 >= DATA_SIZE(data_ring))
> +			break;
> +
> +		/*
> +		 * Make all descriptors reusable that are associated with
> +		 * data blocks before @lpos.
> +		 */
> +		if (!data_make_reusable(rb, data_ring, tail_lpos, lpos,
> +					&next_lpos)) {
> +			/*
> +			 * Guarantee the descriptor state loaded in
> +			 * data_make_reusable() is performed before reloading
> +			 * the tail lpos. The failed data_make_reusable() may
> +			 * be due to a newly recycled descriptor causing
> +			 * the tail lpos to have been previously pushed. This
> +			 * pairs with desc_reserve:D.
> +			 *
> +			 * Memory barrier involvement:
> +			 *
> +			 * If data_make_reusable:D reads from desc_reserve:G,
> +			 * then data_push_tail:B reads from data_push_tail:D.
> +			 *
> +			 * Relies on:
> +			 *
> +			 * MB from data_push_tail:D to desc_reserve:G
> +			 *    matching
> +			 * RMB from data_make_reusable:D to data_push_tail:B
> +			 *
> +			 * Note: data_push_tail:D and desc_reserve:G can be
> +			 *       different CPUs. However, the desc_reserve:G
> +			 *       CPU (which performs the full memory barrier)
> +			 *       must have previously seen data_push_tail:D.
> +			 */
> +			smp_rmb(); /* LMM(data_push_tail:A) */
> +
> +			next_lpos = atomic_long_read(&data_ring->tail_lpos
> +						); /* LMM(data_push_tail:B) */
> +			if (next_lpos == tail_lpos)
> +				return false;
> +
> +			/* Another task pushed the tail. Try again. */
> +			tail_lpos = next_lpos;
> +			continue;
> +		}
> +
> +		/*
> +		 * Guarantee any descriptor states that have transitioned to
> +		 * reusable are stored before pushing the tail lpos. This
> +		 * allows readers to identify if data has expired while
> +		 * reading the descriptor. This pairs with desc_read:D.
> +		 */
> +		smp_mb(); /* LMM(data_push_tail:C) */
> +
> +	} while (!atomic_long_try_cmpxchg_relaxed(&data_ring->tail_lpos,
> +			&tail_lpos, next_lpos)); /* LMM(data_push_tail:D) */
> +
> +	return true;
> +}
> +
> +/*
> + * Allocate a new data block, invalidating the oldest data block(s)
> + * if necessary. This function also associates the data block with
> + * a specified descriptor.
> + */
> +static char *data_alloc(struct printk_ringbuffer *rb,
> +			struct prb_data_ring *data_ring, unsigned long size,
> +			struct prb_data_blk_lpos *blk_lpos, unsigned long id)
> +{
> +	struct prb_data_block *blk;
> +	unsigned long begin_lpos;
> +	unsigned long next_lpos;
> +
> +	if (!data_ring->data || size == 0) {
> +		/* Specify a data-less block. */
> +		blk_lpos->begin = INVALID_LPOS;
> +		blk_lpos->next = INVALID_LPOS;
> +		return NULL;
> +	}
> +
> +	size = to_blk_size(size);
> +
> +	begin_lpos = atomic_long_read(&data_ring->head_lpos);
> +
> +	do {
> +		next_lpos = get_next_lpos(data_ring, begin_lpos, size);
> +
> +		if (!data_push_tail(rb, data_ring,
> +				    next_lpos - DATA_SIZE(data_ring))) {
> +			/* Failed to allocate, specify a data-less block. */
> +			blk_lpos->begin = INVALID_LPOS;
> +			blk_lpos->next = INVALID_LPOS;
> +			return NULL;
> +		}
> +	} while (!atomic_long_try_cmpxchg_relaxed(&data_ring->head_lpos,
> +						  &begin_lpos, next_lpos));
> +
> +	/*
> +	 * Guarantee any updated tail lpos is stored before setting the new
> +	 * block ID. This allows block IDs to be trusted based on the tail
> +	 * lpos. A full memory barrier is needed since another task may
> +	 * have updated the tail lpos. This pairs with data_make_reusable:B.
> +	 */
> +	smp_mb(); /* LMM(data_alloc:A) */
> +
> +	blk = to_block(data_ring, begin_lpos);
> +	WRITE_ONCE(blk->id, id); /* LMM(data_alloc:B) */
> +
> +	if (DATA_WRAPS(data_ring, begin_lpos) !=
> +	    DATA_WRAPS(data_ring, next_lpos)) {
> +		/* Wrapping data blocks store their data at the beginning. */
> +		blk = to_block(data_ring, 0);
> +
> +		/*
> +		 * Store the ID on the wrapped block for consistency.
> +		 * The printk_ringbuffer does not actually use it.
> +		 */
> +		blk->id = id;
> +	}
> +
> +	blk_lpos->begin = begin_lpos;
> +	blk_lpos->next = next_lpos;
> +
> +	return &blk->data[0];
> +}

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: redundant check in make_data_reusable(): was [PATCH v2 2/3] printk: add lockless buffer
  2020-06-09  9:31   ` redundant check in make_data_reusable(): was " Petr Mladek
@ 2020-06-09 14:48     ` John Ogness
  2020-06-10  9:38       ` Petr Mladek
  0 siblings, 1 reply; 30+ messages in thread
From: John Ogness @ 2020-06-09 14:48 UTC (permalink / raw)
  To: Petr Mladek
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel, Paul McKenney

On 2020-06-09, Petr Mladek <pmladek@suse.com> wrote:
>> --- /dev/null
>> +++ b/kernel/printk/printk_ringbuffer.c
>> +/*
>> + * Given a data ring (text or dict), put the associated descriptor of each
>> + * data block from @lpos_begin until @lpos_end into the reusable state.
>> + *
>> + * If there is any problem making the associated descriptor reusable, either
>> + * the descriptor has not yet been committed or another writer task has
>> + * already pushed the tail lpos past the problematic data block. Regardless,
>> + * on error the caller can re-load the tail lpos to determine the situation.
>> + */
>> +static bool data_make_reusable(struct printk_ringbuffer *rb,
>> +			       struct prb_data_ring *data_ring,
>> +			       unsigned long lpos_begin,
>> +			       unsigned long lpos_end,
>> +			       unsigned long *lpos_out)
>> +{
>> +	struct prb_desc_ring *desc_ring = &rb->desc_ring;
>> +	struct prb_data_blk_lpos *blk_lpos;
>> +	struct prb_data_block *blk;
>> +	unsigned long tail_lpos;
>> +	enum desc_state d_state;
>> +	struct prb_desc desc;
>> +	unsigned long id;
>> +
>> +	/*
>> +	 * Using the provided @data_ring, point @blk_lpos to the correct
>> +	 * blk_lpos within the local copy of the descriptor.
>> +	 */
>> +	if (data_ring == &rb->text_data_ring)
>> +		blk_lpos = &desc.text_blk_lpos;
>> +	else
>> +		blk_lpos = &desc.dict_blk_lpos;
>> +
>> +	/* Loop until @lpos_begin has advanced to or beyond @lpos_end. */
>> +	while ((lpos_end - lpos_begin) - 1 < DATA_SIZE(data_ring)) {
>> +		blk = to_block(data_ring, lpos_begin);
>> +		id = READ_ONCE(blk->id); /* LMM(data_make_reusable:A) */
>> +
>> +		/*
>> +		 * Guarantee the block ID is loaded before checking the tail
>> +		 * lpos. The loaded block ID can only be considered valid if
>> +		 * the tail lpos has not overtaken @lpos_begin. This pairs
>> +		 * with data_alloc:A.
>> +		 *
>> +		 * Memory barrier involvement:
>> +		 *
>> +		 * If data_make_reusable:A reads from data_alloc:B, then
>> +		 * data_make_reusable:C reads from data_push_tail:D.
>> +		 *
>> +		 * Relies on:
>> +		 *
>> +		 * MB from data_push_tail:D to data_alloc:B
>> +		 *    matching
>> +		 * RMB from data_make_reusable:A to data_make_reusable:C
>> +		 *
>> +		 * Note: data_push_tail:D and data_alloc:B can be different
>> +		 *       CPUs. However, the data_alloc:B CPU (which performs
>> +		 *       the full memory barrier) must have previously seen
>> +		 *       data_push_tail:D.
>> +		 */
>> +		smp_rmb(); /* LMM(data_make_reusable:B) */
>> +
>> +		tail_lpos = atomic_long_read(&data_ring->tail_lpos
>> +					); /* LMM(data_make_reusable:C) */
>> +
>> +		/*
>> +		 * If @lpos_begin has fallen behind the tail lpos, the read
>> +		 * block ID cannot be trusted. Fast forward @lpos_begin to the
>> +		 * tail lpos and try again.
>> +		 */
>> +		if (lpos_begin - tail_lpos >= DATA_SIZE(data_ring)) {
>> +			lpos_begin = tail_lpos;
>> +			continue;
>> +		}
>
> I am sorry if we have had this discussion already in past.

We have [0]. (Search for "Ouch.")

> Well, it would just prove that it really needs a comment why this
> check is necessary.

The comment says why it is necessary. The previous read of the block ID
cannot be trusted if the the tail has been pushed beyond it.

> IMHO, the following check of the descriptor state should be enough:
>
> 1. This function returns "false" when the descriptor is in desc_misc or
>    desc_reusable state or when it does not point back to the data.
>
>    In this case, data_push_tail() does similar check. It loads
>    data_ring->tail_lpos as well. It returns false when the tail did
>    not move. Otherwise, it calls data_make_reusable() again and has
>    the same effect as the continue here.
>
>
> 2. This function returns true only when the descriptor is in committed
>    or reusable state and points back to the same data block. In this,
>    case it really contains consistent data.
>
>    OK, it is possible that the block is already reused when the
>    descriptor is in reusable state. But it is possible only when
>    data_ring->tail_lpos moved. In this case, cmpxchg in
>    data_push_tail() would fail and we would go back as well.

In your email from our previous discussion, you pointed out that two
writers could race to allocate a block and the loser would start freeing
the winner's data because it gets a valid block ID to a descriptor in
the committed state that points back to the block lpos.

You wrote:

> => We should add a check into data_make_reusable() that
>    we are invalidating really the descriptor pointing to
>    the given lpos and not a freshly reused one!

That is what this check is doing. If the tail has been pushed beyond the
block, we cannot trust the block ID read because that area may have been
recycled already.

>> +/*
>> + * Advance the data ring tail to at least @lpos. This function puts
>> + * descriptors into the reusable state if the tail is pushed beyond
>> + * their associated data block.
>> + */
>> +static bool data_push_tail(struct printk_ringbuffer *rb,
>> +			   struct prb_data_ring *data_ring,
>> +			   unsigned long lpos)
>> +{
>> +	unsigned long tail_lpos;
>> +	unsigned long next_lpos;
>> +
>> +	/* If @lpos is not valid, there is nothing to do. */
>> +	if (lpos == INVALID_LPOS)
>> +		return true;
>> +
>> +	tail_lpos = atomic_long_read(&data_ring->tail_lpos);
>> +
>> +	do {
>> +		/* Done, if the tail lpos is already at or beyond @lpos. */
>> +		if ((lpos - tail_lpos) - 1 >= DATA_SIZE(data_ring))
>> +			break;
>> +
>> +		/*
>> +		 * Make all descriptors reusable that are associated with
>> +		 * data blocks before @lpos.
>> +		 */
>> +		if (!data_make_reusable(rb, data_ring, tail_lpos, lpos,
>> +					&next_lpos)) {
>> +			/*
>> +			 * Guarantee the descriptor state loaded in
>> +			 * data_make_reusable() is performed before reloading
>> +			 * the tail lpos. The failed data_make_reusable() may
>> +			 * be due to a newly recycled descriptor causing
>> +			 * the tail lpos to have been previously pushed. This
>> +			 * pairs with desc_reserve:D.
>> +			 *
>> +			 * Memory barrier involvement:
>> +			 *
>> +			 * If data_make_reusable:D reads from desc_reserve:G,
>> +			 * then data_push_tail:B reads from data_push_tail:D.
>> +			 *
>> +			 * Relies on:
>> +			 *
>> +			 * MB from data_push_tail:D to desc_reserve:G
>> +			 *    matching
>> +			 * RMB from data_make_reusable:D to data_push_tail:B
>> +			 *
>> +			 * Note: data_push_tail:D and desc_reserve:G can be
>> +			 *       different CPUs. However, the desc_reserve:G
>> +			 *       CPU (which performs the full memory barrier)
>> +			 *       must have previously seen data_push_tail:D.
>> +			 */
>> +			smp_rmb(); /* LMM(data_push_tail:A) */
>> +
>> +			next_lpos = atomic_long_read(&data_ring->tail_lpos
>> +						); /* LMM(data_push_tail:B) */
>
> I have been a bit confused whether next_lpos variable is used
> correctly everywhere. The primary purpose is to use a value provided
> by data_make_reusable() in the cmpxchg below.
>
> But it is (mis)used here to temporary store the current tail_lpos
> value. Could we please use another variable here? IMHO, the following
> code would be more clear:
>
> 	tail_lpos_old = tail_lpos;
> 	tail_lpos = atomic_long_read(&data_ring->tail_lpos); /* LMM(data_push_tail:B) */
>
> 	if (tail_lpos == tail_lpos_old)
> 		return false;

You are correct. I am ashamed about this misuse. I will add the variable
and rely on the compiler to appropriately merge/recycle stack variables.

John Ogness

[0] https://lkml.kernel.org/r/87ftecy343.fsf@linutronix.de

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: Full barrier in data_push_tail(): was [PATCH v2 2/3] printk: add lockless buffer
  2020-06-09  9:48   ` Full barrier in data_push_tail(): " Petr Mladek
@ 2020-06-09 15:03     ` John Ogness
  0 siblings, 0 replies; 30+ messages in thread
From: John Ogness @ 2020-06-09 15:03 UTC (permalink / raw)
  To: Petr Mladek
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel, Paul McKenney

On 2020-06-09, Petr Mladek <pmladek@suse.com> wrote:
>> --- /dev/null
>> +++ b/kernel/printk/printk_ringbuffer.c
>> +static bool data_push_tail(struct printk_ringbuffer *rb,
>> +			   struct prb_data_ring *data_ring,
>> +			   unsigned long lpos)
>> +{
>> ...
>> +
>> +		/*
>> +		 * Guarantee any descriptor states that have transitioned to
>> +		 * reusable are stored before pushing the tail lpos. This
>> +		 * allows readers to identify if data has expired while
>> +		 * reading the descriptor. This pairs with desc_read:D.
>> +		 */
>> +		smp_mb(); /* LMM(data_push_tail:C) */
>
> The comment does not explain why we need a full barrier here.
>
> I would add something like:
>
> 		* Full barrier is necessary because the descriptors
> 		* might have been made reusable also by other CPUs.

Agreed. Somehow I missed that explanation for this comment.

> For people like me, it would be great to add also link to a more
> detailed explanation, for example, the litmus tests, or something
> even more human readable ;-) I think that it is a "rather" common
> problem. I wonder whether it is already documented somewhere.

I believe that memory barriers need to be formally documented. Perhaps
in such a way that litmus tests can be easily generated. Tools could
greatly assist with this. It is my hope that my memory barrier
documentation can spark some ideas about how we could do this. (Doing
all this manually really sucks!)

John Ogness

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: Barrier before pushing desc_ring tail: was [PATCH v2 2/3] printk: add lockless buffer
  2020-06-09 11:37   ` Barrier before pushing desc_ring tail: " Petr Mladek
@ 2020-06-09 15:56     ` John Ogness
  2020-06-11 12:01       ` Petr Mladek
  0 siblings, 1 reply; 30+ messages in thread
From: John Ogness @ 2020-06-09 15:56 UTC (permalink / raw)
  To: Petr Mladek
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel, Paul McKenney

On 2020-06-09, Petr Mladek <pmladek@suse.com> wrote:
>> --- /dev/null
>> +++ b/kernel/printk/printk_ringbuffer.c
>> +/*
>> + * Advance the desc ring tail. This function advances the tail by one
>> + * descriptor, thus invalidating the oldest descriptor. Before advancing
>> + * the tail, the tail descriptor is made reusable and all data blocks up to
>> + * and including the descriptor's data block are invalidated (i.e. the data
>> + * ring tail is pushed past the data block of the descriptor being made
>> + * reusable).
>> + */
>> +static bool desc_push_tail(struct printk_ringbuffer *rb,
>> +			   unsigned long tail_id)
>> +{
>> +	struct prb_desc_ring *desc_ring = &rb->desc_ring;
>> +	enum desc_state d_state;
>> +	struct prb_desc desc;
>> +
>> +	d_state = desc_read(desc_ring, tail_id, &desc);
>> +
>> +	switch (d_state) {
>> +	case desc_miss:
>> +		/*
>> +		 * If the ID is exactly 1 wrap behind the expected, it is
>> +		 * in the process of being reserved by another writer and
>> +		 * must be considered reserved.
>> +		 */
>> +		if (DESC_ID(atomic_long_read(&desc.state_var)) ==
>> +		    DESC_ID_PREV_WRAP(desc_ring, tail_id)) {
>> +			return false;
>> +		}
>> +
>> +		/*
>> +		 * The ID has changed. Another writer must have pushed the
>> +		 * tail and recycled the descriptor already. Success is
>> +		 * returned because the caller is only interested in the
>> +		 * specified tail being pushed, which it was.
>> +		 */
>> +		return true;
>> +	case desc_reserved:
>> +		return false;
>> +	case desc_committed:
>> +		desc_make_reusable(desc_ring, tail_id);
>> +		break;
>> +	case desc_reusable:
>> +		break;
>> +	}
>> +
>> +	/*
>> +	 * Data blocks must be invalidated before their associated
>> +	 * descriptor can be made available for recycling. Invalidating
>> +	 * them later is not possible because there is no way to trust
>> +	 * data blocks once their associated descriptor is gone.
>> +	 */
>> +
>> +	if (!data_push_tail(rb, &rb->text_data_ring, desc.text_blk_lpos.next))
>> +		return false;
>> +	if (!data_push_tail(rb, &rb->dict_data_ring, desc.dict_blk_lpos.next))
>> +		return false;
>> +
>> +	/*
>> +	 * Check the next descriptor after @tail_id before pushing the tail
>> +	 * to it because the tail must always be in a committed or reusable
>> +	 * state. The implementation of prb_first_seq() relies on this.
>> +	 *
>> +	 * A successful read implies that the next descriptor is less than or
>> +	 * equal to @head_id so there is no risk of pushing the tail past the
>> +	 * head.
>> +	 */
>> +	d_state = desc_read(desc_ring, DESC_ID(tail_id + 1),
>> +			    &desc); /* LMM(desc_push_tail:A) */
>> +	if (d_state == desc_committed || d_state == desc_reusable) {
>> +		/*
>> +		 * Any CPU that loads the new tail ID, must see that the
>> +		 * descriptor at @tail_id is in the reusable state. See the
>> +		 * read memory barrier part of desc_reserve:D for details.
>> +		 */
>> +		atomic_long_cmpxchg_relaxed(&desc_ring->tail_id, tail_id,
>> +			DESC_ID(tail_id + 1)); /* LMM(desc_push_tail:B) */
>
> I was quite confused by the above comment. Does it mean that we need
> a barrier here? Or does it explain why the cmpxchg has its own
> LMM marker?

This LMM marker is referenced quite often, but since it is a relaxed
cmpxchg(), its significance is not immediately clear. I was hoping to
add some hints as to why it is significant. The comment that it is
referring to is:

	/*
	 * Guarantee the tail ID is read before validating the
	 * recycled descriptor state. A read memory barrier is
	 * sufficient for this. This pairs with data_push_tail:C.
	 *
	 * Memory barrier involvement:
	 *
	 * If desc_reserve:C reads from desc_push_tail:B, then
	 * desc_reserve:F reads from desc_make_reusable:A.
	 *
	 * Relies on:
	 *
	 * MB from desc_make_reusable:A to desc_push_tail:B
	 *    matching
	 * RMB from desc_reserve:C to desc_reserve:F
	 *
	 * Note: desc_make_reusable:A, desc_push_tail:B, and
	 *       data_push_tail:C can all be different CPUs. However,
	 *       the desc_push_tail:B CPU must have previously seen
	 *       data_push_tail:D and the data_push_tail:D CPU (which
	 *       performs the full memory barrier) must have
	 *       previously seen desc_make_reusable:A.
	 */

English translation:

In order to push the data tail, a CPU must first see that the associated
descriptor is in the reusable state. Since a full memory barrier is
performed after that sighting and before doing the data tail push, _any_
CPU that sees the pushed data tail will be able to see that the
associated descriptor is in the reusable state.

In order to push the descriptor tail, a CPU must first see that the
associated data tail has been pushed. Therefore, that CPU would also see
that the associated descriptor is in the reusable state.

> I think that we actually need a full barrier here to make sure that
> all CPUs see the changes made by data_push_tail() before we
> allow to rewrite the descriptor. The changes in data_push_tail() might
> be done on different CPUs.

There needs to be a reason why the ordering of data tail pushing and
descriptor tail pushing is important.

> It is similar like the full barrier in data_push_tail() before changing
> data_ring->tail_lpos.

How so? That memory barrier exists to make sure the reusable descriptor
state is stored before pushing the data tail. This is important for
readers (which start from the data tail) so they can notice if the
descriptor has since been invalidated (reusable state).

But where is it important that the data tail change is seen before the
descriptor tail change? How are the data tail and descriptor tail
significantly related to each other?

John Ogness

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: data_ring head_lpos and tail_lpos synchronization: was [PATCH v2 2/3] printk: add lockless buffer
  2020-06-09 14:38   ` data_ring head_lpos and tail_lpos synchronization: " Petr Mladek
@ 2020-06-10  7:53     ` John Ogness
  0 siblings, 0 replies; 30+ messages in thread
From: John Ogness @ 2020-06-10  7:53 UTC (permalink / raw)
  To: Petr Mladek
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel, Paul McKenney

On 2020-06-09, Petr Mladek <pmladek@suse.com> wrote:
>> --- /dev/null
>> +++ b/kernel/printk/printk_ringbuffer.c
>> +/*
>> + * Advance the data ring tail to at least @lpos. This function puts
>> + * descriptors into the reusable state if the tail is pushed beyond
>> + * their associated data block.
>> + */
>> +static bool data_push_tail(struct printk_ringbuffer *rb,
>> +			   struct prb_data_ring *data_ring,
>> +			   unsigned long lpos)
>> +{
>> +	unsigned long tail_lpos;
>> +	unsigned long next_lpos;
>> +
>> +	/* If @lpos is not valid, there is nothing to do. */
>> +	if (lpos == INVALID_LPOS)
>> +		return true;
>> +
>> +	tail_lpos = atomic_long_read(&data_ring->tail_lpos);
>
> Hmm, I wonder whether data_ring->tail_lpos and data_ring->head_lpos
> are synchronized enough between each other.
>
> I feel that there should be read barrier here. But it seems that more
> barriers are missing. For example, let's have:
>
>
> CPU0				CPU1
>
> data_alloc()
>   begin_lpos = atomic_read(data_ring->head_lpos);
>
> 				data_alloc()
> 				  data_push_tail()
> 				    cmpxchg(data_ring->tail_lpos);
> 				    // A: no barrier
> 				  cmpxchg(data_ring->head_lpos);
>
>   data_push_tail()
>     // B: no barrier
>     tail_lpos = atomic_read(data_ring->tail_lpos);
>
> Problem 1:
>
>    CPU0 might see random ordering of data_ring->tail_lpos and
>    head_lpos values modified by CPU1. There are missing both
>    write and read barriers.

You need to explain why this is a problem. CPU0 saw some head and some
tail. Both values are at least the current values (i.e. there is no
danger that it sees a tail that is further than the tail really is).

CPU0 then uses the head/tail values to determine how far to advance the
tail and how far to advance the head. Both of these advances use
cmpxchg_relaxed(). So there is no danger of random head/tail
modifications. Upon cmpxchg_relaxed() failure, the new current values
are loaded and it retries based on the new values.

The only issue is if data_push_tail()/data_make_reusable() are able to
recognize that a data area is already recycled. And both functions have
memory barriers in place for that.

> Problem 2:
>
>    There might be still a chance because CPU0 does:
>
>      if (!data_make_reusable())
>        smp_rmb()
>        tail_lpos = atomic_read(data_ring->tail_lpos);
>
>    But CPU0 might still see old data_ring->tail because CPU1 did not
>    do write barrier.

I claim that it does not matter. The smp_rmb() here pairs with the full
memory barrier LMM(desc_reserve:D). The reasoning:

	 * Guarantee any data ring tail changes are stored before
	 * recycling the descriptor. Data ring tail changes can happen
	 * via desc_push_tail()->data_push_tail(). A full memory
	 * barrier is needed since another task may have pushed the
	 * data ring tails. This pairs with data_push_tail:A.

So if data_make_reusable() failed due to a descriptor already being
recycled, we know CPU0 will be able to read an updated tail value (and
try again with the new value).

John Ogness

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: blk->id read race: was: [PATCH v2 2/3] printk: add lockless buffer
  2020-06-09 14:18     ` John Ogness
@ 2020-06-10  8:42       ` Petr Mladek
  2020-06-10 13:55         ` John Ogness
  0 siblings, 1 reply; 30+ messages in thread
From: Petr Mladek @ 2020-06-10  8:42 UTC (permalink / raw)
  To: John Ogness
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel, Paul McKenney

On Tue 2020-06-09 16:18:35, John Ogness wrote:
> On 2020-06-09, Petr Mladek <pmladek@suse.com> wrote:
> >> --- /dev/null
> >> +++ b/kernel/printk/printk_ringbuffer.c
> >> +/*
> >> + * Given a data ring (text or dict), put the associated descriptor of each
> >> + * data block from @lpos_begin until @lpos_end into the reusable state.
> >> + *
> >> + * If there is any problem making the associated descriptor reusable, either
> >> + * the descriptor has not yet been committed or another writer task has
> >> + * already pushed the tail lpos past the problematic data block. Regardless,
> >> + * on error the caller can re-load the tail lpos to determine the situation.
> >> + */
> >> +static bool data_make_reusable(struct printk_ringbuffer *rb,
> >> +			       struct prb_data_ring *data_ring,
> >> +			       unsigned long lpos_begin,
> >> +			       unsigned long lpos_end,
> >> +			       unsigned long *lpos_out)
> >> +{
> >> +	struct prb_desc_ring *desc_ring = &rb->desc_ring;
> >> +	struct prb_data_blk_lpos *blk_lpos;
> >> +	struct prb_data_block *blk;
> >> +	unsigned long tail_lpos;
> >> +	enum desc_state d_state;
> >> +	struct prb_desc desc;
> >> +	unsigned long id;
> >> +
> >> +	/*
> >> +	 * Using the provided @data_ring, point @blk_lpos to the correct
> >> +	 * blk_lpos within the local copy of the descriptor.
> >> +	 */
> >> +	if (data_ring == &rb->text_data_ring)
> >> +		blk_lpos = &desc.text_blk_lpos;
> >> +	else
> >> +		blk_lpos = &desc.dict_blk_lpos;
> >> +
> >> +	/* Loop until @lpos_begin has advanced to or beyond @lpos_end. */
> >> +	while ((lpos_end - lpos_begin) - 1 < DATA_SIZE(data_ring)) {
> >> +		blk = to_block(data_ring, lpos_begin);
> >> +		id = READ_ONCE(blk->id); /* LMM(data_make_reusable:A) */
> >
> > This would deserve some comment:
> >
> > 1. Compiler could not optimize out the read because there is a data
> >    dependency on lpos_begin.
> >
> > 2. Compiler could not postpone the read because it is followed by
> >    smp_rmb().
> >
> > So, is READ_ONCE() realy needed?
> 
> I agree that it is not needed. Both the READ_ONCE() and its countering
> WRITE_ONCE() (data_alloc:B) only document the lockless shared access. I
> will remove both for the next version.

Sounds good.

> Do we still need a comment? Is it not obvious that there is a data
> dependency on @lpos_begin?

Sigh, I just wonder why I am always confusedby this. See below.


>         blk = to_block(data_ring, lpos_begin);
>         id = blk->id;
> 
> > Well, blk->id clearly can be modified in parallel so we need to be
> > careful. There is smp_rmb() right below. Do we needed smp_rmb() also
> > before?
> >
> > What about the following scenario?:
> >
> >
> > CPU0						CPU1
> >
> > 						data_alloc()
> > 						  data_push_tail()
> >
> > 						blk = to_block(data_ring, begin_lpos)
> > 						WRITE_ONCE(blk->id, id); /* LMM(data_alloc:B) */
> >
> > desc_push_tail()
> >   data_push_tail()
> >
> >     tail_lpos = data_ring->tail_lpos;
> >     // see data_ring->tail_lpos already updated by CPU1
> >
> >     data_make_reusable()
> >
> >       // lpos_begin = tail_lpos via parameter
> >       blk = to_block(data_ring, lpos_begin);
> >       id = blk->id
> >
> > Now: CPU0 might see outdated blk->id before CPU1 wrote new value
> >      because there is no read barrier betwen reading tail_lpos
> >      and blk->id here.
> 
> In your example, CPU1 is pushing the tail and then setting the block ID
> for the _newly_ allocated block, that is located is _before_ the new
> tail. If CPU0 sees the new tail already, it is still reading a valid
> block ID, which is _not_ from the block that CPU1 is in the process of
> writing.

Ah, I see. I wrongly assumed that both CPO0 and CPU1 are working with
the same block address. But if CPU0 sees the new tail_lpos, it is
already looking at another block. And it is the classic fight against
yet another potential CPUs that try to push the tail as well.

I wonder if the comment might look like:

/*
 * No barrier is needed between reading tail_lpos and the related
 * blk->id. Only CPU that modifies tail_lpos via cmpxchg is allowed
 * to modify the related blk->id. CPUs that see the moved tail_lpos
 * are looking at another block related to the new tail_lpos.
 * It does not mater when the previous winner modifies the previous
 * block.
 */

I am not sure how many people are confused like me. It is possible
that it is not worth it. I just know that I did this mistake
repeatedly ;-)

Best Regards,
Petr

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: redundant check in make_data_reusable(): was [PATCH v2 2/3] printk: add lockless buffer
  2020-06-09 14:48     ` John Ogness
@ 2020-06-10  9:38       ` Petr Mladek
  2020-06-10 10:24         ` John Ogness
  0 siblings, 1 reply; 30+ messages in thread
From: Petr Mladek @ 2020-06-10  9:38 UTC (permalink / raw)
  To: John Ogness
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel, Paul McKenney

On Tue 2020-06-09 16:48:30, John Ogness wrote:
> On 2020-06-09, Petr Mladek <pmladek@suse.com> wrote:
> >> --- /dev/null
> >> +++ b/kernel/printk/printk_ringbuffer.c
> >> +/*
> >> + * Given a data ring (text or dict), put the associated descriptor of each
> >> + * data block from @lpos_begin until @lpos_end into the reusable state.
> >> + *
> >> + * If there is any problem making the associated descriptor reusable, either
> >> + * the descriptor has not yet been committed or another writer task has
> >> + * already pushed the tail lpos past the problematic data block. Regardless,
> >> + * on error the caller can re-load the tail lpos to determine the situation.
> >> + */
> >> +static bool data_make_reusable(struct printk_ringbuffer *rb,
> >> +			       struct prb_data_ring *data_ring,
> >> +			       unsigned long lpos_begin,
> >> +			       unsigned long lpos_end,
> >> +			       unsigned long *lpos_out)
> >> +{
> >> +	struct prb_desc_ring *desc_ring = &rb->desc_ring;
> >> +	struct prb_data_blk_lpos *blk_lpos;
> >> +	struct prb_data_block *blk;
> >> +	unsigned long tail_lpos;
> >> +	enum desc_state d_state;
> >> +	struct prb_desc desc;
> >> +	unsigned long id;
> >> +
> >> +	/*
> >> +	 * Using the provided @data_ring, point @blk_lpos to the correct
> >> +	 * blk_lpos within the local copy of the descriptor.
> >> +	 */
> >> +	if (data_ring == &rb->text_data_ring)
> >> +		blk_lpos = &desc.text_blk_lpos;
> >> +	else
> >> +		blk_lpos = &desc.dict_blk_lpos;
> >> +
> >> +	/* Loop until @lpos_begin has advanced to or beyond @lpos_end. */
> >> +	while ((lpos_end - lpos_begin) - 1 < DATA_SIZE(data_ring)) {
> >> +		blk = to_block(data_ring, lpos_begin);
> >> +		id = READ_ONCE(blk->id); /* LMM(data_make_reusable:A) */
> >> +
> >> +		/*
> >> +		 * Guarantee the block ID is loaded before checking the tail
> >> +		 * lpos. The loaded block ID can only be considered valid if
> >> +		 * the tail lpos has not overtaken @lpos_begin. This pairs
> >> +		 * with data_alloc:A.
> >> +		 *
> >> +		 * Memory barrier involvement:
> >> +		 *
> >> +		 * If data_make_reusable:A reads from data_alloc:B, then
> >> +		 * data_make_reusable:C reads from data_push_tail:D.
> >> +		 *
> >> +		 * Relies on:
> >> +		 *
> >> +		 * MB from data_push_tail:D to data_alloc:B
> >> +		 *    matching
> >> +		 * RMB from data_make_reusable:A to data_make_reusable:C
> >> +		 *
> >> +		 * Note: data_push_tail:D and data_alloc:B can be different
> >> +		 *       CPUs. However, the data_alloc:B CPU (which performs
> >> +		 *       the full memory barrier) must have previously seen
> >> +		 *       data_push_tail:D.
> >> +		 */
> >> +		smp_rmb(); /* LMM(data_make_reusable:B) */
> >> +
> >> +		tail_lpos = atomic_long_read(&data_ring->tail_lpos
> >> +					); /* LMM(data_make_reusable:C) */
> >> +
> >> +		/*
> >> +		 * If @lpos_begin has fallen behind the tail lpos, the read
> >> +		 * block ID cannot be trusted. Fast forward @lpos_begin to the
> >> +		 * tail lpos and try again.
> >> +		 */
> >> +		if (lpos_begin - tail_lpos >= DATA_SIZE(data_ring)) {
> >> +			lpos_begin = tail_lpos;
> >> +			continue;
> >> +		}
> >
> > I am sorry if we have had this discussion already in past.
> 
> We have [0]. (Search for "Ouch.")

I see. Thanks a lot for the pointer.

> > Well, it would just prove that it really needs a comment why this
> > check is necessary.
> 
> The comment says why it is necessary. The previous read of the block ID
> cannot be trusted if the the tail has been pushed beyond it.

Not really. The comment describes what the check does. But it does not
explain why it is needed. The reason might be described be something like:

		* Make sure that the id read from tail_lpos is really
		* pointing to this lpos. The block might have been
		* reused in the meantime. Make sure to do not make
		* the new owner reusable.

But wait! This situation should get caught by the two existing descriptor
checks:

>		case desc_committed:
>			/*
>			 * This data block is invalid if the descriptor
>			 * does not point back to it.
>			 */
>			if (blk_lpos->begin != lpos_begin)
>				return false;
>			desc_make_reusable(desc_ring, id);
>			break;
>		case desc_reusable:
>			/*
>			 * This data block is invalid if the descriptor
>			 * does not point back to it.
>			 */
>			if (blk_lpos->begin != lpos_begin)
>				return false;
>			break;

Here again the comments describe what the check does but not why.
I would write something like:

			/*
			 * The block might have already been
			 * reused. Make sure that the descriptor really
			 * points back to the checked lpos. It covers
			 * both situations. Random data might point to
			 * a valid descriptor just by chance. Or the block
			 * has been already reused by another descriptor.
			 */

BTW: In theory, it might happen that the ringbuffer is reused many
times in the meantime and (blk_lpos->begin == lpos_begin) again.
But then lpos_begin would be valid lpos again and even the range
check would not catch this as well.

So, I think that the lpos range check is still redundant. We might
describe it as an extra paranoid check but I am not sure if it is
worth it.

But I would remove it and keep the code "simple" and design "clear".
Well, I do not resist on it.

Best Regards,
Petr

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: redundant check in make_data_reusable(): was [PATCH v2 2/3] printk: add lockless buffer
  2020-06-10  9:38       ` Petr Mladek
@ 2020-06-10 10:24         ` John Ogness
  2020-06-10 14:56           ` John Ogness
  2020-06-11 13:55           ` Petr Mladek
  0 siblings, 2 replies; 30+ messages in thread
From: John Ogness @ 2020-06-10 10:24 UTC (permalink / raw)
  To: Petr Mladek
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel, Paul McKenney

On 2020-06-10, Petr Mladek <pmladek@suse.com> wrote:
>> >> --- /dev/null
>> >> +++ b/kernel/printk/printk_ringbuffer.c
>> >> +/*
>> >> + * Given a data ring (text or dict), put the associated descriptor of each
>> >> + * data block from @lpos_begin until @lpos_end into the reusable state.
>> >> + *
>>>> + * If there is any problem making the associated descriptor reusable, either
>>>> + * the descriptor has not yet been committed or another writer task has
>>>> + * already pushed the tail lpos past the problematic data block. Regardless,
>>>> + * on error the caller can re-load the tail lpos to determine the situation.
>>>> + */
>>>> +static bool data_make_reusable(struct printk_ringbuffer *rb,
>>>> +			       struct prb_data_ring *data_ring,
>>>> +			       unsigned long lpos_begin,
>>>> +			       unsigned long lpos_end,
>>>> +			       unsigned long *lpos_out)
>>>> +{
>>>> +	struct prb_desc_ring *desc_ring = &rb->desc_ring;
>>>> +	struct prb_data_blk_lpos *blk_lpos;
>>>> +	struct prb_data_block *blk;
>>>> +	unsigned long tail_lpos;
>>>> +	enum desc_state d_state;
>>>> +	struct prb_desc desc;
>>>> +	unsigned long id;
>>>> +
>>>> +	/*
>>>> +	 * Using the provided @data_ring, point @blk_lpos to the correct
>>>> +	 * blk_lpos within the local copy of the descriptor.
>>>> +	 */
>>>> +	if (data_ring == &rb->text_data_ring)
>>>> +		blk_lpos = &desc.text_blk_lpos;
>>>> +	else
>>>> +		blk_lpos = &desc.dict_blk_lpos;
>>>> +
>>>> +	/* Loop until @lpos_begin has advanced to or beyond @lpos_end. */
>>>> +	while ((lpos_end - lpos_begin) - 1 < DATA_SIZE(data_ring)) {
>>>> +		blk = to_block(data_ring, lpos_begin);
>>>> +		id = READ_ONCE(blk->id); /* LMM(data_make_reusable:A) */
>>>> +
>>>> +		/*
>>>> +		 * Guarantee the block ID is loaded before checking the tail
>>>> +		 * lpos. The loaded block ID can only be considered valid if
>>>> +		 * the tail lpos has not overtaken @lpos_begin. This pairs
>>>> +		 * with data_alloc:A.
>>>> +		 *
>>>> +		 * Memory barrier involvement:
>>>> +		 *
>>>> +		 * If data_make_reusable:A reads from data_alloc:B, then
>>>> +		 * data_make_reusable:C reads from data_push_tail:D.
>>>> +		 *
>>>> +		 * Relies on:
>>>> +		 *
>>>> +		 * MB from data_push_tail:D to data_alloc:B
>>>> +		 *    matching
>>>> +		 * RMB from data_make_reusable:A to data_make_reusable:C
>>>> +		 *
>>>> +		 * Note: data_push_tail:D and data_alloc:B can be different
>>>> +		 *       CPUs. However, the data_alloc:B CPU (which performs
>>>> +		 *       the full memory barrier) must have previously seen
>>>> +		 *       data_push_tail:D.
>>>> +		 */
>>>> +		smp_rmb(); /* LMM(data_make_reusable:B) */
>>>> +
>>>> +		tail_lpos = atomic_long_read(&data_ring->tail_lpos
>>>> +					); /* LMM(data_make_reusable:C) */
>>>> +
>>>> +		/*
>>>> +		 * If @lpos_begin has fallen behind the tail lpos, the read
>>>> +		 * block ID cannot be trusted. Fast forward @lpos_begin to the
>>>> +		 * tail lpos and try again.
>>>> +		 */
>>>> +		if (lpos_begin - tail_lpos >= DATA_SIZE(data_ring)) {
>>>> +			lpos_begin = tail_lpos;
>>>> +			continue;
>>>> +		}
>>>
>>> I am sorry if we have had this discussion already in past.
>> 
>> We have [0]. (Search for "Ouch.")
>
> I see. Thanks a lot for the pointer.
>
>>> Well, it would just prove that it really needs a comment why this
>>> check is necessary.
>> 
>> The comment says why it is necessary. The previous read of the block ID
>> cannot be trusted if the the tail has been pushed beyond it.
>
> Not really. The comment describes what the check does. But it does not
> explain why it is needed. The reason might be described be something like:
>
> 		* Make sure that the id read from tail_lpos is really
> 		* pointing to this lpos. The block might have been
> 		* reused in the meantime. Make sure to do not make
> 		* the new owner reusable.

That is _not_ what this check is doing. I recommend looking closely at
the example you posted. This is not about whether or not a descriptor is
pointing to this lpos. In your example you showed that ID, state, and
lpos values could all look good, but it is for the _new_ record and we
should _not_ invalidate that one.

We can detect the scenario you pointed out by verifying the tail has not
moved beyond the data block that the ID was read from. The comment for
this check says:

    If @lpos_begin has fallen behind the tail lpos,
    the read block ID cannot be trusted.

This is exactly the why. It is only about whether we can trust that a
non-garbage block ID was read. Or do you want me to add:

    ... because data read that is behind the tail lpos must be
    considered garbage.

> But wait! This situation should get caught by the two existing descriptor
> checks:
>
>>		case desc_committed:
>>			/*
>>			 * This data block is invalid if the descriptor
>>			 * does not point back to it.
>>			 */
>>			if (blk_lpos->begin != lpos_begin)
>>				return false;
>>			desc_make_reusable(desc_ring, id);
>>			break;
>>		case desc_reusable:
>>			/*
>>			 * This data block is invalid if the descriptor
>>			 * does not point back to it.
>>			 */
>>			if (blk_lpos->begin != lpos_begin)
>>				return false;
>>			break;

No. Your example showed that it is not caught here.

> Here again the comments describe what the check does but not why.
> I would write something like:
>
> 			/*
> 			 * The block might have already been
> 			 * reused. Make sure that the descriptor really
> 			 * points back to the checked lpos. It covers
> 			 * both situations. Random data might point to
> 			 * a valid descriptor just by chance. Or the block
> 			 * has been already reused by another descriptor.
> 			 */

OK. I will expand the comments to something similar to this.

> So, I think that the lpos range check is still redundant. We might
> describe it as an extra paranoid check but I am not sure if it is
> worth it.
>
> But I would remove it and keep the code "simple" and design "clear".
> Well, I do not resist on it.

If we remove it, we end up back at your bug report. ;-)

John Ogness

[0] https://lkml.kernel.org/r/87ftecy343.fsf@linutronix.de

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: blk->id read race: was: [PATCH v2 2/3] printk: add lockless buffer
  2020-06-10  8:42       ` Petr Mladek
@ 2020-06-10 13:55         ` John Ogness
  0 siblings, 0 replies; 30+ messages in thread
From: John Ogness @ 2020-06-10 13:55 UTC (permalink / raw)
  To: Petr Mladek
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel, Paul McKenney

On 2020-06-10, Petr Mladek <pmladek@suse.com> wrote:
>>>> --- /dev/null
>>>> +++ b/kernel/printk/printk_ringbuffer.c
>>>> +/*
>>>> + * Given a data ring (text or dict), put the associated descriptor of each
>>>> + * data block from @lpos_begin until @lpos_end into the reusable state.
>>>> + *
>>>> + * If there is any problem making the associated descriptor reusable, either
>>>> + * the descriptor has not yet been committed or another writer task has
>>>> + * already pushed the tail lpos past the problematic data block. Regardless,
>>>> + * on error the caller can re-load the tail lpos to determine the situation.
>>>> + */
>>>> +static bool data_make_reusable(struct printk_ringbuffer *rb,
>>>> +			       struct prb_data_ring *data_ring,
>>>> +			       unsigned long lpos_begin,
>>>> +			       unsigned long lpos_end,
>>>> +			       unsigned long *lpos_out)
>>>> +{
>>>> +	struct prb_desc_ring *desc_ring = &rb->desc_ring;
>>>> +	struct prb_data_blk_lpos *blk_lpos;
>>>> +	struct prb_data_block *blk;
>>>> +	unsigned long tail_lpos;
>>>> +	enum desc_state d_state;
>>>> +	struct prb_desc desc;
>>>> +	unsigned long id;
>>>> +
>>>> +	/*
>>>> +	 * Using the provided @data_ring, point @blk_lpos to the correct
>>>> +	 * blk_lpos within the local copy of the descriptor.
>>>> +	 */
>>>> +	if (data_ring == &rb->text_data_ring)
>>>> +		blk_lpos = &desc.text_blk_lpos;
>>>> +	else
>>>> +		blk_lpos = &desc.dict_blk_lpos;
>>>> +
>>>> +	/* Loop until @lpos_begin has advanced to or beyond @lpos_end. */
>>>> +	while ((lpos_end - lpos_begin) - 1 < DATA_SIZE(data_ring)) {
>>>> +		blk = to_block(data_ring, lpos_begin);
>>>> +		id = READ_ONCE(blk->id); /* LMM(data_make_reusable:A) */
>>>
>>> This would deserve some comment:
>
> I wonder if the comment might look like:
>
> /*
>  * No barrier is needed between reading tail_lpos and the related
>  * blk->id. Only CPU that modifies tail_lpos via cmpxchg is allowed
>  * to modify the related blk->id. CPUs that see the moved tail_lpos
>  * are looking at another block related to the new tail_lpos.
>  * It does not mater when the previous winner modifies the previous
>  * block.
>  */

Sorry, but this comment does not make sense for me. The tail is pushed
_before_ the block ID is modified. So any kind of barrier here (where we
read the tail, then the block ID, i.e. the same order) would be
inappropriate anyway.

Also, this comment only talks about when a new value is seen, but not
about when the old value is seen. IMO it is seeing the old value that is
worthy of a comment since that is the only case with a data race.

In preparation for my next version I have added the following comment:

        blk = to_block(data_ring, lpos_begin);

        /*
         * When going from lpos to block pointer, the wrap around
         * feature of the lpos value is lost. Since another CPU could
         * invalidate this data area at any time, the data tail must
         * be re-checked after the block ID has been read.
         */

        id = blk->id; /* LMM(data_make_reusable:A) */

I think this comment also helps to further clarify "why" the following
data tail check occurs.

John Ogness

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: redundant check in make_data_reusable(): was [PATCH v2 2/3] printk: add lockless buffer
  2020-06-10 10:24         ` John Ogness
@ 2020-06-10 14:56           ` John Ogness
  2020-06-11 19:51             ` John Ogness
  2020-06-11 13:55           ` Petr Mladek
  1 sibling, 1 reply; 30+ messages in thread
From: John Ogness @ 2020-06-10 14:56 UTC (permalink / raw)
  To: Petr Mladek
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel, Paul McKenney

On 2020-06-10, Petr Mladek <pmladek@suse.com> wrote:
>> +static bool data_make_reusable(struct printk_ringbuffer *rb,
>> +			       struct prb_data_ring *data_ring,
>> +			       unsigned long lpos_begin,
>> +			       unsigned long lpos_end,
>> +			       unsigned long *lpos_out)
>> +{
>> +	struct prb_desc_ring *desc_ring = &rb->desc_ring;
>> +	struct prb_data_blk_lpos *blk_lpos;
>> +	struct prb_data_block *blk;
>> +	unsigned long tail_lpos;
>> +	enum desc_state d_state;
>> +	struct prb_desc desc;
>> +	unsigned long id;
>> +
>> +	/*
>> +	 * Using the provided @data_ring, point @blk_lpos to the correct
>> +	 * blk_lpos within the local copy of the descriptor.
>> +	 */
>> +	if (data_ring == &rb->text_data_ring)
>> +		blk_lpos = &desc.text_blk_lpos;
>> +	else
>> +		blk_lpos = &desc.dict_blk_lpos;
>> +
>> +	/* Loop until @lpos_begin has advanced to or beyond @lpos_end. */
>> +	while ((lpos_end - lpos_begin) - 1 < DATA_SIZE(data_ring)) {
>> +		blk = to_block(data_ring, lpos_begin);
>> +		id = READ_ONCE(blk->id); /* LMM(data_make_reusable:A) */
>> +
>> +		/*
>> +		 * Guarantee the block ID is loaded before checking the tail
>> +		 * lpos. The loaded block ID can only be considered valid if
>> +		 * the tail lpos has not overtaken @lpos_begin. This pairs
>> +		 * with data_alloc:A.
>> +		 *
>> +		 * Memory barrier involvement:
>> +		 *
>> +		 * If data_make_reusable:A reads from data_alloc:B, then
>> +		 * data_make_reusable:C reads from data_push_tail:D.
>> +		 *
>> +		 * Relies on:
>> +		 *
>> +		 * MB from data_push_tail:D to data_alloc:B
>> +		 *    matching
>> +		 * RMB from data_make_reusable:A to data_make_reusable:C
>> +		 *
>> +		 * Note: data_push_tail:D and data_alloc:B can be different
>> +		 *       CPUs. However, the data_alloc:B CPU (which performs
>> +		 *       the full memory barrier) must have previously seen
>> +		 *       data_push_tail:D.
>> +		 */
>> +		smp_rmb(); /* LMM(data_make_reusable:B) */
>> +
>> +		tail_lpos = atomic_long_read(&data_ring->tail_lpos
>> +					); /* LMM(data_make_reusable:C) */
>> +
>> +		/*
>> +		 * If @lpos_begin has fallen behind the tail lpos, the read
>> +		 * block ID cannot be trusted. Fast forward @lpos_begin to the
>> +		 * tail lpos and try again.
>> +		 */
>> +		if (lpos_begin - tail_lpos >= DATA_SIZE(data_ring)) {
>> +			lpos_begin = tail_lpos;
>> +			continue;
>> +		}
>> +
>> +		d_state = desc_read(desc_ring, id,
>> +				    &desc); /* LMM(data_make_reusable:D) */
>> +
>> +		switch (d_state) {
>> +		case desc_miss:
>> +			return false;
>> +		case desc_reserved:
>> +			return false;
>> +		case desc_committed:
>> +			/*
>> +			 * This data block is invalid if the descriptor
>> +			 * does not point back to it.
>> +			 */
>
> Here again the comments describe what the check does but not why.
> I would write something like:
>
> 			/*
> 			 * The block might have already been
> 			 * reused. Make sure that the descriptor really
> 			 * points back to the checked lpos. It covers
> 			 * both situations. Random data might point to
> 			 * a valid descriptor just by chance. Or the block
> 			 * has been already reused by another descriptor.
> 			 */

Originally this check was needed because the descriptor would be read
even if there was a data race reading the ID from the data
block. Validating the lpos value was a kind of protection against
reading random data that by chance yielded an ID of a committed/reusable
descriptor.

However, after you pointed out that this check was not enough, the code
now re-checks the data tail to make sure that no data race happened. So
actually it is not possible that a descriptor in the committed/reusable
state will point anywhere else. We know the ID is not random garbage or
recycled, so the state can be trusted.

I recommend to either remove this sanity check (for committed and
reusable) or at least change it to:

			WARN_ON_ONCE(blk_lpos->begin != lpos_begin);

Or can you see any possibility of this case?

>> +			if (blk_lpos->begin != lpos_begin)
>> +				return false;
>> +			desc_make_reusable(desc_ring, id);
>> +			break;
>> +		case desc_reusable:
>> +			/*
>> +			 * This data block is invalid if the descriptor
>> +			 * does not point back to it.
>> +			 */
>> +			if (blk_lpos->begin != lpos_begin)
>> +				return false;
>> +			break;
>> +		}
>> +
>> +		/* Advance @lpos_begin to the next data block. */
>> +		lpos_begin = blk_lpos->next;
>> +	}

John Ogness

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: Barrier before pushing desc_ring tail: was [PATCH v2 2/3] printk: add lockless buffer
  2020-06-09 15:56     ` John Ogness
@ 2020-06-11 12:01       ` Petr Mladek
  2020-06-11 23:06         ` John Ogness
  0 siblings, 1 reply; 30+ messages in thread
From: Petr Mladek @ 2020-06-11 12:01 UTC (permalink / raw)
  To: John Ogness
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel, Paul McKenney

On Tue 2020-06-09 17:56:03, John Ogness wrote:
> On 2020-06-09, Petr Mladek <pmladek@suse.com> wrote:
> >> --- /dev/null
> >> +++ b/kernel/printk/printk_ringbuffer.c
> >> +/*
> >> + * Advance the desc ring tail. This function advances the tail by one
> >> + * descriptor, thus invalidating the oldest descriptor. Before advancing
> >> + * the tail, the tail descriptor is made reusable and all data blocks up to
> >> + * and including the descriptor's data block are invalidated (i.e. the data
> >> + * ring tail is pushed past the data block of the descriptor being made
> >> + * reusable).
> >> + */
> >> +static bool desc_push_tail(struct printk_ringbuffer *rb,
> >> +			   unsigned long tail_id)
> >> +{
> >> +	struct prb_desc_ring *desc_ring = &rb->desc_ring;
> >> +	enum desc_state d_state;
> >> +	struct prb_desc desc;
> >> +
> >> +	d_state = desc_read(desc_ring, tail_id, &desc);
> >> +
> >> +	switch (d_state) {
> >> +	case desc_miss:
> >> +		/*
> >> +		 * If the ID is exactly 1 wrap behind the expected, it is
> >> +		 * in the process of being reserved by another writer and
> >> +		 * must be considered reserved.
> >> +		 */
> >> +		if (DESC_ID(atomic_long_read(&desc.state_var)) ==
> >> +		    DESC_ID_PREV_WRAP(desc_ring, tail_id)) {
> >> +			return false;
> >> +		}
> >> +
> >> +		/*
> >> +		 * The ID has changed. Another writer must have pushed the
> >> +		 * tail and recycled the descriptor already. Success is
> >> +		 * returned because the caller is only interested in the
> >> +		 * specified tail being pushed, which it was.
> >> +		 */
> >> +		return true;
> >> +	case desc_reserved:
> >> +		return false;
> >> +	case desc_committed:
> >> +		desc_make_reusable(desc_ring, tail_id);
> >> +		break;
> >> +	case desc_reusable:
> >> +		break;
> >> +	}
> >> +
> >> +	/*
> >> +	 * Data blocks must be invalidated before their associated
> >> +	 * descriptor can be made available for recycling. Invalidating
> >> +	 * them later is not possible because there is no way to trust
> >> +	 * data blocks once their associated descriptor is gone.
> >> +	 */
> >> +
> >> +	if (!data_push_tail(rb, &rb->text_data_ring, desc.text_blk_lpos.next))
> >> +		return false;
> >> +	if (!data_push_tail(rb, &rb->dict_data_ring, desc.dict_blk_lpos.next))
> >> +		return false;
> >> +
> >> +	/*
> >> +	 * Check the next descriptor after @tail_id before pushing the tail
> >> +	 * to it because the tail must always be in a committed or reusable
> >> +	 * state. The implementation of prb_first_seq() relies on this.
> >> +	 *
> >> +	 * A successful read implies that the next descriptor is less than or
> >> +	 * equal to @head_id so there is no risk of pushing the tail past the
> >> +	 * head.
> >> +	 */
> >> +	d_state = desc_read(desc_ring, DESC_ID(tail_id + 1),
> >> +			    &desc); /* LMM(desc_push_tail:A) */
> >> +	if (d_state == desc_committed || d_state == desc_reusable) {
> >> +		/*
> >> +		 * Any CPU that loads the new tail ID, must see that the
> >> +		 * descriptor at @tail_id is in the reusable state. See the
> >> +		 * read memory barrier part of desc_reserve:D for details.
> >> +		 */
> >> +		atomic_long_cmpxchg_relaxed(&desc_ring->tail_id, tail_id,
> >> +			DESC_ID(tail_id + 1)); /* LMM(desc_push_tail:B) */
> >
> > I was quite confused by the above comment. Does it mean that we need
> > a barrier here? Or does it explain why the cmpxchg has its own
> > LMM marker?
> 
> This LMM marker is referenced quite often, but since it is a relaxed
> cmpxchg(), its significance is not immediately clear. I was hoping to
> add some hints as to why it is significant. The comment that it is
> referring to is:
> 
> 	/*
> 	 * Guarantee the tail ID is read before validating the
> 	 * recycled descriptor state. A read memory barrier is
> 	 * sufficient for this. This pairs with data_push_tail:C.
> 	 *
> 	 * Memory barrier involvement:
> 	 *
> 	 * If desc_reserve:C reads from desc_push_tail:B, then
> 	 * desc_reserve:F reads from desc_make_reusable:A.
> 	 *
> 	 * Relies on:
> 	 *
> 	 * MB from desc_make_reusable:A to desc_push_tail:B
> 	 *    matching
> 	 * RMB from desc_reserve:C to desc_reserve:F
> 	 *
> 	 * Note: desc_make_reusable:A, desc_push_tail:B, and
> 	 *       data_push_tail:C can all be different CPUs. However,
> 	 *       the desc_push_tail:B CPU must have previously seen
> 	 *       data_push_tail:D and the data_push_tail:D CPU (which
> 	 *       performs the full memory barrier) must have
> 	 *       previously seen desc_make_reusable:A.
> 	 */
> 
> English translation:
> 
> In order to push the data tail, a CPU must first see that the associated
> descriptor is in the reusable state. Since a full memory barrier is
> performed after that sighting and before doing the data tail push, _any_
> CPU that sees the pushed data tail will be able to see that the
> associated descriptor is in the reusable state.
> 
> In order to push the descriptor tail, a CPU must first see that the
> associated data tail has been pushed. Therefore, that CPU would also see
> that the associated descriptor is in the reusable state.

Thanks a lot for this detailed description. It helped a lot.

Let me try another description from slightly different angle:

All this relies on the fact the the full barrier is called in
data_push_tail() and data_push_tail() is called right above.
But there are two situations where the barrier is not called.
It is when:

  1. desc.text_blk_lpos.next already is behind data_ring->tail_lpos.

     This is safe.

     It might happen when there was a race in the past. CPU1 reserved
     a descriptor before CPU2 and CPU2 was able to allocate data block
     before CPU1.

     As a result, both descriptors and both data blocks were moved
     into reusable state when the earlier descriptor was reused.
     It is because it pointed to newer data block and the older data
     block must have been invalidated together with the newer
     descriptor.

     Now, the full barrier was called before tail_lpos was moved.
     Both descriptors must have been in the reusable state already.


  2. desc.text_blk_lpos == INVALID_LPOS.

     It seems that this is not synchronized and other CPUs might see
     the old state.

     It happens for data blocks that do not have any data. So it
     probably does not cause real problems but ...



> > I think that we actually need a full barrier here to make sure that
> > all CPUs see the changes made by data_push_tail() before we
> > allow to rewrite the descriptor. The changes in data_push_tail() might
> > be done on different CPUs.
> 
> How so? That memory barrier exists to make sure the reusable descriptor
> state is stored before pushing the data tail. This is important for
> readers (which start from the data tail) so they can notice if the
> descriptor has since been invalidated (reusable state).
> 
> But where is it important that the data tail change is seen before the
> descriptor tail change? How are the data tail and descriptor tail
> significantly related to each other?

I have to admit that I did not think about it deeply enough. It was
more about feeling and seeing similar pattern.

You have a point. The state value of the descriptor is the central
point. It is used to synchronize operations with both desc_ring and
data_ring.

The state is modified only once. So one (full) memory barrier should
be enough to synchronize all CPUs.

The state has to be modified before the data block could be reused.
Therefore the full barrier has to be already in the data_ring code.

The question is what to do with the empty data case. I see three
possibilities:

  1. Ignore the case with empty block because it (probably) does not
     cause real problems.

  2. Call the full barrier in data_push_tail() even when the data
     block is empty.

  3. Call the full barrier also in desc_push_tail() as I suggested.


My opinion:

I prefer 3rd solution. The barrier would be superfluous in the most
common situation. But it would create more error-proof code.
We could always optimize it when it causes problems.

Anyway, I would feel very uneasy about the the 1st solution. And the
2nd solution is weird. It would be hard to explain.


Best Regards,
Petr

PS: I start feeling more confident about the code. The more barriers
the less possible races ;-)

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: redundant check in make_data_reusable(): was [PATCH v2 2/3] printk: add lockless buffer
  2020-06-10 10:24         ` John Ogness
  2020-06-10 14:56           ` John Ogness
@ 2020-06-11 13:55           ` Petr Mladek
  2020-06-11 20:25             ` John Ogness
  1 sibling, 1 reply; 30+ messages in thread
From: Petr Mladek @ 2020-06-11 13:55 UTC (permalink / raw)
  To: John Ogness
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel, Paul McKenney

On Wed 2020-06-10 12:24:01, John Ogness wrote:
> On 2020-06-10, Petr Mladek <pmladek@suse.com> wrote:
> >> >> --- /dev/null
> >> >> +++ b/kernel/printk/printk_ringbuffer.c
> >> >> +/*
> >> >> + * Given a data ring (text or dict), put the associated descriptor of each
> >> >> + * data block from @lpos_begin until @lpos_end into the reusable state.
> >> >> + *
> >>>> + * If there is any problem making the associated descriptor reusable, either
> >>>> + * the descriptor has not yet been committed or another writer task has
> >>>> + * already pushed the tail lpos past the problematic data block. Regardless,
> >>>> + * on error the caller can re-load the tail lpos to determine the situation.
> >>>> + */
> >>>> +static bool data_make_reusable(struct printk_ringbuffer *rb,
> >>>> +			       struct prb_data_ring *data_ring,
> >>>> +			       unsigned long lpos_begin,
> >>>> +			       unsigned long lpos_end,
> >>>> +			       unsigned long *lpos_out)
> >>>> +{
> >>>> +	struct prb_desc_ring *desc_ring = &rb->desc_ring;
> >>>> +	struct prb_data_blk_lpos *blk_lpos;
> >>>> +	struct prb_data_block *blk;
> >>>> +	unsigned long tail_lpos;
> >>>> +	enum desc_state d_state;
> >>>> +	struct prb_desc desc;
> >>>> +	unsigned long id;
> >>>> +
> >>>> +	/*
> >>>> +	 * Using the provided @data_ring, point @blk_lpos to the correct
> >>>> +	 * blk_lpos within the local copy of the descriptor.
> >>>> +	 */
> >>>> +	if (data_ring == &rb->text_data_ring)
> >>>> +		blk_lpos = &desc.text_blk_lpos;
> >>>> +	else
> >>>> +		blk_lpos = &desc.dict_blk_lpos;
> >>>> +
> >>>> +	/* Loop until @lpos_begin has advanced to or beyond @lpos_end. */
> >>>> +	while ((lpos_end - lpos_begin) - 1 < DATA_SIZE(data_ring)) {
> >>>> +		blk = to_block(data_ring, lpos_begin);
> >>>> +		id = READ_ONCE(blk->id); /* LMM(data_make_reusable:A) */
> >>>> +
> >>>> +		/*
> >>>> +		 * Guarantee the block ID is loaded before checking the tail
> >>>> +		 * lpos. The loaded block ID can only be considered valid if
> >>>> +		 * the tail lpos has not overtaken @lpos_begin. This pairs
> >>>> +		 * with data_alloc:A.
> >>>> +		 *
> >>>> +		 * Memory barrier involvement:
> >>>> +		 *
> >>>> +		 * If data_make_reusable:A reads from data_alloc:B, then
> >>>> +		 * data_make_reusable:C reads from data_push_tail:D.
> >>>> +		 *
> >>>> +		 * Relies on:
> >>>> +		 *
> >>>> +		 * MB from data_push_tail:D to data_alloc:B
> >>>> +		 *    matching
> >>>> +		 * RMB from data_make_reusable:A to data_make_reusable:C
> >>>> +		 *
> >>>> +		 * Note: data_push_tail:D and data_alloc:B can be different
> >>>> +		 *       CPUs. However, the data_alloc:B CPU (which performs
> >>>> +		 *       the full memory barrier) must have previously seen
> >>>> +		 *       data_push_tail:D.
> >>>> +		 */
> >>>> +		smp_rmb(); /* LMM(data_make_reusable:B) */
> >>>> +
> >>>> +		tail_lpos = atomic_long_read(&data_ring->tail_lpos
> >>>> +					); /* LMM(data_make_reusable:C) */
> >>>> +
> >>>> +		/*
> >>>> +		 * If @lpos_begin has fallen behind the tail lpos, the read
> >>>> +		 * block ID cannot be trusted. Fast forward @lpos_begin to the
> >>>> +		 * tail lpos and try again.
> >>>> +		 */
> >>>> +		if (lpos_begin - tail_lpos >= DATA_SIZE(data_ring)) {
> >>>> +			lpos_begin = tail_lpos;
> >>>> +			continue;
> >>>> +		}
> >>>
> >>> I am sorry if we have had this discussion already in past.
> >> 
> >> We have [0]. (Search for "Ouch.")
> >
> > I see. Thanks a lot for the pointer.
> >
> >>> Well, it would just prove that it really needs a comment why this
> >>> check is necessary.
> >> 
> >> The comment says why it is necessary. The previous read of the block ID
> >> cannot be trusted if the the tail has been pushed beyond it.
> >
> > Not really. The comment describes what the check does. But it does not
> > explain why it is needed. The reason might be described be something like:
> >
> > 		* Make sure that the id read from tail_lpos is really
> > 		* pointing to this lpos. The block might have been
> > 		* reused in the meantime. Make sure to do not make
> > 		* the new owner reusable.
> 
> That is _not_ what this check is doing. I recommend looking closely at
> the example you posted. This is not about whether or not a descriptor is
> pointing to this lpos. In your example you showed that ID, state, and
> lpos values could all look good, but it is for the _new_ record and we
> should _not_ invalidate that one.

OK, let's make sure that we are talking about the same example.
I was talking about this one from
https://lore.kernel.org/lkml/87ftecy343.fsf@linutronix.de/

% [*] Another problem would be when data_make_reusable() see the new
%     data already in the committed state. It would make fresh new
%     data reusable.
%
%     I mean the following:
%
% CPU0				CPU1
%
% data_alloc()
%   begin_lpos = dr->head_lpos
%   data_push_tail()
%     lpos = dr->tail_lpos
%				prb_reserve()
%				  # reserve the location of current
%				  # dr->tail_lpos
%				prb_commit()
%
%     id = blk->id
%     # read id for the freshly written data on CPU1
%     # and happily make them reusable
%     data_make_reusable()

Sigh, sigh, sigh, there is a hugely misleading comment in the example:

%				  # reserve the location of current
%				  # dr->tail_lpos

It is true that it reserves part of this location. But it will use
data_ring->head_lpos for the related desc->text_blk_lpos.begin !!!

See below:

> We can detect the scenario you pointed out by verifying the tail has not
> moved beyond the data block that the ID was read from. The comment for
> this check says:
> 
>     If @lpos_begin has fallen behind the tail lpos,
>     the read block ID cannot be trusted.
> 
> This is exactly the why. It is only about whether we can trust that a
> non-garbage block ID was read. Or do you want me to add:
> 
>     ... because data read that is behind the tail lpos must be
>     considered garbage.
> 
> > But wait! This situation should get caught by the two existing descriptor
> > checks:
> >
> >>		case desc_committed:
> >>			/*
> >>			 * This data block is invalid if the descriptor
> >>			 * does not point back to it.
> >>			 */
> >>			if (blk_lpos->begin != lpos_begin)
> >>				return false;
> >>			desc_make_reusable(desc_ring, id);
> >>			break;
> >>		case desc_reusable:
> >>			/*
> >>			 * This data block is invalid if the descriptor
> >>			 * does not point back to it.
> >>			 */
> >>			if (blk_lpos->begin != lpos_begin)
> >>				return false;
> >>			break;
> 
> No. Your example showed that it is not caught here.

I am afraid that my example was wrong:

If blk->id comes from the new descriptor written by CPU1 then
blk_lpos->begin is based on the old data_ring->head_lpos.
Then it is different from lpos_begin.


Let's put it another way. The state of the descriptor defines validity
of the data. Descriptor in committed state _must not_ point to invalid
data block!!!

If a descriptor in committed state point to lpos that was in invalid range
before reading the descriptor then we have a huge hole in the design.

This is why I believe that the check of the descriptor must be enough.


Best Regards,
Petr

PS: I am sorry if I create too much confusion. It is easy to
get lost :-(

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: redundant check in make_data_reusable(): was [PATCH v2 2/3] printk: add lockless buffer
  2020-06-10 14:56           ` John Ogness
@ 2020-06-11 19:51             ` John Ogness
  0 siblings, 0 replies; 30+ messages in thread
From: John Ogness @ 2020-06-11 19:51 UTC (permalink / raw)
  To: Petr Mladek
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel, Paul McKenney

On 2020-06-10, John Ogness <john.ogness@linutronix.de> wrote:
>>> +static bool data_make_reusable(struct printk_ringbuffer *rb,
>>> +			       struct prb_data_ring *data_ring,
>>> +			       unsigned long lpos_begin,
>>> +			       unsigned long lpos_end,
>>> +			       unsigned long *lpos_out)
>>> +{
>>> +	struct prb_desc_ring *desc_ring = &rb->desc_ring;
>>> +	struct prb_data_blk_lpos *blk_lpos;
>>> +	struct prb_data_block *blk;
>>> +	unsigned long tail_lpos;
>>> +	enum desc_state d_state;
>>> +	struct prb_desc desc;
>>> +	unsigned long id;
>>> +
>>> +	/*
>>> +	 * Using the provided @data_ring, point @blk_lpos to the correct
>>> +	 * blk_lpos within the local copy of the descriptor.
>>> +	 */
>>> +	if (data_ring == &rb->text_data_ring)
>>> +		blk_lpos = &desc.text_blk_lpos;
>>> +	else
>>> +		blk_lpos = &desc.dict_blk_lpos;
>>> +
>>> +	/* Loop until @lpos_begin has advanced to or beyond @lpos_end. */
>>> +	while ((lpos_end - lpos_begin) - 1 < DATA_SIZE(data_ring)) {
>>> +		blk = to_block(data_ring, lpos_begin);
>>> +		id = READ_ONCE(blk->id); /* LMM(data_make_reusable:A) */
>>> +
>>> +		/*
>>> +		 * Guarantee the block ID is loaded before checking the tail
>>> +		 * lpos. The loaded block ID can only be considered valid if
>>> +		 * the tail lpos has not overtaken @lpos_begin. This pairs
>>> +		 * with data_alloc:A.
>>> +		 *
>>> +		 * Memory barrier involvement:
>>> +		 *
>>> +		 * If data_make_reusable:A reads from data_alloc:B, then
>>> +		 * data_make_reusable:C reads from data_push_tail:D.
>>> +		 *
>>> +		 * Relies on:
>>> +		 *
>>> +		 * MB from data_push_tail:D to data_alloc:B
>>> +		 *    matching
>>> +		 * RMB from data_make_reusable:A to data_make_reusable:C
>>> +		 *
>>> +		 * Note: data_push_tail:D and data_alloc:B can be different
>>> +		 *       CPUs. However, the data_alloc:B CPU (which performs
>>> +		 *       the full memory barrier) must have previously seen
>>> +		 *       data_push_tail:D.
>>> +		 */
>>> +		smp_rmb(); /* LMM(data_make_reusable:B) */
>>> +
>>> +		tail_lpos = atomic_long_read(&data_ring->tail_lpos
>>> +					); /* LMM(data_make_reusable:C) */
>>> +
>>> +		/*
>>> +		 * If @lpos_begin has fallen behind the tail lpos, the read
>>> +		 * block ID cannot be trusted. Fast forward @lpos_begin to the
>>> +		 * tail lpos and try again.
>>> +		 */
>>> +		if (lpos_begin - tail_lpos >= DATA_SIZE(data_ring)) {
>>> +			lpos_begin = tail_lpos;
>>> +			continue;
>>> +		}
>>> +
>>> +		d_state = desc_read(desc_ring, id,
>>> +				    &desc); /* LMM(data_make_reusable:D) */
>>> +
>>> +		switch (d_state) {
>>> +		case desc_miss:
>>> +			return false;
>>> +		case desc_reserved:
>>> +			return false;
>>> +		case desc_committed:
>>> +			/*
>>> +			 * This data block is invalid if the descriptor
>>> +			 * does not point back to it.
>>> +			 */
>>
>> Here again the comments describe what the check does but not why.
>> I would write something like:
>>
>> 			/*
>> 			 * The block might have already been
>> 			 * reused. Make sure that the descriptor really
>> 			 * points back to the checked lpos. It covers
>> 			 * both situations. Random data might point to
>> 			 * a valid descriptor just by chance. Or the block
>> 			 * has been already reused by another descriptor.
>> 			 */
>
> Originally this check was needed because the descriptor would be read
> even if there was a data race reading the ID from the data
> block. Validating the lpos value was a kind of protection against
> reading random data that by chance yielded an ID of a committed/reusable
> descriptor.
>
> However, after you pointed out that this check was not enough, the code
> now re-checks the data tail to make sure that no data race happened. So
> actually it is not possible that a descriptor in the committed/reusable
> state will point anywhere else. We know the ID is not random garbage or
> recycled, so the state can be trusted.
>
> I recommend to either remove this sanity check (for committed and
> reusable) or at least change it to:
>
> 			WARN_ON_ONCE(blk_lpos->begin != lpos_begin);

Sorry, I wasn't thinking things out correctly which this suggestion. We
need the checks. The explanation is as follows...

With regard to the block ID read (data_make_reusable:A):

Failing the following tail check is a guarantee that we read
garbage. Data behind the data tail must never be trusted. It is correct
to try again with the new tail value.

Passing the following tail check could still be a garbage read. The
descriptor owning that data block may be in the reserved state and has
not yet written the new block ID. In that case we read whatever random
data was there even though the tail check passes. And that random data
could yield a valid descriptor ID, resulting in some state value that
does not apply.

The "descriptor pointing back to data block" checks are playing the
critical role of protecting against data_make_reusable:A reading
garbage. And indeed, I can reproduce this situation fairly easily on
ppc64le+kvm tests.

So please disregard this suggestion. And yes, the comments for these
checks need to be expanded to make clear just how critical they are and
explain how the situation can occur.

John Ogness

>>> +			if (blk_lpos->begin != lpos_begin)
>>> +				return false;
>>> +			desc_make_reusable(desc_ring, id);
>>> +			break;
>>> +		case desc_reusable:
>>> +			/*
>>> +			 * This data block is invalid if the descriptor
>>> +			 * does not point back to it.
>>> +			 */
>>> +			if (blk_lpos->begin != lpos_begin)
>>> +				return false;
>>> +			break;
>>> +		}
>>> +
>>> +		/* Advance @lpos_begin to the next data block. */
>>> +		lpos_begin = blk_lpos->next;
>>> +	}

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: redundant check in make_data_reusable(): was [PATCH v2 2/3] printk: add lockless buffer
  2020-06-11 13:55           ` Petr Mladek
@ 2020-06-11 20:25             ` John Ogness
  0 siblings, 0 replies; 30+ messages in thread
From: John Ogness @ 2020-06-11 20:25 UTC (permalink / raw)
  To: Petr Mladek
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel, Paul McKenney

On 2020-06-11, Petr Mladek <pmladek@suse.com> wrote:
>>>>>> --- /dev/null
>>>>>> +++ b/kernel/printk/printk_ringbuffer.c
>>>>>> +/*
>>>>>> + * Given a data ring (text or dict), put the associated descriptor of each
>>>>>> + * data block from @lpos_begin until @lpos_end into the reusable state.
>>>>>> + *
>>>>>> + * If there is any problem making the associated descriptor reusable, either
>>>>>> + * the descriptor has not yet been committed or another writer task has
>>>>>> + * already pushed the tail lpos past the problematic data block. Regardless,
>>>>>> + * on error the caller can re-load the tail lpos to determine the situation.
>>>>>> + */
>>>>>> +static bool data_make_reusable(struct printk_ringbuffer *rb,
>>>>>> +			       struct prb_data_ring *data_ring,
>>>>>> +			       unsigned long lpos_begin,
>>>>>> +			       unsigned long lpos_end,
>>>>>> +			       unsigned long *lpos_out)
>>>>>> +{
>>>>>> +	struct prb_desc_ring *desc_ring = &rb->desc_ring;
>>>>>> +	struct prb_data_blk_lpos *blk_lpos;
>>>>>> +	struct prb_data_block *blk;
>>>>>> +	unsigned long tail_lpos;
>>>>>> +	enum desc_state d_state;
>>>>>> +	struct prb_desc desc;
>>>>>> +	unsigned long id;
>>>>>> +
>>>>>> +	/*
>>>>>> +	 * Using the provided @data_ring, point @blk_lpos to the correct
>>>>>> +	 * blk_lpos within the local copy of the descriptor.
>>>>>> +	 */
>>>>>> +	if (data_ring == &rb->text_data_ring)
>>>>>> +		blk_lpos = &desc.text_blk_lpos;
>>>>>> +	else
>>>>>> +		blk_lpos = &desc.dict_blk_lpos;
>>>>>> +
>>>>>> +	/* Loop until @lpos_begin has advanced to or beyond @lpos_end. */
>>>>>> +	while ((lpos_end - lpos_begin) - 1 < DATA_SIZE(data_ring)) {
>>>>>> +		blk = to_block(data_ring, lpos_begin);
>>>>>> +		id = READ_ONCE(blk->id); /* LMM(data_make_reusable:A) */
>>>>>> +
>>>>>> +		/*
>>>>>> +		 * Guarantee the block ID is loaded before checking the tail
>>>>>> +		 * lpos. The loaded block ID can only be considered valid if
>>>>>> +		 * the tail lpos has not overtaken @lpos_begin. This pairs
>>>>>> +		 * with data_alloc:A.
>>>>>> +		 *
>>>>>> +		 * Memory barrier involvement:
>>>>>> +		 *
>>>>>> +		 * If data_make_reusable:A reads from data_alloc:B, then
>>>>>> +		 * data_make_reusable:C reads from data_push_tail:D.
>>>>>> +		 *
>>>>>> +		 * Relies on:
>>>>>> +		 *
>>>>>> +		 * MB from data_push_tail:D to data_alloc:B
>>>>>> +		 *    matching
>>>>>> +		 * RMB from data_make_reusable:A to data_make_reusable:C
>>>>>> +		 *
>>>>>> +		 * Note: data_push_tail:D and data_alloc:B can be different
>>>>>> +		 *       CPUs. However, the data_alloc:B CPU (which performs
>>>>>> +		 *       the full memory barrier) must have previously seen
>>>>>> +		 *       data_push_tail:D.
>>>>>> +		 */
>>>>>> +		smp_rmb(); /* LMM(data_make_reusable:B) */
>>>>>> +
>>>>>> +		tail_lpos = atomic_long_read(&data_ring->tail_lpos
>>>>>> +					); /* LMM(data_make_reusable:C) */
>>>>>> +
>>>>>> +		/*
>>>>>> +		 * If @lpos_begin has fallen behind the tail lpos, the read
>>>>>> +		 * block ID cannot be trusted. Fast forward @lpos_begin to the
>>>>>> +		 * tail lpos and try again.
>>>>>> +		 */
>>>>>> +		if (lpos_begin - tail_lpos >= DATA_SIZE(data_ring)) {
>>>>>> +			lpos_begin = tail_lpos;
>>>>>> +			continue;
>>>>>> +		}
>>>>>
>>>>> I am sorry if we have had this discussion already in past.
>>>> 
>>>> We have [0]. (Search for "Ouch.")
>>>
>>> I see. Thanks a lot for the pointer.
>>>
>>>>> Well, it would just prove that it really needs a comment why this
>>>>> check is necessary.
>>>> 
>>>> The comment says why it is necessary. The previous read of the block ID
>>>> cannot be trusted if the the tail has been pushed beyond it.
>>>
>>> Not really. The comment describes what the check does. But it does not
>>> explain why it is needed. The reason might be described be something like:
>>>
>>> 		* Make sure that the id read from tail_lpos is really
>>> 		* pointing to this lpos. The block might have been
>>> 		* reused in the meantime. Make sure to do not make
>>> 		* the new owner reusable.
>> 
>> That is _not_ what this check is doing. I recommend looking closely at
>> the example you posted. This is not about whether or not a descriptor is
>> pointing to this lpos. In your example you showed that ID, state, and
>> lpos values could all look good, but it is for the _new_ record and we
>> should _not_ invalidate that one.
>
> OK, let's make sure that we are talking about the same example.
> I was talking about this one from
> https://lore.kernel.org/lkml/87ftecy343.fsf@linutronix.de/
>
> % [*] Another problem would be when data_make_reusable() see the new
> %     data already in the committed state. It would make fresh new
> %     data reusable.
> %
> %     I mean the following:
> %
> % CPU0				CPU1
> %
> % data_alloc()
> %   begin_lpos = dr->head_lpos
> %   data_push_tail()
> %     lpos = dr->tail_lpos
> %				prb_reserve()
> %				  # reserve the location of current
> %				  # dr->tail_lpos
> %				prb_commit()
> %
> %     id = blk->id
> %     # read id for the freshly written data on CPU1
> %     # and happily make them reusable
> %     data_make_reusable()
>
> Sigh, sigh, sigh, there is a hugely misleading comment in the example:
>
> %				  # reserve the location of current
> %				  # dr->tail_lpos
>
> It is true that it reserves part of this location. But it will use
> data_ring->head_lpos for the related desc->text_blk_lpos.begin !!!

Aaargh! You are right!

> If blk->id comes from the new descriptor written by CPU1 then
> blk_lpos->begin is based on the old data_ring->head_lpos.
> Then it is different from lpos_begin.
>
> Let's put it another way. The state of the descriptor defines validity
> of the data. Descriptor in committed state _must not_ point to invalid
> data block!!!
>
> If a descriptor in committed state point to lpos that was in invalid
> range before reading the descriptor then we have a huge hole in the
> design.
>
> This is why I believe that the check of the descriptor must be enough.

You are right. The smp_rmb (data_make_reusable:B) and its following tail
check are not needed. Since data_make_reusable:A can read garbage even
if we pass the tail check, we might as well always allow garbage and
rely on the descriptor/lpos checks to catch it. (Actually, that was the
design!)

However, the pairing smp_mb (data_alloc:A) is still needed, but it will
then pair with data_push_tail:A. If data_make_reusable() reads garbage
(maybe newly written garbage), it is important that a new data tail is
visible.

The comment for data_alloc:A would change to something like:

	/*
	 * Guarantee any updated tail lpos is stored before modifying
	 * the newly allocated data area. Another context may be in
	 * data_make_reusable() and is reading a block ID from this
	 * area. data_make_reusable() can handle reading a garbage block
	 * ID value, but then it must be able to load a new tail lpos.
	 * This pairs with data_push_tail:A.
	 */
	smp_mb(); /* LMM(data_alloc:A) */

John Ogness

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: Barrier before pushing desc_ring tail: was [PATCH v2 2/3] printk: add lockless buffer
  2020-06-11 12:01       ` Petr Mladek
@ 2020-06-11 23:06         ` John Ogness
  0 siblings, 0 replies; 30+ messages in thread
From: John Ogness @ 2020-06-11 23:06 UTC (permalink / raw)
  To: Petr Mladek
  Cc: Peter Zijlstra, Sergey Senozhatsky, Sergey Senozhatsky,
	Steven Rostedt, Linus Torvalds, Greg Kroah-Hartman, Andrea Parri,
	Thomas Gleixner, kexec, linux-kernel, Paul McKenney

On 2020-06-11, Petr Mladek <pmladek@suse.com> wrote:
> All this relies on the fact the the full barrier is called in
> data_push_tail() and data_push_tail() is called right above.
> But there are two situations where the barrier is not called.
> It is when:
>
>   1. desc.text_blk_lpos.next already is behind data_ring->tail_lpos.
>
>      This is safe.

Yes, and I have since expanded the comment above the data_push_tail()
while loop to mention that:

	/*
	 * Loop until the tail lpos is at or beyond @lpos. This condition
	 * may already be satisfied, resulting in no full memory barrier
	 * from data_push_tail:C being performed. However, since this CPU
	 * sees the new tail lpos, any descriptor states that transitioned to
	 * the reusable state must already be visible.
	 */

>   2. desc.text_blk_lpos == INVALID_LPOS.
>
>      It seems that this is not synchronized and other CPUs might see
>      the old state.

Great catch! This could trigger the WARN_ON at desc_reserve:F and lead
to prb_reserve() unnecessarily failing.

> The question is what to do with the empty data case. I see three
> possibilities:
>
>   1. Ignore the case with empty block because it (probably) does not
>      cause real problems.

It could cause dropped messages.

>   2. Call the full barrier in data_push_tail() even when the data
>      block is empty.

This is the common case, since most records will not have dictionary
data.

>   3. Call the full barrier also in desc_push_tail() as I suggested.
>
> My opinion:
>
> I prefer 3rd solution.

Agreed. For my next version I am folding all smp_mb() and smp_wmb()
calls into their neighboring cmpxchg_relaxed(). This would apply here as
well, changing desc_push_tail:B to a full cmpxchg().

We still need the full memory barrier at data_push_tail:C. Readers rely
on it to be able to verify if their copied data is still valid:

CPU0 (writer0)        CPU1 (writer1)       CPU2 (reader)
                                           desc_read->committed
desc_make_reusable
smp_mb
data_push_tail
                      read new data tail
                      data_push_head
                      smp_mb
                      write new data
                                           read garbage new data
                                           desc_read->reusable

John Ogness

^ permalink raw reply	[flat|nested] 30+ messages in thread

end of thread, other threads:[~2020-06-11 23:06 UTC | newest]

Thread overview: 30+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-05-01  9:40 [PATCH v2 0/3] printk: replace ringbuffer John Ogness
2020-05-01  9:40 ` [PATCH v2 1/3] crash: add VMCOREINFO macro for anonymous structs John Ogness
2020-06-03 10:16   ` Petr Mladek
2020-05-01  9:40 ` [PATCH v2 2/3] printk: add lockless buffer John Ogness
     [not found]   ` <87v9ktcs3q.fsf@vostro.fn.ogness.net>
2020-05-18 17:22     ` Linus Torvalds
2020-05-19 20:34       ` John Ogness
2020-06-09  7:10   ` blk->id read race: was: " Petr Mladek
2020-06-09 14:18     ` John Ogness
2020-06-10  8:42       ` Petr Mladek
2020-06-10 13:55         ` John Ogness
2020-06-09  9:31   ` redundant check in make_data_reusable(): was " Petr Mladek
2020-06-09 14:48     ` John Ogness
2020-06-10  9:38       ` Petr Mladek
2020-06-10 10:24         ` John Ogness
2020-06-10 14:56           ` John Ogness
2020-06-11 19:51             ` John Ogness
2020-06-11 13:55           ` Petr Mladek
2020-06-11 20:25             ` John Ogness
2020-06-09  9:48   ` Full barrier in data_push_tail(): " Petr Mladek
2020-06-09 15:03     ` John Ogness
2020-06-09 11:37   ` Barrier before pushing desc_ring tail: " Petr Mladek
2020-06-09 15:56     ` John Ogness
2020-06-11 12:01       ` Petr Mladek
2020-06-11 23:06         ` John Ogness
2020-06-09 14:38   ` data_ring head_lpos and tail_lpos synchronization: " Petr Mladek
2020-06-10  7:53     ` John Ogness
2020-05-01  9:40 ` [PATCH v2 3/3] printk: use the lockless ringbuffer John Ogness
2020-05-06 14:50   ` John Ogness
2020-05-13 12:05 ` [PATCH v2 0/3] printk: replace ringbuffer Prarit Bhargava
2020-05-15 10:24 ` Sergey Senozhatsky

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).