From: "J. corwin Coburn" <corwin@redhat.com>
To: dm-devel@redhat.com, linux-block@vger.kernel.org
Cc: vdo-devel@redhat.com, "J. corwin Coburn" <corwin@redhat.com>
Subject: [dm-devel] [PATCH v2 07/39] Add specialized request queueing functionality.
Date: Tue, 23 May 2023 17:45:07 -0400	[thread overview]
Message-ID: <20230523214539.226387-8-corwin@redhat.com> (raw)
In-Reply-To: <20230523214539.226387-1-corwin@redhat.com>

This patch adds funnel_queue, a mostly lock-free multi-producer,
single-consumer queue. It also adds the request queue used by the dm-vdo
deduplication index, and the work_queue used by the dm-vdo data store. Both
of these are built on top of funnel queue and are intended to support the
dispatching of many short-running tasks. The work_queue also supports
priorities. Finally, this patch adds vdo_completion, the structure which is
enqueued on work_queues.
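
As a rough sketch of how these pieces fit together (the completion type, callback, error
handler, thread id, and parent below are placeholders rather than symbols added by this patch),
a caller prepares and launches a completion like this:

    vdo_initialize_completion(completion, vdo, type);
    vdo_prepare_completion(completion, my_callback, my_error_handler, thread_id, parent);
    vdo_launch_completion(completion);

The launch runs the callback immediately if the caller is already on the thread named by
thread_id, and otherwise enqueues the completion on that thread's work_queue.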

Signed-off-by: J. corwin Coburn <corwin@redhat.com>
---
 drivers/md/dm-vdo/completion.c    | 141 +++++++
 drivers/md/dm-vdo/completion.h    | 155 +++++++
 drivers/md/dm-vdo/cpu.h           |  58 +++
 drivers/md/dm-vdo/funnel-queue.c  | 169 ++++++++
 drivers/md/dm-vdo/funnel-queue.h  | 110 +++++
 drivers/md/dm-vdo/request-queue.c | 284 +++++++++++++
 drivers/md/dm-vdo/request-queue.h |  30 ++
 drivers/md/dm-vdo/work-queue.c    | 659 ++++++++++++++++++++++++++++++
 drivers/md/dm-vdo/work-queue.h    |  53 +++
 9 files changed, 1659 insertions(+)
 create mode 100644 drivers/md/dm-vdo/completion.c
 create mode 100644 drivers/md/dm-vdo/completion.h
 create mode 100644 drivers/md/dm-vdo/cpu.h
 create mode 100644 drivers/md/dm-vdo/funnel-queue.c
 create mode 100644 drivers/md/dm-vdo/funnel-queue.h
 create mode 100644 drivers/md/dm-vdo/request-queue.c
 create mode 100644 drivers/md/dm-vdo/request-queue.h
 create mode 100644 drivers/md/dm-vdo/work-queue.c
 create mode 100644 drivers/md/dm-vdo/work-queue.h

diff --git a/drivers/md/dm-vdo/completion.c b/drivers/md/dm-vdo/completion.c
new file mode 100644
index 00000000000..8feb9c05c19
--- /dev/null
+++ b/drivers/md/dm-vdo/completion.c
@@ -0,0 +1,141 @@
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ * Copyright Red Hat
+ */
+
+#include "completion.h"
+
+#include <linux/kernel.h>
+
+#include "logger.h"
+#include "permassert.h"
+
+#include "status-codes.h"
+#include "types.h"
+#include "vio.h"
+#include "vdo.h"
+
+/**
+ * DOC: vdo completions.
+ *
+ * Most of vdo's data structures are lock free, each either belonging to a single "zone," or
+ * divided into a number of zones whose accesses to the structure do not overlap. During normal
+ * operation, at most one thread will be operating in any given zone. Each zone has a
+ * vdo_work_queue which holds vdo_completions that are to be run in that zone. A completion may
+ * only be enqueued on one queue or operating in a single zone at a time.
+ *
+ * At each step of a multi-threaded operation, the completion performing the operation is given a
+ * callback, error handler, and thread id for the next step. A completion is "run" when it is
+ * operating on the correct thread (as specified by its callback_thread_id). If the value of its
+ * "result" field is an error (i.e. not VDO_SUCCESS), the function in its "error_handler" will be
+ * invoked. If the error_handler is NULL, or there is no error, the function set as its "callback"
+ * will be invoked. Generally, a completion will not be run directly, but rather will be
+ * "launched." In this case, it will check whether it is operating on the correct thread. If it is,
+ * it will run immediately. Otherwise, it will be enqueued on the vdo_work_queue associated with the
+ * completion's "callback_thread_id". When it is dequeued, it will be on the correct thread, and
+ * will get run. In some cases, the completion should get queued instead of running immediately,
+ * even if it is being launched from the correct thread. This is usually in cases where there is a
+ * long chain of callbacks, all on the same thread, which could overflow the stack. In such cases,
+ * the completion's "requeue" field should be set to true. Doing so will skip the current thread
+ * check and simply enqueue the completion.
+ *
+ * A completion may be "finished," in which case its "complete" field will be set to true before it
+ * is next run. It is a bug to attempt to set the result or re-finish a finished completion.
+ * Because a completion's fields are not safe to examine from any thread other than the one on
+ * which the completion is currently operating, this field is used only to aid in detecting
+ * programming errors. It cannot be used for cross-thread checking of the status of an operation.
+ * A completion must be "reset" before it can be reused after it has been finished. Resetting will
+ * also clear any error from the result field.
+ **/
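+
+/*
+ * An illustrative sketch of the pattern described above; the callback names and thread id here
+ * are placeholders, not symbols defined by vdo:
+ *
+ *	static void finish_step(struct vdo_completion *completion)
+ *	{
+ *		vdo_finish_completion(completion);
+ *	}
+ *
+ *	static void first_step(struct vdo_completion *completion)
+ *	{
+ *		vdo_launch_completion_callback(completion, finish_step, next_thread_id);
+ *	}
+ *
+ * Each launch either runs the callback immediately (when already on the target thread and not
+ * marked for requeue) or enqueues the completion on the work queue of the thread named by its
+ * callback_thread_id.
+ */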
+
+void vdo_initialize_completion(struct vdo_completion *completion,
+			       struct vdo *vdo,
+			       enum vdo_completion_type type)
+{
+	memset(completion, 0, sizeof(*completion));
+	completion->vdo = vdo;
+	completion->type = type;
+	vdo_reset_completion(completion);
+}
+
+static inline void assert_incomplete(struct vdo_completion *completion)
+{
+	ASSERT_LOG_ONLY(!completion->complete, "completion is not complete");
+}
+
+/**
+ * vdo_set_completion_result() - Set the result of a completion.
+ *
+ * Older errors will not be masked.
+ */
+void vdo_set_completion_result(struct vdo_completion *completion, int result)
+{
+	assert_incomplete(completion);
+	if (completion->result == VDO_SUCCESS)
+		completion->result = result;
+}
+
+/**
+ * vdo_launch_completion_with_priority() - Run or enqueue a completion.
+ * @priority: The priority at which to enqueue the completion.
+ *
+ * If called on the correct thread (i.e. the one specified in the completion's callback_thread_id
+ * field) and not marked for requeue, the completion will be run immediately. Otherwise, the
+ * completion will be enqueued on the specified thread.
+ */
+void vdo_launch_completion_with_priority(struct vdo_completion *completion,
+					 enum vdo_completion_priority priority)
+{
+	thread_id_t callback_thread = completion->callback_thread_id;
+
+	if (completion->requeue || (callback_thread != vdo_get_callback_thread_id())) {
+		vdo_enqueue_completion(completion, priority);
+		return;
+	}
+
+	vdo_run_completion(completion);
+}
+
+/** vdo_finish_completion() - Mark a completion as complete and then launch it. */
+void vdo_finish_completion(struct vdo_completion *completion)
+{
+	assert_incomplete(completion);
+	completion->complete = true;
+	if (completion->callback != NULL)
+		vdo_launch_completion(completion);
+}
+
+void vdo_enqueue_completion(struct vdo_completion *completion,
+			    enum vdo_completion_priority priority)
+{
+	struct vdo *vdo = completion->vdo;
+	thread_id_t thread_id = completion->callback_thread_id;
+
+	if (ASSERT(thread_id < vdo->thread_config.thread_count,
+		   "thread_id %u (completion type %d) is less than thread count %u",
+		   thread_id,
+		   completion->type,
+		   vdo->thread_config.thread_count) != UDS_SUCCESS)
+		BUG();
+
+	completion->requeue = false;
+	completion->priority = priority;
+	completion->my_queue = NULL;
+	vdo_enqueue_work_queue(vdo->threads[thread_id].queue, completion);
+}
+
+/**
+ * vdo_requeue_completion_if_needed() - Requeue a completion if not called on the specified thread.
+ *
+ * Return: True if the completion was requeued; callers may not access the completion in this case.
+ */
+bool vdo_requeue_completion_if_needed(struct vdo_completion *completion,
+				      thread_id_t callback_thread_id)
+{
+	if (vdo_get_callback_thread_id() == callback_thread_id)
+		return false;
+
+	completion->callback_thread_id = callback_thread_id;
+	vdo_enqueue_completion(completion, VDO_WORK_Q_DEFAULT_PRIORITY);
+	return true;
+}
diff --git a/drivers/md/dm-vdo/completion.h b/drivers/md/dm-vdo/completion.h
new file mode 100644
index 00000000000..03b2daa6546
--- /dev/null
+++ b/drivers/md/dm-vdo/completion.h
@@ -0,0 +1,155 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * Copyright Red Hat
+ */
+
+#ifndef VDO_COMPLETION_H
+#define VDO_COMPLETION_H
+
+#include "permassert.h"
+
+#include "status-codes.h"
+#include "types.h"
+
+/**
+ * vdo_run_completion() - Run a completion's callback or error handler on the current thread.
+ *
+ * Context: This function must be called from the correct callback thread.
+ */
+static inline void vdo_run_completion(struct vdo_completion *completion)
+{
+	if ((completion->result != VDO_SUCCESS) && (completion->error_handler != NULL)) {
+		completion->error_handler(completion);
+		return;
+	}
+
+	completion->callback(completion);
+}
+
+void vdo_set_completion_result(struct vdo_completion *completion, int result);
+
+void vdo_initialize_completion(struct vdo_completion *completion,
+			       struct vdo *vdo,
+			       enum vdo_completion_type type);
+
+/**
+ * vdo_reset_completion() - Reset a completion to a clean state, while keeping the type, vdo and
+ *                          parent information.
+ */
+static inline void vdo_reset_completion(struct vdo_completion *completion)
+{
+	completion->result = VDO_SUCCESS;
+	completion->complete = false;
+}
+
+void vdo_launch_completion_with_priority(struct vdo_completion *completion,
+					 enum vdo_completion_priority priority);
+
+/**
+ * vdo_launch_completion() - Launch a completion with default priority.
+ */
+static inline void vdo_launch_completion(struct vdo_completion *completion)
+{
+	vdo_launch_completion_with_priority(completion, VDO_WORK_Q_DEFAULT_PRIORITY);
+}
+
+/**
+ * vdo_continue_completion() - Continue processing a completion.
+ * @result: The current result (will not mask older errors).
+ *
+ * Continue processing a completion by setting the current result and calling
+ * vdo_launch_completion().
+ */
+static inline void vdo_continue_completion(struct vdo_completion *completion, int result)
+{
+	vdo_set_completion_result(completion, result);
+	vdo_launch_completion(completion);
+}
+
+void vdo_finish_completion(struct vdo_completion *completion);
+
+/**
+ * vdo_fail_completion() - Set the result of a completion if it does not already have an error,
+ *                         then finish it.
+ */
+static inline void vdo_fail_completion(struct vdo_completion *completion, int result)
+{
+	vdo_set_completion_result(completion, result);
+	vdo_finish_completion(completion);
+}
+
+/**
+ * vdo_assert_completion_type() - Assert that a completion is of the correct type.
+ *
+ * Return: VDO_SUCCESS or an error
+ */
+static inline int
+vdo_assert_completion_type(struct vdo_completion *completion, enum vdo_completion_type expected)
+{
+	return ASSERT(expected == completion->type,
+		      "completion type should be %u, not %u",
+		      expected,
+		      completion->type);
+}
+
+static inline void vdo_set_completion_callback(struct vdo_completion *completion,
+					       vdo_action *callback,
+					       thread_id_t callback_thread_id)
+{
+	completion->callback = callback;
+	completion->callback_thread_id = callback_thread_id;
+}
+
+/**
+ * vdo_launch_completion_callback() - Set the callback for a completion and launch it immediately.
+ */
+static inline void vdo_launch_completion_callback(struct vdo_completion *completion,
+						  vdo_action *callback,
+						  thread_id_t callback_thread_id)
+{
+	vdo_set_completion_callback(completion, callback, callback_thread_id);
+	vdo_launch_completion(completion);
+}
+
+/**
+ * vdo_prepare_completion() - Prepare a completion for launch.
+ *
+ * Resets the completion, and then sets its callback, error handler, callback thread, and parent.
+ */
+static inline void vdo_prepare_completion(struct vdo_completion *completion,
+					  vdo_action *callback,
+					  vdo_action *error_handler,
+					  thread_id_t callback_thread_id,
+					  void *parent)
+{
+	vdo_reset_completion(completion);
+	vdo_set_completion_callback(completion, callback, callback_thread_id);
+	completion->error_handler = error_handler;
+	completion->parent = parent;
+}
+
+/**
+ * vdo_prepare_completion_for_requeue() - Prepare a completion for launch, ensuring that it will
+ *                                        always be requeued.
+ *
+ * Resets the completion, and then sets its callback, error handler, callback thread, and parent.
+ */
+static inline void
+vdo_prepare_completion_for_requeue(struct vdo_completion *completion,
+				   vdo_action *callback,
+				   vdo_action *error_handler,
+				   thread_id_t callback_thread_id,
+				   void *parent)
+{
+	vdo_prepare_completion(completion, callback, error_handler, callback_thread_id, parent);
+	completion->requeue = true;
+}
+
+void vdo_enqueue_completion(struct vdo_completion *completion,
+			    enum vdo_completion_priority priority);
+
+bool vdo_requeue_completion_if_needed(struct vdo_completion *completion,
+				      thread_id_t callback_thread_id);
+
+#endif /* VDO_COMPLETION_H */
diff --git a/drivers/md/dm-vdo/cpu.h b/drivers/md/dm-vdo/cpu.h
new file mode 100644
index 00000000000..d2da22d9d60
--- /dev/null
+++ b/drivers/md/dm-vdo/cpu.h
@@ -0,0 +1,58 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * Copyright Red Hat
+ */
+
+#ifndef UDS_CPU_H
+#define UDS_CPU_H
+
+#include <linux/cache.h>
+
+/**
+ * uds_prefetch_address() - Minimize cache-miss latency by attempting to move data into a CPU cache
+ *                          before it is accessed.
+ *
+ * @address: the address to fetch (may be invalid)
+ * @for_write: must be constant at compile time--false if for reading, true if for writing
+ */
+static inline void uds_prefetch_address(const void *address, bool for_write)
+{
+	/*
+	 * for_write won't be a constant if we are compiled with optimization turned off, in which
+	 * case prefetching really doesn't matter. clang can't figure out that if for_write is a
+	 * constant, it can be passed as the second, mandatorily constant argument to prefetch(),
+	 * at least currently on llvm 12.
+	 */
+	if (__builtin_constant_p(for_write)) {
+		if (for_write)
+			__builtin_prefetch(address, true);
+		else
+			__builtin_prefetch(address, false);
+	}
+}
+
+/**
+ * uds_prefetch_range() - Minimize cache-miss latency by attempting to move a range of addresses
+ *                        into a CPU cache before they are accessed.
+ *
+ * @start: the starting address to fetch (may be invalid)
+ * @size: the number of bytes in the address range
+ * @for_write: must be constant at compile time--false if for reading, true if for writing
+ */
+static inline void uds_prefetch_range(const void *start, unsigned int size, bool for_write)
+{
+	/*
+	 * Count the number of cache lines to fetch, allowing for the address range to span an
+	 * extra cache line boundary due to address alignment.
+	 */
+	const char *address = (const char *) start;
+	unsigned int offset = ((uintptr_t) address % L1_CACHE_BYTES);
+	unsigned int cache_lines = (1 + ((size + offset) / L1_CACHE_BYTES));
+
+	while (cache_lines-- > 0) {
+		uds_prefetch_address(address, for_write);
+		address += L1_CACHE_BYTES;
+	}
+}
+
+#endif /* UDS_CPU_H */
diff --git a/drivers/md/dm-vdo/funnel-queue.c b/drivers/md/dm-vdo/funnel-queue.c
new file mode 100644
index 00000000000..7e36153ec0a
--- /dev/null
+++ b/drivers/md/dm-vdo/funnel-queue.c
@@ -0,0 +1,169 @@
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ * Copyright Red Hat
+ */
+
+#include "funnel-queue.h"
+
+#include "cpu.h"
+#include "memory-alloc.h"
+#include "permassert.h"
+#include "uds.h"
+
+int uds_make_funnel_queue(struct funnel_queue **queue_ptr)
+{
+	int result;
+	struct funnel_queue *queue;
+
+	result = UDS_ALLOCATE(1, struct funnel_queue, "funnel queue", &queue);
+	if (result != UDS_SUCCESS)
+		return result;
+
+	/*
+	 * Initialize the stub entry and put it in the queue, establishing the invariant that
+	 * queue->newest and queue->oldest are never null.
+	 */
+	queue->stub.next = NULL;
+	queue->newest = &queue->stub;
+	queue->oldest = &queue->stub;
+
+	*queue_ptr = queue;
+	return UDS_SUCCESS;
+}
+
+void uds_free_funnel_queue(struct funnel_queue *queue)
+{
+	UDS_FREE(queue);
+}
+
+static struct funnel_queue_entry *get_oldest(struct funnel_queue *queue)
+{
+	/*
+	 * Barrier requirements: We need a read barrier between reading a "next" field pointer
+	 * value and reading anything it points to. There's an accompanying barrier in
+	 * uds_funnel_queue_put() between its caller setting up the entry and making it visible.
+	 */
+	struct funnel_queue_entry *oldest = queue->oldest;
+	struct funnel_queue_entry *next = READ_ONCE(oldest->next);
+
+	if (oldest == &queue->stub) {
+		/*
+		 * When the oldest entry is the stub and it has no successor, the queue is
+		 * logically empty.
+		 */
+		if (next == NULL)
+			return NULL;
+		/*
+		 * The stub entry has a successor, so the stub can be dequeued and ignored without
+		 * breaking the queue invariants.
+		 */
+		oldest = next;
+		queue->oldest = oldest;
+		next = READ_ONCE(oldest->next);
+	}
+
+	/*
+	 * We have a non-stub candidate to dequeue. If it lacks a successor, we'll need to put the
+	 * stub entry back on the queue first.
+	 */
+	if (next == NULL) {
+		struct funnel_queue_entry *newest = READ_ONCE(queue->newest);
+
+		if (oldest != newest)
+			/*
+			 * Another thread has already swung queue->newest atomically, but not yet
+			 * assigned previous->next. The queue is really still empty.
+			 */
+			return NULL;
+
+		/*
+		 * Put the stub entry back on the queue, ensuring a successor will eventually be
+		 * seen.
+		 */
+		uds_funnel_queue_put(queue, &queue->stub);
+
+		/* Check again for a successor. */
+		next = READ_ONCE(oldest->next);
+		if (next == NULL)
+			/*
+			 * We lost a race with a producer who swapped queue->newest before we did,
+			 * but who hasn't yet updated previous->next. Try again later.
+			 */
+			return NULL;
+	}
+
+	return oldest;
+}
+
+/*
+ * Poll a queue, removing the oldest entry if the queue is not empty. This function must only be
+ * called from a single consumer thread.
+ */
+struct funnel_queue_entry *uds_funnel_queue_poll(struct funnel_queue *queue)
+{
+	struct funnel_queue_entry *oldest = get_oldest(queue);
+
+	if (oldest == NULL)
+		return oldest;
+
+	/*
+	 * Dequeue the oldest entry and return it. Only one consumer thread may call this function,
+	 * so no locking, atomic operations, or fences are needed; queue->oldest is owned by the
+	 * consumer and oldest->next is never used by a producer thread after it is swung from NULL
+	 * to non-NULL.
+	 */
+	queue->oldest = READ_ONCE(oldest->next);
+	/*
+	 * Make sure the caller sees the proper stored data for this entry. Since we've already
+	 * fetched the entry pointer we stored in "queue->oldest", this also ensures that on entry
+	 * to the next call we'll properly see the dependent data.
+	 */
+	smp_rmb();
+	/*
+	 * If "oldest" is a very light-weight work item, we'll be looking for the next one very
+	 * soon, so prefetch it now.
+	 */
+	uds_prefetch_address(queue->oldest, true);
+	WRITE_ONCE(oldest->next, NULL);
+	return oldest;
+}
+
+/*
+ * Check whether the funnel queue is empty or not. If the queue is in a transition state with one
+ * or more entries being added such that the list view is incomplete, this function will report the
+ * queue as empty.
+ */
+bool uds_is_funnel_queue_empty(struct funnel_queue *queue)
+{
+	return get_oldest(queue) == NULL;
+}
+
+/*
+ * Check whether the funnel queue is idle or not. If the queue has entries available to be
+ * retrieved, it is not idle. If the queue is in a transition state with one or more entries being
+ * added such that the list view is incomplete, it may not be possible to retrieve an entry with
+ * the uds_funnel_queue_poll() function, but the queue will not be considered idle.
+ */
+bool uds_is_funnel_queue_idle(struct funnel_queue *queue)
+{
+	/*
+	 * Oldest is not the stub, so there's another entry, though if next is NULL we can't
+	 * retrieve it yet.
+	 */
+	if (queue->oldest != &queue->stub)
+		return false;
+
+	/*
+	 * Oldest is the stub, but newest has been updated by _put(); either there's another,
+	 * retrievable entry in the list, or the list is officially empty but in the intermediate
+	 * state of having an entry added.
+	 *
+	 * Whether anything is retrievable depends on whether stub.next has been updated and become
+	 * visible to us, but for idleness we don't care. And due to memory ordering in _put(), the
+	 * update to newest would be visible to us at the same time or sooner.
+	 */
+	if (READ_ONCE(queue->newest) != &queue->stub)
+		return false;
+
+	return true;
+}
diff --git a/drivers/md/dm-vdo/funnel-queue.h b/drivers/md/dm-vdo/funnel-queue.h
new file mode 100644
index 00000000000..332acc579b6
--- /dev/null
+++ b/drivers/md/dm-vdo/funnel-queue.h
@@ -0,0 +1,110 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * Copyright Red Hat
+ */
+
+#ifndef UDS_FUNNEL_QUEUE_H
+#define UDS_FUNNEL_QUEUE_H
+
+#include <linux/atomic.h>
+#include <linux/cache.h>
+
+/*
+ * A funnel queue is a simple (almost) lock-free queue that accepts entries from multiple threads
+ * (multi-producer) and delivers them to a single thread (single-consumer). "Funnel" is an attempt
+ * to evoke the image of requests from more than one producer being "funneled down" to a single
+ * consumer.
+ *
+ * This is an unsynchronized but thread-safe data structure when used as intended. There is no
+ * mechanism to ensure that only one thread is consuming from the queue. If more than one thread
+ * attempts to consume from the queue, the resulting behavior is undefined. Clients must not
+ * directly access or manipulate the internals of the queue, which are only exposed for the purpose
+ * of allowing the very simple enqueue operation to be inlined.
+ *
+ * The implementation requires that a funnel_queue_entry structure (a link pointer) is embedded in
+ * the queue entries, and pointers to those structures are used exclusively by the queue. No macros
+ * are defined to template the queue, so the offset of the funnel_queue_entry must be the same in
+ * all of the records placed in the queue, so that the client can derive its structure pointer
+ * from the entry pointer returned by uds_funnel_queue_poll().
+ *
+ * Callers are wholly responsible for allocating and freeing the entries. Entries may be freed as
+ * soon as they are returned since this queue is not susceptible to the "ABA problem" present in
+ * many lock-free data structures. The queue is dynamically allocated to ensure cache-line
+ * alignment, but no other dynamic allocation is used.
+ *
+ * The algorithm is not actually 100% lock-free. There is a single point in uds_funnel_queue_put()
+ * at which a preempted producer will prevent the consumer from seeing items added to the queue by
+ * later producers, but only if the queue is short enough or the consumer fast enough for it to
+ * reach what was the end of the queue at the time of the preemption.
+ *
+ * The consumer function, uds_funnel_queue_poll(), will return NULL when the queue is empty. To
+ * wait for data to consume, spin (if safe) or combine the queue with a struct event_count to
+ * signal the presence of new entries.
+ */
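+
+/*
+ * A sketch of the intended embedding; struct example_entry and its fields are placeholders, not
+ * part of this interface:
+ *
+ *	struct example_entry {
+ *		int payload;
+ *		struct funnel_queue_entry link;
+ *	};
+ *
+ * A producer calls uds_funnel_queue_put(queue, &entry->link). The single consumer calls
+ * uds_funnel_queue_poll(queue) and, when the result is not NULL, recovers the containing entry
+ * with container_of(result, struct example_entry, link).
+ */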
+
+/* This queue link structure must be embedded in client entries. */
+struct funnel_queue_entry {
+	/* The next (newer) entry in the queue. */
+	struct funnel_queue_entry *next;
+};
+
+/*
+ * The dynamically allocated queue structure, which is allocated on a cache line boundary so the
+ * producer and consumer fields in the structure will land on separate cache lines. This should be
+ * considered opaque, but it is exposed here so that uds_funnel_queue_put() can be inlined.
+ */
+struct __aligned(L1_CACHE_BYTES) funnel_queue {
+	/*
+	 * The producers' end of the queue, an atomically exchanged pointer that will never be
+	 * NULL.
+	 */
+	struct funnel_queue_entry *newest;
+
+	/* The consumer's end of the queue, which is owned by the consumer and never NULL. */
+	struct funnel_queue_entry *oldest __aligned(L1_CACHE_BYTES);
+
+	/* A dummy entry used to provide the non-NULL invariants above. */
+	struct funnel_queue_entry stub;
+};
+
+int __must_check uds_make_funnel_queue(struct funnel_queue **queue_ptr);
+
+void uds_free_funnel_queue(struct funnel_queue *queue);
+
+/*
+ * Put an entry on the end of the queue.
+ *
+ * The entry pointer must be to the struct funnel_queue_entry embedded in the caller's data
+ * structure. The caller must be able to derive the address of the start of its data structure
+ * from the pointer passed in here, so every entry in the queue must have the struct
+ * funnel_queue_entry at the same offset within the client's structure.
+ */
+static inline void
+uds_funnel_queue_put(struct funnel_queue *queue, struct funnel_queue_entry *entry)
+{
+	struct funnel_queue_entry *previous;
+
+	/*
+	 * Barrier requirements: All stores relating to the entry ("next" pointer, containing data
+	 * structure fields) must happen before the previous->next store making it visible to the
+	 * consumer. Also, the entry's "next" field initialization to NULL must happen before any
+	 * other producer threads can see the entry (the xchg) and try to update the "next" field.
+	 *
+	 * xchg implements a full barrier.
+	 */
+	WRITE_ONCE(entry->next, NULL);
+	previous = xchg(&queue->newest, entry);
+	/*
+	 * Preemptions between these two statements hide the rest of the queue from the consumer,
+	 * preventing consumption until the following assignment runs.
+	 */
+	WRITE_ONCE(previous->next, entry);
+}
+
+struct funnel_queue_entry *__must_check uds_funnel_queue_poll(struct funnel_queue *queue);
+
+bool __must_check uds_is_funnel_queue_empty(struct funnel_queue *queue);
+
+bool __must_check uds_is_funnel_queue_idle(struct funnel_queue *queue);
+
+#endif /* UDS_FUNNEL_QUEUE_H */
diff --git a/drivers/md/dm-vdo/request-queue.c b/drivers/md/dm-vdo/request-queue.c
new file mode 100644
index 00000000000..058422f44d9
--- /dev/null
+++ b/drivers/md/dm-vdo/request-queue.c
@@ -0,0 +1,284 @@
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ * Copyright Red Hat
+ */
+
+#include "request-queue.h"
+
+#include <linux/atomic.h>
+#include <linux/compiler.h>
+#include <linux/wait.h>
+
+#include "funnel-queue.h"
+#include "logger.h"
+#include "memory-alloc.h"
+#include "uds-threads.h"
+
+/*
+ * This queue will attempt to handle requests in reasonably sized batches instead of reacting
+ * immediately to each new request. The wait time between batches is dynamically adjusted up or
+ * down to try to balance responsiveness against wasted thread run time.
+ *
+ * If the wait time becomes long enough, the queue will become dormant and must be explicitly
+ * awoken when a new request is enqueued. The enqueue operation updates "newest" in the funnel
+ * queue via xchg (which is a memory barrier), and later checks "dormant" to decide whether to do a
+ * wakeup of the worker thread.
+ *
+ * When deciding to go to sleep, the worker thread sets "dormant" and then examines "newest" to
+ * decide if the funnel queue is idle. In dormant mode, the last examination of "newest" before
+ * going to sleep is done inside the wait_event_interruptible() macro, after a point where one or
+ * more memory barriers have been issued. (Preparing to sleep uses spin locks.) Even if the funnel
+ * queue's "next" field update isn't visible yet to make the entry accessible, the update to
+ * "newest" will kick the worker thread out of dormant mode and back into timer-based mode.
+ *
+ * Unbatched requests are used to communicate between different zone threads and will also cause
+ * the queue to awaken immediately.
+ */
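+
+/*
+ * As a concrete illustration of the adjustment (using the constants defined below): the wait
+ * starts at DEFAULT_WAIT_TIME (20 us) and grows by a quarter after each wait that yields fewer
+ * than MINIMUM_BATCH requests (20 us -> 25 us -> 31.25 us -> ...); once it reaches
+ * MAXIMUM_WAIT_TIME (1 ms) the queue goes dormant. A wait that yields more than MAXIMUM_BATCH
+ * requests shrinks the wait by a quarter, but never below MINIMUM_WAIT_TIME (10 us).
+ */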
+
+enum {
+	NANOSECOND = 1,
+	MICROSECOND = 1000 * NANOSECOND,
+	MILLISECOND = 1000 * MICROSECOND,
+	DEFAULT_WAIT_TIME = 20 * MICROSECOND,
+	MINIMUM_WAIT_TIME = DEFAULT_WAIT_TIME / 2,
+	MAXIMUM_WAIT_TIME = MILLISECOND,
+	MINIMUM_BATCH = 32,
+	MAXIMUM_BATCH = 64,
+};
+
+struct uds_request_queue {
+	/* Wait queue for synchronizing producers and consumer */
+	struct wait_queue_head wait_head;
+	/* Function to process a request */
+	uds_request_queue_processor_t *processor;
+	/* Queue of new incoming requests */
+	struct funnel_queue *main_queue;
+	/* Queue of old requests to retry */
+	struct funnel_queue *retry_queue;
+	/* The worker thread */
+	struct thread *thread;
+	/* True if the worker was started */
+	bool started;
+	/* When true, requests can be enqueued */
+	bool running;
+	/* A flag set when the worker is waiting without a timeout */
+	atomic_t dormant;
+};
+
+static inline struct uds_request *poll_queues(struct uds_request_queue *queue)
+{
+	struct funnel_queue_entry *entry;
+
+	entry = uds_funnel_queue_poll(queue->retry_queue);
+	if (entry != NULL)
+		return container_of(entry, struct uds_request, queue_link);
+
+	entry = uds_funnel_queue_poll(queue->main_queue);
+	if (entry != NULL)
+		return container_of(entry, struct uds_request, queue_link);
+
+	return NULL;
+}
+
+static inline bool are_queues_idle(struct uds_request_queue *queue)
+{
+	return uds_is_funnel_queue_idle(queue->retry_queue) &&
+	       uds_is_funnel_queue_idle(queue->main_queue);
+}
+
+/*
+ * Determine if there is a next request to process, and return it if there is. Also return flags
+ * indicating whether the worker thread can sleep (for the use of wait_event() macros) and whether
+ * the thread did sleep before returning a new request.
+ */
+static inline bool dequeue_request(struct uds_request_queue *queue,
+				   struct uds_request **request_ptr,
+				   bool *waited_ptr)
+{
+	struct uds_request *request = poll_queues(queue);
+
+	if (request != NULL) {
+		*request_ptr = request;
+		return true;
+	}
+
+	if (!READ_ONCE(queue->running)) {
+		/* Wake the worker thread so it can exit. */
+		*request_ptr = NULL;
+		return true;
+	}
+
+	*request_ptr = NULL;
+	*waited_ptr = true;
+	return false;
+}
+
+static void wait_for_request(struct uds_request_queue *queue,
+			     bool dormant,
+			     unsigned long timeout,
+			     struct uds_request **request,
+			     bool *waited)
+{
+	if (dormant) {
+		wait_event_interruptible(queue->wait_head,
+					 (dequeue_request(queue, request, waited) ||
+					  !are_queues_idle(queue)));
+		return;
+	}
+
+	wait_event_interruptible_hrtimeout(queue->wait_head,
+					   dequeue_request(queue, request, waited),
+					   ns_to_ktime(timeout));
+}
+
+static void request_queue_worker(void *arg)
+{
+	struct uds_request_queue *queue = (struct uds_request_queue *) arg;
+	struct uds_request *request = NULL;
+	unsigned long time_batch = DEFAULT_WAIT_TIME;
+	bool dormant = atomic_read(&queue->dormant);
+	bool waited = false;
+	long current_batch = 0;
+
+	for (;;) {
+		wait_for_request(queue, dormant, time_batch, &request, &waited);
+		if (likely(request != NULL)) {
+			current_batch++;
+			queue->processor(request);
+		} else if (!READ_ONCE(queue->running)) {
+			break;
+		}
+
+		if (dormant) {
+			/*
+			 * The queue has been roused from dormancy. Clear the flag so enqueuers can
+			 * stop broadcasting. No fence is needed for this transition.
+			 */
+			atomic_set(&queue->dormant, false);
+			dormant = false;
+			time_batch = DEFAULT_WAIT_TIME;
+		} else if (waited) {
+			/*
+			 * We waited for this request to show up. Adjust the wait time to smooth
+			 * out the batch size.
+			 */
+			if (current_batch < MINIMUM_BATCH) {
+				/*
+				 * If the last batch of requests was too small, increase the wait
+				 * time.
+				 */
+				time_batch += time_batch / 4;
+				if (time_batch >= MAXIMUM_WAIT_TIME) {
+					atomic_set(&queue->dormant, true);
+					dormant = true;
+				}
+			} else if (current_batch > MAXIMUM_BATCH) {
+				/*
+				 * If the last batch of requests was too large, decrease the wait
+				 * time.
+				 */
+				time_batch -= time_batch / 4;
+				if (time_batch < MINIMUM_WAIT_TIME)
+					time_batch = MINIMUM_WAIT_TIME;
+			}
+			current_batch = 0;
+		}
+	}
+
+	/*
+	 * Ensure that we process any remaining requests that were enqueued before trying to shut
+	 * down. The corresponding write barrier is in uds_request_queue_finish().
+	 */
+	smp_rmb();
+	while ((request = poll_queues(queue)) != NULL)
+		queue->processor(request);
+}
+
+int uds_make_request_queue(const char *queue_name,
+			   uds_request_queue_processor_t *processor,
+			   struct uds_request_queue **queue_ptr)
+{
+	int result;
+	struct uds_request_queue *queue;
+
+	result = UDS_ALLOCATE(1, struct uds_request_queue, __func__, &queue);
+	if (result != UDS_SUCCESS)
+		return result;
+
+	queue->processor = processor;
+	queue->running = true;
+	atomic_set(&queue->dormant, false);
+	init_waitqueue_head(&queue->wait_head);
+
+	result = uds_make_funnel_queue(&queue->main_queue);
+	if (result != UDS_SUCCESS) {
+		uds_request_queue_finish(queue);
+		return result;
+	}
+
+	result = uds_make_funnel_queue(&queue->retry_queue);
+	if (result != UDS_SUCCESS) {
+		uds_request_queue_finish(queue);
+		return result;
+	}
+
+	result = uds_create_thread(request_queue_worker, queue, queue_name, &queue->thread);
+	if (result != UDS_SUCCESS) {
+		uds_request_queue_finish(queue);
+		return result;
+	}
+
+	queue->started = true;
+	*queue_ptr = queue;
+	return UDS_SUCCESS;
+}
+
+static inline void wake_up_worker(struct uds_request_queue *queue)
+{
+	if (wq_has_sleeper(&queue->wait_head))
+		wake_up(&queue->wait_head);
+}
+
+void uds_request_queue_enqueue(struct uds_request_queue *queue, struct uds_request *request)
+{
+	struct funnel_queue *sub_queue;
+	bool unbatched = request->unbatched;
+
+	sub_queue = request->requeued ? queue->retry_queue : queue->main_queue;
+	uds_funnel_queue_put(sub_queue, &request->queue_link);
+
+	/*
+	 * We must wake the worker thread when it is dormant. A read fence isn't needed here since
+	 * we know the queue operation acts as one.
+	 */
+	if (atomic_read(&queue->dormant) || unbatched)
+		wake_up_worker(queue);
+}
+
+void uds_request_queue_finish(struct uds_request_queue *queue)
+{
+	int result;
+
+	if (queue == NULL)
+		return;
+
+	/*
+	 * This memory barrier ensures that any requests we queued will be seen. The point is that
+	 * when dequeue_request() sees the following update to the running flag, it will also be
+	 * able to see any change we made to a next field in the funnel queue entry. The
+	 * corresponding read barrier is in request_queue_worker().
+	 */
+	smp_wmb();
+	WRITE_ONCE(queue->running, false);
+
+	if (queue->started) {
+		wake_up_worker(queue);
+		result = uds_join_threads(queue->thread);
+		if (result != UDS_SUCCESS)
+			uds_log_warning_strerror(result, "Failed to join worker thread");
+	}
+
+	uds_free_funnel_queue(queue->main_queue);
+	uds_free_funnel_queue(queue->retry_queue);
+	UDS_FREE(queue);
+}
diff --git a/drivers/md/dm-vdo/request-queue.h b/drivers/md/dm-vdo/request-queue.h
new file mode 100644
index 00000000000..e736072b35a
--- /dev/null
+++ b/drivers/md/dm-vdo/request-queue.h
@@ -0,0 +1,30 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * Copyright Red Hat
+ */
+
+#ifndef UDS_REQUEST_QUEUE_H
+#define UDS_REQUEST_QUEUE_H
+
+#include "uds.h"
+
+/*
+ * A simple request queue which will handle new requests in the order in which they are received,
+ * and will attempt to handle requeued requests before new ones. However, the nature of the
+ * implementation means that it cannot guarantee this ordering; the prioritization is merely a
+ * hint.
+ */
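+
+/*
+ * A minimal usage sketch; the processor, queue name, and request variable here are placeholders:
+ *
+ *	static void process_request(struct uds_request *request);
+ *
+ *	struct uds_request_queue *queue;
+ *	int result = uds_make_request_queue("exampleW", process_request, &queue);
+ *
+ *	if (result == UDS_SUCCESS)
+ *		uds_request_queue_enqueue(queue, request);
+ *
+ *	uds_request_queue_finish(queue);
+ */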
+
+struct uds_request_queue;
+
+typedef void uds_request_queue_processor_t(struct uds_request *);
+
+int __must_check uds_make_request_queue(const char *queue_name,
+					uds_request_queue_processor_t *processor,
+					struct uds_request_queue **queue_ptr);
+
+void uds_request_queue_enqueue(struct uds_request_queue *queue, struct uds_request *request);
+
+void uds_request_queue_finish(struct uds_request_queue *queue);
+
+#endif /* UDS_REQUEST_QUEUE_H */
diff --git a/drivers/md/dm-vdo/work-queue.c b/drivers/md/dm-vdo/work-queue.c
new file mode 100644
index 00000000000..ecb0c345d4f
--- /dev/null
+++ b/drivers/md/dm-vdo/work-queue.c
@@ -0,0 +1,659 @@
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ * Copyright Red Hat
+ */
+
+#include "work-queue.h"
+
+#include <linux/atomic.h>
+#include <linux/cache.h>
+#include <linux/completion.h>
+#include <linux/err.h>
+#include <linux/kthread.h>
+#include <linux/percpu.h>
+
+#include "funnel-queue.h"
+#include "logger.h"
+#include "memory-alloc.h"
+#include "numeric.h"
+#include "permassert.h"
+#include "string-utils.h"
+
+#include "completion.h"
+#include "status-codes.h"
+
+static DEFINE_PER_CPU(unsigned int, service_queue_rotor);
+
+/**
+ * DOC: Work queue definition.
+ *
+ * There are two types of work queues: simple, with one worker thread, and round-robin, which uses
+ * a group of the former to do the work, and assigns work to them in round-robin fashion (roughly).
+ * Externally, both are represented via the same common sub-structure, though there's actually not
+ * a great deal of overlap between the two types internally.
+ */
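+
+/*
+ * An illustrative sketch of queue creation; the type, priority values, and names below are
+ * placeholders, not symbols defined elsewhere in vdo:
+ *
+ *	static const struct vdo_work_queue_type example_type = {
+ *		.max_priority = 2,
+ *		.default_priority = 1,
+ *	};
+ *
+ *	struct vdo_work_queue *queue;
+ *	int result = vdo_make_work_queue("vdo0", "exampleQ", owner, &example_type, 1, NULL,
+ *					 &queue);
+ *
+ * Completions are then submitted with vdo_enqueue_work_queue(queue, completion); passing a
+ * thread count greater than one creates a round-robin queue instead of a simple one.
+ */
+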
+struct vdo_work_queue {
+	/* Name of just the work queue (e.g., "cpuQ12") */
+	char *name;
+	bool round_robin_mode;
+	struct vdo_thread *owner;
+	/* Life cycle functions, etc */
+	const struct vdo_work_queue_type *type;
+};
+
+struct simple_work_queue {
+	struct vdo_work_queue common;
+	struct funnel_queue *priority_lists[VDO_WORK_Q_MAX_PRIORITY + 1];
+	void *private;
+
+	/*
+	 * The fields above are unchanged after setup but often read, and are good candidates for
+	 * caching -- and if the max priority is 2, they just fit in one x86-64 cache line if aligned.
+	 * The fields below are often modified as we sleep and wake, so we want a separate cache
+	 * line for performance.
+	 */
+
+	/* Any (0 or 1) worker threads waiting for new work to do */
+	wait_queue_head_t waiting_worker_threads ____cacheline_aligned;
+	/* Hack to reduce wakeup calls if the worker thread is running */
+	atomic_t idle;
+
+	/* These are infrequently used so in terms of performance we don't care where they land. */
+	struct task_struct *thread;
+	/* Notify creator once worker has initialized */
+	struct completion *started;
+};
+
+struct round_robin_work_queue {
+	struct vdo_work_queue common;
+	struct simple_work_queue **service_queues;
+	unsigned int num_service_queues;
+};
+
+static inline struct simple_work_queue *as_simple_work_queue(struct vdo_work_queue *queue)
+{
+	return ((queue == NULL) ? NULL : container_of(queue, struct simple_work_queue, common));
+}
+
+static inline struct round_robin_work_queue *
+as_round_robin_work_queue(struct vdo_work_queue *queue)
+{
+	return ((queue == NULL) ?
+		 NULL :
+		 container_of(queue, struct round_robin_work_queue, common));
+}
+
+/* Processing normal completions. */
+
+/*
+ * Dequeue and return the next waiting completion, if any.
+ *
+ * We scan the funnel queues from highest priority to lowest, once; there is therefore a race
+ * condition where a high-priority completion can be enqueued followed by a lower-priority one, and
+ * we'll grab the latter (but we'll catch the high-priority item on the next call). If strict
+ * enforcement of priorities becomes necessary, this function will need fixing.
+ */
+static struct vdo_completion *poll_for_completion(struct simple_work_queue *queue)
+{
+	int i;
+
+	for (i = queue->common.type->max_priority; i >= 0; i--) {
+		struct funnel_queue_entry *link = uds_funnel_queue_poll(queue->priority_lists[i]);
+
+		if (link != NULL)
+			return container_of(link, struct vdo_completion, work_queue_entry_link);
+	}
+
+	return NULL;
+}
+
+static void
+enqueue_work_queue_completion(struct simple_work_queue *queue, struct vdo_completion *completion)
+{
+	ASSERT_LOG_ONLY(completion->my_queue == NULL,
+			"completion %px (fn %px) to enqueue (%px) is not already queued (%px)",
+			completion,
+			completion->callback,
+			queue,
+			completion->my_queue);
+	if (completion->priority == VDO_WORK_Q_DEFAULT_PRIORITY)
+		completion->priority = queue->common.type->default_priority;
+
+	if (ASSERT(completion->priority <= queue->common.type->max_priority,
+		   "priority is in range for queue") != VDO_SUCCESS)
+		completion->priority = 0;
+
+	completion->my_queue = &queue->common;
+
+	/* Funnel queue handles the synchronization for the put. */
+	uds_funnel_queue_put(queue->priority_lists[completion->priority],
+			     &completion->work_queue_entry_link);
+
+	/*
+	 * Due to how funnel queue synchronization is handled (just atomic operations), the
+	 * simplest safe implementation here would be to wake-up any waiting threads after
+	 * enqueueing each item. Even if the funnel queue is not empty at the time of adding an
+	 * item to the queue, the consumer thread may not see this since it is not guaranteed to
+	 * have the same view of the queue as a producer thread.
+	 *
+	 * However, the above is wasteful so instead we attempt to minimize the number of thread
+	 * wakeups. Using an idle flag, and careful ordering using memory barriers, we should be
+	 * able to determine when the worker thread might be asleep or going to sleep. We use
+	 * cmpxchg to try to take ownership (vs other producer threads) of the responsibility for
+	 * waking the worker thread, so multiple wakeups aren't tried at once.
+	 *
+	 * This was tuned for some x86 boxes that were handy; it's untested whether doing the read
+	 * first is any better or worse for other platforms, even other x86 configurations.
+	 */
+	smp_mb();
+	if ((atomic_read(&queue->idle) != 1) || (atomic_cmpxchg(&queue->idle, 1, 0) != 1))
+		return;
+
+	/* There's a maximum of one thread in this list. */
+	wake_up(&queue->waiting_worker_threads);
+}
+
+static void run_start_hook(struct simple_work_queue *queue)
+{
+	if (queue->common.type->start != NULL)
+		queue->common.type->start(queue->private);
+}
+
+static void run_finish_hook(struct simple_work_queue *queue)
+{
+	if (queue->common.type->finish != NULL)
+		queue->common.type->finish(queue->private);
+}
+
+/*
+ * Wait for the next completion to process, or until kthread_should_stop indicates that it's time
+ * for us to shut down.
+ *
+ * If kthread_should_stop() says it's time to stop but we have pending completions, return a
+ * completion.
+ *
+ * Also update statistics relating to scheduler interactions.
+ */
+static struct vdo_completion *wait_for_next_completion(struct simple_work_queue *queue)
+{
+	struct vdo_completion *completion;
+	DEFINE_WAIT(wait);
+
+	while (true) {
+		prepare_to_wait(&queue->waiting_worker_threads, &wait, TASK_INTERRUPTIBLE);
+		/*
+		 * Don't set the idle flag until a wakeup will not be lost.
+		 *
+		 * Force synchronization between setting the idle flag and checking the funnel
+		 * queue; the producer side will do them in the reverse order. (There's still a
+		 * race condition we've chosen to allow, because we've got a timeout below that
+		 * unwedges us if we hit it, but this may narrow the window a little.)
+		 */
+		atomic_set(&queue->idle, 1);
+		smp_mb(); /* store-load barrier between "idle" and funnel queue */
+
+		completion = poll_for_completion(queue);
+		if (completion != NULL)
+			break;
+
+		/*
+		 * We need to check for thread-stop after setting TASK_INTERRUPTIBLE state up
+		 * above. Otherwise, schedule() will put the thread to sleep and might miss a
+		 * wakeup from kthread_stop() call in vdo_finish_work_queue().
+		 */
+		if (kthread_should_stop())
+			break;
+
+		schedule();
+
+		/*
+		 * Most of the time when we wake, it should be because there's work to do. If it
+		 * was a spurious wakeup, continue looping.
+		 */
+		completion = poll_for_completion(queue);
+		if (completion != NULL)
+			break;
+	}
+
+	finish_wait(&queue->waiting_worker_threads, &wait);
+	atomic_set(&queue->idle, 0);
+
+	return completion;
+}
+
+static void process_completion(struct simple_work_queue *queue, struct vdo_completion *completion)
+{
+	if (ASSERT(completion->my_queue == &queue->common,
+		   "completion %px from queue %px marked as being in this queue (%px)",
+		   completion,
+		   queue,
+		   completion->my_queue) == UDS_SUCCESS)
+		completion->my_queue = NULL;
+
+	vdo_run_completion(completion);
+}
+
+static void service_work_queue(struct simple_work_queue *queue)
+{
+	run_start_hook(queue);
+
+	while (true) {
+		struct vdo_completion *completion = poll_for_completion(queue);
+
+		if (completion == NULL)
+			completion = wait_for_next_completion(queue);
+
+		if (completion == NULL)
+			/* No completions but kthread_should_stop() was triggered. */
+			break;
+
+		process_completion(queue, completion);
+
+		/*
+		 * Be friendly to a CPU that has other work to do, if the kernel has told us to.
+		 * This speeds up some performance tests; that "other work" might include other VDO
+		 * threads.
+		 */
+		if (need_resched())
+			cond_resched();
+	}
+
+	run_finish_hook(queue);
+}
+
+static int work_queue_runner(void *ptr)
+{
+	struct simple_work_queue *queue = ptr;
+
+	complete(queue->started);
+	service_work_queue(queue);
+	return 0;
+}
+
+/* Creation & teardown */
+
+static void free_simple_work_queue(struct simple_work_queue *queue)
+{
+	unsigned int i;
+
+	for (i = 0; i <= VDO_WORK_Q_MAX_PRIORITY; i++)
+		uds_free_funnel_queue(queue->priority_lists[i]);
+	UDS_FREE(queue->common.name);
+	UDS_FREE(queue);
+}
+
+static void free_round_robin_work_queue(struct round_robin_work_queue *queue)
+{
+	struct simple_work_queue **queue_table = queue->service_queues;
+	unsigned int count = queue->num_service_queues;
+	unsigned int i;
+
+	queue->service_queues = NULL;
+
+	for (i = 0; i < count; i++)
+		free_simple_work_queue(queue_table[i]);
+	UDS_FREE(queue_table);
+	UDS_FREE(queue->common.name);
+	UDS_FREE(queue);
+}
+
+void vdo_free_work_queue(struct vdo_work_queue *queue)
+{
+	if (queue == NULL)
+		return;
+
+	vdo_finish_work_queue(queue);
+
+	if (queue->round_robin_mode)
+		free_round_robin_work_queue(as_round_robin_work_queue(queue));
+	else
+		free_simple_work_queue(as_simple_work_queue(queue));
+}
+
+static int make_simple_work_queue(const char *thread_name_prefix,
+				  const char *name,
+				  struct vdo_thread *owner,
+				  void *private,
+				  const struct vdo_work_queue_type *type,
+				  struct simple_work_queue **queue_ptr)
+{
+	DECLARE_COMPLETION_ONSTACK(started);
+	struct simple_work_queue *queue;
+	int i;
+	struct task_struct *thread = NULL;
+	int result;
+
+	ASSERT_LOG_ONLY((type->max_priority <= VDO_WORK_Q_MAX_PRIORITY),
+			"queue priority count %u within limit %u",
+			type->max_priority,
+			VDO_WORK_Q_MAX_PRIORITY);
+
+	result = UDS_ALLOCATE(1, struct simple_work_queue, "simple work queue", &queue);
+	if (result != UDS_SUCCESS)
+		return result;
+
+	queue->private = private;
+	queue->started = &started;
+	queue->common.type = type;
+	queue->common.owner = owner;
+	init_waitqueue_head(&queue->waiting_worker_threads);
+
+	result = uds_duplicate_string(name, "queue name", &queue->common.name);
+	if (result != VDO_SUCCESS) {
+		UDS_FREE(queue);
+		return -ENOMEM;
+	}
+
+	for (i = 0; i <= type->max_priority; i++) {
+		result = uds_make_funnel_queue(&queue->priority_lists[i]);
+		if (result != UDS_SUCCESS) {
+			free_simple_work_queue(queue);
+			return result;
+		}
+	}
+
+	thread = kthread_run(work_queue_runner,
+			     queue,
+			     "%s:%s",
+			     thread_name_prefix,
+			     queue->common.name);
+	if (IS_ERR(thread)) {
+		free_simple_work_queue(queue);
+		return (int) PTR_ERR(thread);
+	}
+
+	queue->thread = thread;
+
+	/*
+	 * If we don't wait to ensure the thread is running VDO code, a quick kthread_stop (due to
+	 * errors elsewhere) could cause it to never get as far as running VDO, skipping the
+	 * cleanup code.
+	 *
+	 * Eventually we should just make that path safe too, and then we won't need this
+	 * synchronization.
+	 */
+	wait_for_completion(&started);
+
+	*queue_ptr = queue;
+	return UDS_SUCCESS;
+}
+
+/**
+ * vdo_make_work_queue() - Create a work queue; if multiple threads are requested, completions will
+ *                         be distributed to them in round-robin fashion.
+ *
+ * Each queue is associated with a struct vdo_thread which has a single vdo thread id. Regardless
+ * of the actual number of queues and threads allocated here, code outside of the queue
+ * implementation will treat this as a single zone.
+ */
+int vdo_make_work_queue(const char *thread_name_prefix,
+			const char *name,
+			struct vdo_thread *owner,
+			const struct vdo_work_queue_type *type,
+			unsigned int thread_count,
+			void *thread_privates[],
+			struct vdo_work_queue **queue_ptr)
+{
+	struct round_robin_work_queue *queue;
+	int result;
+	char thread_name[TASK_COMM_LEN];
+	unsigned int i;
+
+	if (thread_count == 1) {
+		struct simple_work_queue *simple_queue;
+		void *context = ((thread_privates != NULL) ? thread_privates[0] : NULL);
+
+		result = make_simple_work_queue(thread_name_prefix,
+						name,
+						owner,
+						context,
+						type,
+						&simple_queue);
+		if (result == VDO_SUCCESS)
+			*queue_ptr = &simple_queue->common;
+		return result;
+	}
+
+	result = UDS_ALLOCATE(1, struct round_robin_work_queue, "round-robin work queue", &queue);
+	if (result != UDS_SUCCESS)
+		return result;
+
+	result = UDS_ALLOCATE(thread_count,
+			      struct simple_work_queue *,
+			      "subordinate work queues",
+			      &queue->service_queues);
+	if (result != UDS_SUCCESS) {
+		UDS_FREE(queue);
+		return result;
+	}
+
+	queue->num_service_queues = thread_count;
+	queue->common.round_robin_mode = true;
+	queue->common.owner = owner;
+
+	result = uds_duplicate_string(name, "queue name", &queue->common.name);
+	if (result != VDO_SUCCESS) {
+		UDS_FREE(queue->service_queues);
+		UDS_FREE(queue);
+		return -ENOMEM;
+	}
+
+	*queue_ptr = &queue->common;
+
+	for (i = 0; i < thread_count; i++) {
+		void *context = ((thread_privates != NULL) ? thread_privates[i] : NULL);
+
+		snprintf(thread_name, sizeof(thread_name), "%s%u", name, i);
+		result = make_simple_work_queue(thread_name_prefix,
+						thread_name,
+						owner,
+						context,
+						type,
+						&queue->service_queues[i]);
+		if (result != VDO_SUCCESS) {
+			queue->num_service_queues = i;
+			/* Destroy previously created subordinates. */
+			vdo_free_work_queue(UDS_FORGET(*queue_ptr));
+			return result;
+		}
+	}
+
+	return VDO_SUCCESS;
+}
+
+static void finish_simple_work_queue(struct simple_work_queue *queue)
+{
+	if (queue->thread == NULL)
+		return;
+
+	/* Tells the worker thread to shut down and waits for it to exit. */
+	kthread_stop(queue->thread);
+	queue->thread = NULL;
+}
+
+static void finish_round_robin_work_queue(struct round_robin_work_queue *queue)
+{
+	struct simple_work_queue **queue_table = queue->service_queues;
+	unsigned int count = queue->num_service_queues;
+	unsigned int i;
+
+	for (i = 0; i < count; i++)
+		finish_simple_work_queue(queue_table[i]);
+}
+
+/* No enqueueing of completions should be done once this function is called. */
+void vdo_finish_work_queue(struct vdo_work_queue *queue)
+{
+	if (queue == NULL)
+		return;
+
+	if (queue->round_robin_mode)
+		finish_round_robin_work_queue(as_round_robin_work_queue(queue));
+	else
+		finish_simple_work_queue(as_simple_work_queue(queue));
+}
+
+/* Debugging dumps */
+
+static void dump_simple_work_queue(struct simple_work_queue *queue)
+{
+	const char *thread_status = "no threads";
+	char task_state_report = '-';
+
+	if (queue->thread != NULL) {
+		task_state_report = task_state_to_char(queue->thread);
+		thread_status = atomic_read(&queue->idle) ? "idle" : "running";
+	}
+
+	uds_log_info("workQ %px (%s) %s (%c)",
+		     &queue->common,
+		     queue->common.name,
+		     thread_status,
+		     task_state_report);
+
+	/* ->waiting_worker_threads wait queue status? anyone waiting? */
+}
+
+/*
+ * Write to the buffer some info about the completion, for logging. Since the common use case is
+ * dumping info about a lot of completions to syslog all at once, the format favors brevity over
+ * readability.
+ */
+void vdo_dump_work_queue(struct vdo_work_queue *queue)
+{
+	if (queue->round_robin_mode) {
+		struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);
+		unsigned int i;
+
+		for (i = 0; i < round_robin->num_service_queues; i++)
+			dump_simple_work_queue(round_robin->service_queues[i]);
+	} else {
+		dump_simple_work_queue(as_simple_work_queue(queue));
+	}
+}
+
+static void get_function_name(void *pointer, char *buffer, size_t buffer_length)
+{
+	if (pointer == NULL) {
+		/*
+		 * Format "%ps" logs a null pointer as "(null)" with a bunch of leading spaces. We
+		 * sometimes use this when logging lots of data; don't be so verbose.
+		 */
+		strncpy(buffer, "-", buffer_length);
+	} else {
+		/*
+		 * Use a pragma to defeat gcc's format checking, which doesn't understand that
+		 * "%ps" actually does support a precision spec in Linux kernel code.
+		 */
+		char *space;
+
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wformat"
+		snprintf(buffer, buffer_length, "%.*ps", buffer_length - 1, pointer);
+#pragma GCC diagnostic pop
+
+		space = strchr(buffer, ' ');
+		if (space != NULL)
+			*space = '\0';
+	}
+}
+
+void vdo_dump_completion_to_buffer(struct vdo_completion *completion, char *buffer, size_t length)
+{
+	size_t current_length =
+		scnprintf(buffer,
+			  length,
+			  "%.*s/",
+			  TASK_COMM_LEN,
+			  (completion->my_queue == NULL ? "-" : completion->my_queue->name));
+
+	if (current_length < length)
+		get_function_name((void *) completion->callback,
+				  buffer + current_length,
+				  length - current_length);
+}
+
+/* Completion submission */
+/*
+ * If the completion has a timeout that has already passed, the timeout handler function may be
+ * invoked by this function.
+ */
+void vdo_enqueue_work_queue(struct vdo_work_queue *queue, struct vdo_completion *completion)
+{
+	/*
+	 * Convert the provided generic vdo_work_queue to the simple_work_queue to actually queue
+	 * on.
+	 */
+	struct simple_work_queue *simple_queue = NULL;
+
+	if (!queue->round_robin_mode) {
+		simple_queue = as_simple_work_queue(queue);
+	} else {
+		struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);
+
+		/*
+		 * It shouldn't be a big deal if the same rotor gets used for multiple work queues.
+		 * Any patterns that might develop are likely to be disrupted by random ordering of
+		 * multiple completions and migration between cores, unless the load is so light as
+		 * to be regular in ordering of tasks and the threads are confined to individual
+		 * cores; with a load that light we won't care.
+		 */
+		unsigned int rotor = this_cpu_inc_return(service_queue_rotor);
+		unsigned int index = rotor % round_robin->num_service_queues;
+
+		simple_queue = round_robin->service_queues[index];
+	}
+
+	enqueue_work_queue_completion(simple_queue, completion);
+}
+
+/* Misc */
+
+/*
+ * Return the work queue which the current thread is servicing, if the current thread is a work
+ * queue worker thread (and not in interrupt context); otherwise return NULL.
+ */
+static struct simple_work_queue *get_current_thread_work_queue(void)
+{
+	/*
+	 * In interrupt context, if a vdo thread is what got interrupted, the calls below will find
+	 * the queue for the thread which was interrupted. However, the interrupted thread may have
+	 * been processing a completion, in which case starting to process another would violate
+	 * our concurrency assumptions.
+	 */
+	if (in_interrupt())
+		return NULL;
+	if (kthread_func(current) != work_queue_runner)
+		/* Not a VDO work queue thread. */
+		return NULL;
+	return kthread_data(current);
+}
+
+struct vdo_work_queue *vdo_get_current_work_queue(void)
+{
+	struct simple_work_queue *queue = get_current_thread_work_queue();
+
+	return (queue == NULL) ? NULL : &queue->common;
+}
+
+struct vdo_thread *vdo_get_work_queue_owner(struct vdo_work_queue *queue)
+{
+	return queue->owner;
+}
+
+/**
+ * vdo_get_work_queue_private_data() - Returns the private data for the current thread's work
+ *                                     queue, or NULL if none or if the current thread is not a
+ *                                     work queue thread.
+ */
+void *vdo_get_work_queue_private_data(void)
+{
+	struct simple_work_queue *queue = get_current_thread_work_queue();
+
+	return (queue != NULL) ? queue->private : NULL;
+}
+
+bool vdo_work_queue_type_is(struct vdo_work_queue *queue, const struct vdo_work_queue_type *type)
+{
+	return (queue->type == type);
+}
diff --git a/drivers/md/dm-vdo/work-queue.h b/drivers/md/dm-vdo/work-queue.h
new file mode 100644
index 00000000000..d1e05f8901d
--- /dev/null
+++ b/drivers/md/dm-vdo/work-queue.h
@@ -0,0 +1,53 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * Copyright Red Hat
+ */
+
+#ifndef VDO_WORK_QUEUE_H
+#define VDO_WORK_QUEUE_H
+
+#include <linux/sched.h> /* for TASK_COMM_LEN */
+
+#include "types.h"
+
+enum {
+	MAX_VDO_WORK_QUEUE_NAME_LEN = TASK_COMM_LEN,
+};
+
+struct vdo_work_queue_type {
+	void (*start)(void *context);
+	void (*finish)(void *context);
+	enum vdo_completion_priority max_priority;
+	enum vdo_completion_priority default_priority;
+};
+
+struct vdo_completion;
+struct vdo_thread;
+struct vdo_work_queue;
+
+int vdo_make_work_queue(const char *thread_name_prefix,
+			const char *name,
+			struct vdo_thread *owner,
+			const struct vdo_work_queue_type *type,
+			unsigned int thread_count,
+			void *thread_privates[],
+			struct vdo_work_queue **queue_ptr);
+
+void vdo_enqueue_work_queue(struct vdo_work_queue *queue, struct vdo_completion *completion);
+
+void vdo_finish_work_queue(struct vdo_work_queue *queue);
+
+void vdo_free_work_queue(struct vdo_work_queue *queue);
+
+void vdo_dump_work_queue(struct vdo_work_queue *queue);
+
+void vdo_dump_completion_to_buffer(struct vdo_completion *completion, char *buffer, size_t length);
+
+void *vdo_get_work_queue_private_data(void);
+struct vdo_work_queue *vdo_get_current_work_queue(void);
+struct vdo_thread *vdo_get_work_queue_owner(struct vdo_work_queue *queue);
+
+bool __must_check
+vdo_work_queue_type_is(struct vdo_work_queue *queue, const struct vdo_work_queue_type *type);
+
+#endif /* VDO_WORK_QUEUE_H */
-- 
2.40.1

--
dm-devel mailing list
dm-devel@redhat.com
https://listman.redhat.com/mailman/listinfo/dm-devel


WARNING: multiple messages have this Message-ID (diff)
From: "J. corwin Coburn" <corwin@redhat.com>
To: dm-devel@redhat.com, linux-block@vger.kernel.org
Cc: vdo-devel@redhat.com, "J. corwin Coburn" <corwin@redhat.com>
Subject: [PATCH v2 07/39] Add specialized request queueing functionality.
Date: Tue, 23 May 2023 17:45:07 -0400	[thread overview]
Message-ID: <20230523214539.226387-8-corwin@redhat.com> (raw)
In-Reply-To: <20230523214539.226387-1-corwin@redhat.com>

This patch adds funnel_queue, a mostly lock-free multi-producer,
single-consumer queue. It also adds the request queue used by the dm-vdo
deduplication index, and the work_queue used by the dm-vdo data store. Both
of these are built on top of funnel queue and are intended to support the
dispatching of many short-running tasks. The work_queue also supports
priorities. Finally, this patch adds vdo_completion, the structure which is
enqueued on work_queues.

Signed-off-by: J. corwin Coburn <corwin@redhat.com>
---
 drivers/md/dm-vdo/completion.c    | 141 +++++++
 drivers/md/dm-vdo/completion.h    | 155 +++++++
 drivers/md/dm-vdo/cpu.h           |  58 +++
 drivers/md/dm-vdo/funnel-queue.c  | 169 ++++++++
 drivers/md/dm-vdo/funnel-queue.h  | 110 +++++
 drivers/md/dm-vdo/request-queue.c | 284 +++++++++++++
 drivers/md/dm-vdo/request-queue.h |  30 ++
 drivers/md/dm-vdo/work-queue.c    | 659 ++++++++++++++++++++++++++++++
 drivers/md/dm-vdo/work-queue.h    |  53 +++
 9 files changed, 1659 insertions(+)
 create mode 100644 drivers/md/dm-vdo/completion.c
 create mode 100644 drivers/md/dm-vdo/completion.h
 create mode 100644 drivers/md/dm-vdo/cpu.h
 create mode 100644 drivers/md/dm-vdo/funnel-queue.c
 create mode 100644 drivers/md/dm-vdo/funnel-queue.h
 create mode 100644 drivers/md/dm-vdo/request-queue.c
 create mode 100644 drivers/md/dm-vdo/request-queue.h
 create mode 100644 drivers/md/dm-vdo/work-queue.c
 create mode 100644 drivers/md/dm-vdo/work-queue.h

diff --git a/drivers/md/dm-vdo/completion.c b/drivers/md/dm-vdo/completion.c
new file mode 100644
index 00000000000..8feb9c05c19
--- /dev/null
+++ b/drivers/md/dm-vdo/completion.c
@@ -0,0 +1,141 @@
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ * Copyright Red Hat
+ */
+
+#include "completion.h"
+
+#include <linux/kernel.h>
+
+#include "logger.h"
+#include "permassert.h"
+
+#include "status-codes.h"
+#include "types.h"
+#include "vio.h"
+#include "vdo.h"
+
+/**
+ * DOC: vdo completions.
+ *
+ * Most of vdo's data structures are lock free, each either belonging to a single "zone," or
+ * divided into a number of zones whose accesses to the structure do not overlap. During normal
+ * operation, at most one thread will be operating in any given zone. Each zone has a
+ * vdo_work_queue which holds vdo_completions that are to be run in that zone. A completion may
+ * only be enqueued on one queue or operating in a single zone at a time.
+ *
+ * At each step of a multi-threaded operation, the completion performing the operation is given a
+ * callback, error handler, and thread id for the next step. A completion is "run" when it is
+ * operating on the correct thread (as specified by its callback_thread_id). If the value of its
+ * "result" field is an error (i.e. not VDO_SUCCESS), the function in its "error_handler" will be
+ * invoked. If the error_handler is NULL, or there is no error, the function set as its "callback"
+ * will be invoked. Generally, a completion will not be run directly, but rather will be
+ * "launched." In this case, it will check whether it is operating on the correct thread. If it is,
+ * it will run immediately. Otherwise, it will be enqueue on the vdo_work_queue associated with the
+ * completion's "callback_thread_id". When it is dequeued, it will be on the correct thread, and
+ * will get run. In some cases, the completion should get queued instead of running immediately,
+ * even if it is being launched from the correct thread. This is usually in cases where there is a
+ * long chain of callbacks, all on the same thread, which could overflow the stack. In such cases,
+ * the completion's "requeue" field should be set to true. Doing so will skip the current thread
+ * check and simply enqueue the completion.
+ *
+ * A completion may be "finished," in which case its "complete" field will be set to true before it
+ * is next run. It is a bug to attempt to set the result or re-finish a finished completion.
+ * Because a completion's fields are not safe to examine from any thread other than the one on
+ * which the completion is currently operating, this field is used only to aid in detecting
+ * programming errors. It can not be used for cross-thread checking on the status of an operation.
+ * A completion must be "reset" before it can be reused after it has been finished. Resetting will
+ * also clear any error from the result field.
+ **/
+
+void vdo_initialize_completion(struct vdo_completion *completion,
+			       struct vdo *vdo,
+			       enum vdo_completion_type type)
+{
+	memset(completion, 0, sizeof(*completion));
+	completion->vdo = vdo;
+	completion->type = type;
+	vdo_reset_completion(completion);
+}
+
+static inline void assert_incomplete(struct vdo_completion *completion)
+{
+	ASSERT_LOG_ONLY(!completion->complete, "completion is not complete");
+}
+
+/**
+ * vdo_set_completion_result() - Set the result of a completion.
+ *
+ * Older errors will not be masked.
+ */
+void vdo_set_completion_result(struct vdo_completion *completion, int result)
+{
+	assert_incomplete(completion);
+	if (completion->result == VDO_SUCCESS)
+		completion->result = result;
+}
+
+/**
+ * vdo_launch_completion_with_priority() - Run or enqueue a completion.
+ * @priority: The priority at which to enqueue the completion.
+ *
+ * If called on the correct thread (i.e. the one specified in the completion's callback_thread_id
+ * field) and not marked for requeue, the completion will be run immediately. Otherwise, the
+ * completion will be enqueued on the specified thread.
+ */
+void vdo_launch_completion_with_priority(struct vdo_completion *completion,
+					 enum vdo_completion_priority priority)
+{
+	thread_id_t callback_thread = completion->callback_thread_id;
+
+	if (completion->requeue || (callback_thread != vdo_get_callback_thread_id())) {
+		vdo_enqueue_completion(completion, priority);
+		return;
+	}
+
+	vdo_run_completion(completion);
+}
+
+/** vdo_finish_completion() - Mark a completion as complete and then launch it. */
+void vdo_finish_completion(struct vdo_completion *completion)
+{
+	assert_incomplete(completion);
+	completion->complete = true;
+	if (completion->callback != NULL)
+		vdo_launch_completion(completion);
+}
+
+void vdo_enqueue_completion(struct vdo_completion *completion,
+			    enum vdo_completion_priority priority)
+{
+	struct vdo *vdo = completion->vdo;
+	thread_id_t thread_id = completion->callback_thread_id;
+
+	if (ASSERT(thread_id < vdo->thread_config.thread_count,
+		   "thread_id %u (completion type %d) is less than thread count %u",
+		   thread_id,
+		   completion->type,
+		   vdo->thread_config.thread_count) != UDS_SUCCESS)
+		BUG();
+
+	completion->requeue = false;
+	completion->priority = priority;
+	completion->my_queue = NULL;
+	vdo_enqueue_work_queue(vdo->threads[thread_id].queue, completion);
+}
+
+/**
+ * vdo_requeue_completion_if_needed() - Requeue a completion if not called on the specified thread.
+ *
+ * Return: True if the completion was requeued; callers may not access the completion in this case.
+ */
+bool vdo_requeue_completion_if_needed(struct vdo_completion *completion,
+				      thread_id_t callback_thread_id)
+{
+	if (vdo_get_callback_thread_id() == callback_thread_id)
+		return false;
+
+	completion->callback_thread_id = callback_thread_id;
+	vdo_enqueue_completion(completion, VDO_WORK_Q_DEFAULT_PRIORITY);
+	return true;
+}
diff --git a/drivers/md/dm-vdo/completion.h b/drivers/md/dm-vdo/completion.h
new file mode 100644
index 00000000000..03b2daa6546
--- /dev/null
+++ b/drivers/md/dm-vdo/completion.h
@@ -0,0 +1,155 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * Copyright Red Hat
+ */
+
+#ifndef VDO_COMPLETION_H
+#define VDO_COMPLETION_H
+
+#include "permassert.h"
+
+#include "status-codes.h"
+#include "types.h"
+
+/**
+ * vdo_run_completion() - Run a completion's callback or error handler on the current thread.
+ *
+ * Context: This function must be called from the correct callback thread.
+ */
+static inline void vdo_run_completion(struct vdo_completion *completion)
+{
+	if ((completion->result != VDO_SUCCESS) && (completion->error_handler != NULL)) {
+		completion->error_handler(completion);
+		return;
+	}
+
+	completion->callback(completion);
+}
+
+void vdo_set_completion_result(struct vdo_completion *completion, int result);
+
+void vdo_initialize_completion(struct vdo_completion *completion,
+			       struct vdo *vdo,
+			       enum vdo_completion_type type);
+
+/**
+ * vdo_reset_completion() - Reset a completion to a clean state, while keeping the type, vdo and
+ *                          parent information.
+ */
+static inline void vdo_reset_completion(struct vdo_completion *completion)
+{
+	completion->result = VDO_SUCCESS;
+	completion->complete = false;
+}
+
+void vdo_launch_completion_with_priority(struct vdo_completion *completion,
+					 enum vdo_completion_priority priority);
+
+/**
+ * vdo_launch_completion() - Launch a completion with default priority.
+ */
+static inline void vdo_launch_completion(struct vdo_completion *completion)
+{
+	vdo_launch_completion_with_priority(completion, VDO_WORK_Q_DEFAULT_PRIORITY);
+}
+
+/**
+ * vdo_continue_completion() - Continue processing a completion.
+ * @result: The current result (will not mask older errors).
+ *
+ * Continue processing a completion by setting the current result and calling
+ * vdo_launch_completion().
+ */
+static inline void vdo_continue_completion(struct vdo_completion *completion, int result)
+{
+	vdo_set_completion_result(completion, result);
+	vdo_launch_completion(completion);
+}
+
+void vdo_finish_completion(struct vdo_completion *completion);
+
+/**
+ * vdo_fail_completion() - Set the result of a completion if it does not already have an error,
+ *                         then finish it.
+ */
+static inline void vdo_fail_completion(struct vdo_completion *completion, int result)
+{
+	vdo_set_completion_result(completion, result);
+	vdo_finish_completion(completion);
+}
+
+/**
+ * vdo_assert_completion_type() - Assert that a completion is of the correct type.
+ *
+ * Return: VDO_SUCCESS or an error
+ */
+static inline int
+vdo_assert_completion_type(struct vdo_completion *completion, enum vdo_completion_type expected)
+{
+	return ASSERT(expected == completion->type,
+		      "completion type should be %u, not %u",
+		      expected,
+		      completion->type);
+}
+
+static inline void vdo_set_completion_callback(struct vdo_completion *completion,
+					       vdo_action *callback,
+					       thread_id_t callback_thread_id)
+{
+	completion->callback = callback;
+	completion->callback_thread_id = callback_thread_id;
+}
+
+/**
+ * vdo_launch_completion_callback() - Set the callback for a completion and launch it immediately.
+ */
+static inline void vdo_launch_completion_callback(struct vdo_completion *completion,
+						  vdo_action *callback,
+						  thread_id_t callback_thread_id)
+{
+	vdo_set_completion_callback(completion, callback, callback_thread_id);
+	vdo_launch_completion(completion);
+}
+
+/**
+ * vdo_prepare_completion() - Prepare a completion for launch.
+ *
+ * Resets the completion, and then sets its callback, error handler, callback thread, and parent.
+ */
+static inline void vdo_prepare_completion(struct vdo_completion *completion,
+					  vdo_action *callback,
+					  vdo_action *error_handler,
+					  thread_id_t callback_thread_id,
+					  void *parent)
+{
+	vdo_reset_completion(completion);
+	vdo_set_completion_callback(completion, callback, callback_thread_id);
+	completion->error_handler = error_handler;
+	completion->parent = parent;
+}
+
+/**
+ * vdo_prepare_completion_for_requeue() - Prepare a completion for launch ensuring that it will
+ *                                        always be requeued.
+ *
+ * Resets the completion, and then sets its callback, error handler, callback thread, and parent.
+ */
+static inline void
+vdo_prepare_completion_for_requeue(struct vdo_completion *completion,
+				   vdo_action *callback,
+				   vdo_action *error_handler,
+				   thread_id_t callback_thread_id,
+				   void *parent)
+{
+	vdo_prepare_completion(completion, callback, error_handler, callback_thread_id, parent);
+	completion->requeue = true;
+}
+
+void vdo_enqueue_completion(struct vdo_completion *completion,
+			    enum vdo_completion_priority priority);
+
+
+bool vdo_requeue_completion_if_needed(struct vdo_completion *completion,
+				      thread_id_t callback_thread_id);
+
+#endif /* VDO_COMPLETION_H */
diff --git a/drivers/md/dm-vdo/cpu.h b/drivers/md/dm-vdo/cpu.h
new file mode 100644
index 00000000000..d2da22d9d60
--- /dev/null
+++ b/drivers/md/dm-vdo/cpu.h
@@ -0,0 +1,58 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * Copyright Red Hat
+ */
+
+#ifndef UDS_CPU_H
+#define UDS_CPU_H
+
+#include <linux/cache.h>
+
+/**
+ * uds_prefetch_address() - Minimize cache-miss latency by attempting to move data into a CPU cache
+ *                          before it is accessed.
+ *
+ * @address: the address to fetch (may be invalid)
+ * @for_write: must be constant at compile time--false if for reading, true if for writing
+ */
+static inline void uds_prefetch_address(const void *address, bool for_write)
+{
+	/*
+	 * for_write won't be a constant if we are compiled with optimization turned off, in which
+	 * case prefetching really doesn't matter. clang can't figure out that if for_write is a
+	 * constant, it can be passed as the second, mandatorily constant argument to prefetch(),
+	 * at least currently on llvm 12.
+	 */
+	if (__builtin_constant_p(for_write)) {
+		if (for_write)
+			__builtin_prefetch(address, true);
+		else
+			__builtin_prefetch(address, false);
+	}
+}
+
+/**
+ * uds_prefetch_range() - Minimize cache-miss latency by attempting to move a range of addresses
+ *                        into a CPU cache before they are accessed.
+ *
+ * @start: the starting address to fetch (may be invalid)
+ * @size: the number of bytes in the address range
+ * @for_write: must be constant at compile time--false if for reading, true if for writing
+ */
+static inline void uds_prefetch_range(const void *start, unsigned int size, bool for_write)
+{
+	/*
+	 * Count the number of cache lines to fetch, allowing for the address range to span an
+	 * extra cache line boundary due to address alignment.
+	 */
+	const char *address = (const char *) start;
+	unsigned int offset = ((uintptr_t) address % L1_CACHE_BYTES);
+	unsigned int cache_lines = (1 + ((size + offset) / L1_CACHE_BYTES));
+
+	while (cache_lines-- > 0) {
+		uds_prefetch_address(address, for_write);
+		address += L1_CACHE_BYTES;
+	}
+}
+
+#endif /* UDS_CPU_H */
diff --git a/drivers/md/dm-vdo/funnel-queue.c b/drivers/md/dm-vdo/funnel-queue.c
new file mode 100644
index 00000000000..7e36153ec0a
--- /dev/null
+++ b/drivers/md/dm-vdo/funnel-queue.c
@@ -0,0 +1,169 @@
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ * Copyright Red Hat
+ */
+
+#include "funnel-queue.h"
+
+#include "cpu.h"
+#include "memory-alloc.h"
+#include "permassert.h"
+#include "uds.h"
+
+int uds_make_funnel_queue(struct funnel_queue **queue_ptr)
+{
+	int result;
+	struct funnel_queue *queue;
+
+	result = UDS_ALLOCATE(1, struct funnel_queue, "funnel queue", &queue);
+	if (result != UDS_SUCCESS)
+		return result;
+
+	/*
+	 * Initialize the stub entry and put it in the queue, establishing the invariant that
+	 * queue->newest and queue->oldest are never null.
+	 */
+	queue->stub.next = NULL;
+	queue->newest = &queue->stub;
+	queue->oldest = &queue->stub;
+
+	*queue_ptr = queue;
+	return UDS_SUCCESS;
+}
+
+void uds_free_funnel_queue(struct funnel_queue *queue)
+{
+	UDS_FREE(queue);
+}
+
+static struct funnel_queue_entry *get_oldest(struct funnel_queue *queue)
+{
+	/*
+	 * Barrier requirements: We need a read barrier between reading a "next" field pointer
+	 * value and reading anything it points to. There's an accompanying barrier in
+	 * uds_funnel_queue_put() between its caller setting up the entry and making it visible.
+	 */
+	struct funnel_queue_entry *oldest = queue->oldest;
+	struct funnel_queue_entry *next = READ_ONCE(oldest->next);
+
+	if (oldest == &queue->stub) {
+		/*
+		 * When the oldest entry is the stub and it has no successor, the queue is
+		 * logically empty.
+		 */
+		if (next == NULL)
+			return NULL;
+		/*
+		 * The stub entry has a successor, so the stub can be dequeued and ignored without
+		 * breaking the queue invariants.
+		 */
+		oldest = next;
+		queue->oldest = oldest;
+		next = READ_ONCE(oldest->next);
+	}
+
+	/*
+	 * We have a non-stub candidate to dequeue. If it lacks a successor, we'll need to put the
+	 * stub entry back on the queue first.
+	 */
+	if (next == NULL) {
+		struct funnel_queue_entry *newest = READ_ONCE(queue->newest);
+
+		if (oldest != newest)
+			/*
+			 * Another thread has already swung queue->newest atomically, but not yet
+			 * assigned previous->next. The queue is really still empty.
+			 */
+			return NULL;
+
+		/*
+		 * Put the stub entry back on the queue, ensuring a successor will eventually be
+		 * seen.
+		 */
+		uds_funnel_queue_put(queue, &queue->stub);
+
+		/* Check again for a successor. */
+		next = READ_ONCE(oldest->next);
+		if (next == NULL)
+			/*
+			 * We lost a race with a producer who swapped queue->newest before we did,
+			 * but who hasn't yet updated previous->next. Try again later.
+			 */
+			return NULL;
+	}
+
+	return oldest;
+}
+
+/*
+ * Poll a queue, removing the oldest entry if the queue is not empty. This function must only be
+ * called from a single consumer thread.
+ */
+struct funnel_queue_entry *uds_funnel_queue_poll(struct funnel_queue *queue)
+{
+	struct funnel_queue_entry *oldest = get_oldest(queue);
+
+	if (oldest == NULL)
+		return oldest;
+
+	/*
+	 * Dequeue the oldest entry and return it. Only one consumer thread may call this function,
+	 * so no locking, atomic operations, or fences are needed; queue->oldest is owned by the
+	 * consumer and oldest->next is never used by a producer thread after it is swung from NULL
+	 * to non-NULL.
+	 */
+	queue->oldest = READ_ONCE(oldest->next);
+	/*
+	 * Make sure the caller sees the proper stored data for this entry. Since we've already
+	 * fetched the entry pointer we stored in "queue->oldest", this also ensures that on entry
+	 * to the next call we'll properly see the dependent data.
+	 */
+	smp_rmb();
+	/*
+	 * If "oldest" is a very light-weight work item, we'll be looking for the next one very
+	 * soon, so prefetch it now.
+	 */
+	uds_prefetch_address(queue->oldest, true);
+	WRITE_ONCE(oldest->next, NULL);
+	return oldest;
+}
+
+/*
+ * Check whether the funnel queue is empty or not. If the queue is in a transition state with one
+ * or more entries being added such that the list view is incomplete, this function will report the
+ * queue as empty.
+ */
+bool uds_is_funnel_queue_empty(struct funnel_queue *queue)
+{
+	return get_oldest(queue) == NULL;
+}
+
+/*
+ * Check whether the funnel queue is idle or not. If the queue has entries available to be
+ * retrieved, it is not idle. If the queue is in a transition state with one or more entries being
+ * added such that the list view is incomplete, it may not be possible to retrieve an entry with
+ * the uds_funnel_queue_poll() function, but the queue will not be considered idle.
+ */
+bool uds_is_funnel_queue_idle(struct funnel_queue *queue)
+{
+	/*
+	 * Oldest is not the stub, so there's another entry, though if next is NULL we can't
+	 * retrieve it yet.
+	 */
+	if (queue->oldest != &queue->stub)
+		return false;
+
+	/*
+	 * Oldest is the stub, but newest has been updated by _put(); either there's another,
+	 * retrievable entry in the list, or the list is officially empty but in the intermediate
+	 * state of having an entry added.
+	 *
+	 * Whether anything is retrievable depends on whether stub.next has been updated and become
+	 * visible to us, but for idleness we don't care. And due to memory ordering in _put(), the
+	 * update to newest would be visible to us at the same time or sooner.
+	 */
+	if (READ_ONCE(queue->newest) != &queue->stub)
+		return false;
+
+	return true;
+}
diff --git a/drivers/md/dm-vdo/funnel-queue.h b/drivers/md/dm-vdo/funnel-queue.h
new file mode 100644
index 00000000000..332acc579b6
--- /dev/null
+++ b/drivers/md/dm-vdo/funnel-queue.h
@@ -0,0 +1,110 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * Copyright Red Hat
+ */
+
+#ifndef UDS_FUNNEL_QUEUE_H
+#define UDS_FUNNEL_QUEUE_H
+
+#include <linux/atomic.h>
+#include <linux/cache.h>
+
+/*
+ * A funnel queue is a simple (almost) lock-free queue that accepts entries from multiple threads
+ * (multi-producer) and delivers them to a single thread (single-consumer). "Funnel" is an attempt
+ * to evoke the image of requests from more than one producer being "funneled down" to a single
+ * consumer.
+ *
+ * This is an unsynchronized but thread-safe data structure when used as intended. There is no
+ * mechanism to ensure that only one thread is consuming from the queue. If more than one thread
+ * attempts to consume from the queue, the resulting behavior is undefined. Clients must not
+ * directly access or manipulate the internals of the queue, which are only exposed for the purpose
+ * of allowing the very simple enqueue operation to be inlined.
+ *
+ * The implementation requires that a funnel_queue_entry structure (a link pointer) is embedded in
+ * the queue entries, and pointers to those structures are used exclusively by the queue. No macros
+ * are defined to template the queue, so the offset of the funnel_queue_entry in the records placed
+ * in the queue must all be the same so the client can derive their structure pointer from the
+ * entry pointer returned by uds_funnel_queue_poll().
+ *
+ * Callers are wholly responsible for allocating and freeing the entries. Entries may be freed as
+ * soon as they are returned since this queue is not susceptible to the "ABA problem" present in
+ * many lock-free data structures. The queue is dynamically allocated to ensure cache-line
+ * alignment, but no other dynamic allocation is used.
+ *
+ * The algorithm is not actually 100% lock-free. There is a single point in uds_funnel_queue_put()
+ * at which a preempted producer will prevent the consumers from seeing items added to the queue by
+ * later producers, and only if the queue is short enough or the consumer fast enough for it to
+ * reach what was the end of the queue at the time of the preemption.
+ *
+ * The consumer function, uds_funnel_queue_poll(), will return NULL when the queue is empty. To
+ * wait for data to consume, spin (if safe) or combine the queue with a struct event_count to
+ * signal the presence of new entries.
+ */
+
+/* This queue link structure must be embedded in client entries. */
+struct funnel_queue_entry {
+	/* The next (newer) entry in the queue. */
+	struct funnel_queue_entry *next;
+};
+
+/*
+ * The dynamically allocated queue structure, which is allocated on a cache line boundary so the
+ * producer and consumer fields in the structure will land on separate cache lines. This should be
+ * consider opaque but it is exposed here so uds_funnel_queue_put() can be inlined.
+ */
+struct __aligned(L1_CACHE_BYTES) funnel_queue {
+	/*
+	 * The producers' end of the queue, an atomically exchanged pointer that will never be
+	 * NULL.
+	 */
+	struct funnel_queue_entry *newest;
+
+	/* The consumer's end of the queue, which is owned by the consumer and never NULL. */
+	struct funnel_queue_entry *oldest __aligned(L1_CACHE_BYTES);
+
+	/* A dummy entry used to provide the non-NULL invariants above. */
+	struct funnel_queue_entry stub;
+};
+
+int __must_check uds_make_funnel_queue(struct funnel_queue **queue_ptr);
+
+void uds_free_funnel_queue(struct funnel_queue *queue);
+
+/*
+ * Put an entry on the end of the queue.
+ *
+ * The entry pointer must be to the struct funnel_queue_entry embedded in the caller's data
+ * structure. The caller must be able to derive the address of the start of their data structure
+ * from the pointer that passed in here, so every entry in the queue must have the struct
+ * funnel_queue_entry at the same offset within the client's structure.
+ */
+static inline void
+uds_funnel_queue_put(struct funnel_queue *queue, struct funnel_queue_entry *entry)
+{
+	struct funnel_queue_entry *previous;
+
+	/*
+	 * Barrier requirements: All stores relating to the entry ("next" pointer, containing data
+	 * structure fields) must happen before the previous->next store making it visible to the
+	 * consumer. Also, the entry's "next" field initialization to NULL must happen before any
+	 * other producer threads can see the entry (the xchg) and try to update the "next" field.
+	 *
+	 * xchg implements a full barrier.
+	 */
+	WRITE_ONCE(entry->next, NULL);
+	previous = xchg(&queue->newest, entry);
+	/*
+	 * Preemptions between these two statements hide the rest of the queue from the consumer,
+	 * preventing consumption until the following assignment runs.
+	 */
+	WRITE_ONCE(previous->next, entry);
+}
+
+struct funnel_queue_entry *__must_check uds_funnel_queue_poll(struct funnel_queue *queue);
+
+bool __must_check uds_is_funnel_queue_empty(struct funnel_queue *queue);
+
+bool __must_check uds_is_funnel_queue_idle(struct funnel_queue *queue);
+
+#endif /* UDS_FUNNEL_QUEUE_H */
diff --git a/drivers/md/dm-vdo/request-queue.c b/drivers/md/dm-vdo/request-queue.c
new file mode 100644
index 00000000000..058422f44d9
--- /dev/null
+++ b/drivers/md/dm-vdo/request-queue.c
@@ -0,0 +1,284 @@
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ * Copyright Red Hat
+ */
+
+#include "request-queue.h"
+
+#include <linux/atomic.h>
+#include <linux/compiler.h>
+#include <linux/wait.h>
+
+#include "funnel-queue.h"
+#include "logger.h"
+#include "memory-alloc.h"
+#include "uds-threads.h"
+
+/*
+ * This queue will attempt to handle requests in reasonably sized batches instead of reacting
+ * immediately to each new request. The wait time between batches is dynamically adjusted up or
+ * down to try to balance responsiveness against wasted thread run time.
+ *
+ * If the wait time becomes long enough, the queue will become dormant and must be explicitly
+ * awoken when a new request is enqueued. The enqueue operation updates "newest" in the funnel
+ * queue via xchg (which is a memory barrier), and later checks "dormant" to decide whether to do a
+ * wakeup of the worker thread.
+ *
+ * When deciding to go to sleep, the worker thread sets "dormant" and then examines "newest" to
+ * decide if the funnel queue is idle. In dormant mode, the last examination of "newest" before
+ * going to sleep is done inside the wait_event_interruptible() macro, after a point where one or
+ * more memory barriers have been issued. (Preparing to sleep uses spin locks.) Even if the funnel
+ * queue's "next" field update isn't visible yet to make the entry accessible, its existence will
+ * kick the worker thread out of dormant mode and back into timer-based mode.
+ *
+ * Unbatched requests are used to communicate between different zone threads and will also cause
+ * the queue to awaken immediately.
+ */
+
+enum {
+	NANOSECOND = 1,
+	MICROSECOND = 1000 * NANOSECOND,
+	MILLISECOND = 1000 * MICROSECOND,
+	DEFAULT_WAIT_TIME = 20 * MICROSECOND,
+	MINIMUM_WAIT_TIME = DEFAULT_WAIT_TIME / 2,
+	MAXIMUM_WAIT_TIME = MILLISECOND,
+	MINIMUM_BATCH = 32,
+	MAXIMUM_BATCH = 64,
+};
+
+struct uds_request_queue {
+	/* Wait queue for synchronizing producers and consumer */
+	struct wait_queue_head wait_head;
+	/* Function to process a request */
+	uds_request_queue_processor_t *processor;
+	/* Queue of new incoming requests */
+	struct funnel_queue *main_queue;
+	/* Queue of old requests to retry */
+	struct funnel_queue *retry_queue;
+	/* The thread id of the worker thread */
+	struct thread *thread;
+	/* True if the worker was started */
+	bool started;
+	/* When true, requests can be enqueued */
+	bool running;
+	/* A flag set when the worker is waiting without a timeout */
+	atomic_t dormant;
+};
+
+static inline struct uds_request *poll_queues(struct uds_request_queue *queue)
+{
+	struct funnel_queue_entry *entry;
+
+	entry = uds_funnel_queue_poll(queue->retry_queue);
+	if (entry != NULL)
+		return container_of(entry, struct uds_request, queue_link);
+
+	entry = uds_funnel_queue_poll(queue->main_queue);
+	if (entry != NULL)
+		return container_of(entry, struct uds_request, queue_link);
+
+	return NULL;
+}
+
+static inline bool are_queues_idle(struct uds_request_queue *queue)
+{
+	return uds_is_funnel_queue_idle(queue->retry_queue) &&
+	       uds_is_funnel_queue_idle(queue->main_queue);
+}
+
+/*
+ * Determine if there is a next request to process, and return it if there is. Also return flags
+ * indicating whether the worker thread can sleep (for the use of wait_event() macros) and whether
+ * the thread did sleep before returning a new request.
+ */
+static inline bool dequeue_request(struct uds_request_queue *queue,
+				   struct uds_request **request_ptr,
+				   bool *waited_ptr)
+{
+	struct uds_request *request = poll_queues(queue);
+
+	if (request != NULL) {
+		*request_ptr = request;
+		return true;
+	}
+
+	if (!READ_ONCE(queue->running)) {
+		/* Wake the worker thread so it can exit. */
+		*request_ptr = NULL;
+		return true;
+	}
+
+	*request_ptr = NULL;
+	*waited_ptr = true;
+	return false;
+}
+
+static void wait_for_request(struct uds_request_queue *queue,
+			     bool dormant,
+			     unsigned long timeout,
+			     struct uds_request **request,
+			     bool *waited)
+{
+	if (dormant) {
+		wait_event_interruptible(queue->wait_head,
+					 (dequeue_request(queue, request, waited) ||
+					  !are_queues_idle(queue)));
+		return;
+	}
+
+	wait_event_interruptible_hrtimeout(queue->wait_head,
+					   dequeue_request(queue, request, waited),
+					   ns_to_ktime(timeout));
+}
+
+static void request_queue_worker(void *arg)
+{
+	struct uds_request_queue *queue = (struct uds_request_queue *) arg;
+	struct uds_request *request = NULL;
+	unsigned long time_batch = DEFAULT_WAIT_TIME;
+	bool dormant = atomic_read(&queue->dormant);
+	bool waited = false;
+	long current_batch = 0;
+
+	for (;;) {
+		wait_for_request(queue, dormant, time_batch, &request, &waited);
+		if (likely(request != NULL)) {
+			current_batch++;
+			queue->processor(request);
+		} else if (!READ_ONCE(queue->running)) {
+			break;
+		}
+
+		if (dormant) {
+			/*
+			 * The queue has been roused from dormancy. Clear the flag so enqueuers can
+			 * stop broadcasting. No fence is needed for this transition.
+			 */
+			atomic_set(&queue->dormant, false);
+			dormant = false;
+			time_batch = DEFAULT_WAIT_TIME;
+		} else if (waited) {
+			/*
+			 * We waited for this request to show up. Adjust the wait time to smooth
+			 * out the batch size.
+			 */
+			if (current_batch < MINIMUM_BATCH) {
+				/*
+				 * If the last batch of requests was too small, increase the wait
+				 * time.
+				 */
+				time_batch += time_batch / 4;
+				if (time_batch >= MAXIMUM_WAIT_TIME) {
+					atomic_set(&queue->dormant, true);
+					dormant = true;
+				}
+			} else if (current_batch > MAXIMUM_BATCH) {
+				/*
+				 * If the last batch of requests was too large, decrease the wait
+				 * time.
+				 */
+				time_batch -= time_batch / 4;
+				if (time_batch < MINIMUM_WAIT_TIME)
+					time_batch = MINIMUM_WAIT_TIME;
+			}
+			current_batch = 0;
+		}
+	}
+
+	/*
+	 * Ensure that we process any remaining requests that were enqueued before trying to shut
+	 * down. The corresponding write barrier is in uds_request_queue_finish().
+	 */
+	smp_rmb();
+	while ((request = poll_queues(queue)) != NULL)
+		queue->processor(request);
+}
+
+int uds_make_request_queue(const char *queue_name,
+			   uds_request_queue_processor_t *processor,
+			   struct uds_request_queue **queue_ptr)
+{
+	int result;
+	struct uds_request_queue *queue;
+
+	result = UDS_ALLOCATE(1, struct uds_request_queue, __func__, &queue);
+	if (result != UDS_SUCCESS)
+		return result;
+
+	queue->processor = processor;
+	queue->running = true;
+	atomic_set(&queue->dormant, false);
+	init_waitqueue_head(&queue->wait_head);
+
+	result = uds_make_funnel_queue(&queue->main_queue);
+	if (result != UDS_SUCCESS) {
+		uds_request_queue_finish(queue);
+		return result;
+	}
+
+	result = uds_make_funnel_queue(&queue->retry_queue);
+	if (result != UDS_SUCCESS) {
+		uds_request_queue_finish(queue);
+		return result;
+	}
+
+	result = uds_create_thread(request_queue_worker, queue, queue_name, &queue->thread);
+	if (result != UDS_SUCCESS) {
+		uds_request_queue_finish(queue);
+		return result;
+	}
+
+	queue->started = true;
+	*queue_ptr = queue;
+	return UDS_SUCCESS;
+}
+
+static inline void wake_up_worker(struct uds_request_queue *queue)
+{
+	if (wq_has_sleeper(&queue->wait_head))
+		wake_up(&queue->wait_head);
+}
+
+void uds_request_queue_enqueue(struct uds_request_queue *queue, struct uds_request *request)
+{
+	struct funnel_queue *sub_queue;
+	bool unbatched = request->unbatched;
+
+	sub_queue = request->requeued ? queue->retry_queue : queue->main_queue;
+	uds_funnel_queue_put(sub_queue, &request->queue_link);
+
+	/*
+	 * We must wake the worker thread when it is dormant. A read fence isn't needed here since
+	 * we know the queue operation acts as one.
+	 */
+	if (atomic_read(&queue->dormant) || unbatched)
+		wake_up_worker(queue);
+}
+
+void uds_request_queue_finish(struct uds_request_queue *queue)
+{
+	int result;
+
+	if (queue == NULL)
+		return;
+
+	/*
+	 * This memory barrier ensures that any requests we queued will be seen. The point is that
+	 * when dequeue_request() sees the following update to the running flag, it will also be
+	 * able to see any change we made to a next field in the funnel queue entry. The
+	 * corresponding read barrier is in request_queue_worker().
+	 */
+	smp_wmb();
+	WRITE_ONCE(queue->running, false);
+
+	if (queue->started) {
+		wake_up_worker(queue);
+		result = uds_join_threads(queue->thread);
+		if (result != UDS_SUCCESS)
+			uds_log_warning_strerror(result, "Failed to join worker thread");
+	}
+
+	uds_free_funnel_queue(queue->main_queue);
+	uds_free_funnel_queue(queue->retry_queue);
+	UDS_FREE(queue);
+}
diff --git a/drivers/md/dm-vdo/request-queue.h b/drivers/md/dm-vdo/request-queue.h
new file mode 100644
index 00000000000..e736072b35a
--- /dev/null
+++ b/drivers/md/dm-vdo/request-queue.h
@@ -0,0 +1,30 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * Copyright Red Hat
+ */
+
+#ifndef UDS_REQUEST_QUEUE_H
+#define UDS_REQUEST_QUEUE_H
+
+#include "uds.h"
+
+/*
+ * A simple request queue which will handle new requests in the order in which they are received,
+ * and will attempt to handle requeued requests before new ones. However, the nature of the
+ * implementation means that it cannot guarantee this ordering; the prioritization is merely a
+ * hint.
+ */
+
+struct uds_request_queue;
+
+typedef void uds_request_queue_processor_t(struct uds_request *);
+
+int __must_check uds_make_request_queue(const char *queue_name,
+					uds_request_queue_processor_t *processor,
+					struct uds_request_queue **queue_ptr);
+
+void uds_request_queue_enqueue(struct uds_request_queue *queue, struct uds_request *request);
+
+void uds_request_queue_finish(struct uds_request_queue *queue);
+
+#endif /* UDS_REQUEST_QUEUE_H */
diff --git a/drivers/md/dm-vdo/work-queue.c b/drivers/md/dm-vdo/work-queue.c
new file mode 100644
index 00000000000..ecb0c345d4f
--- /dev/null
+++ b/drivers/md/dm-vdo/work-queue.c
@@ -0,0 +1,659 @@
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ * Copyright Red Hat
+ */
+
+#include "work-queue.h"
+
+#include <linux/atomic.h>
+#include <linux/cache.h>
+#include <linux/completion.h>
+#include <linux/err.h>
+#include <linux/kthread.h>
+#include <linux/percpu.h>
+
+#include "funnel-queue.h"
+#include "logger.h"
+#include "memory-alloc.h"
+#include "numeric.h"
+#include "permassert.h"
+#include "string-utils.h"
+
+#include "completion.h"
+#include "status-codes.h"
+
+static DEFINE_PER_CPU(unsigned int, service_queue_rotor);
+
+/**
+ * DOC: Work queue definition.
+ *
+ * There are two types of work queues: simple, with one worker thread, and round-robin, which uses
+ * a group of the former to do the work, and assigns work to them in round-robin fashion (roughly).
+ * Externally, both are represented via the same common sub-structure, though there's actually not
+ * a great deal of overlap between the two types internally.
+ */
+struct vdo_work_queue {
+	/* Name of just the work queue (e.g., "cpuQ12") */
+	char *name;
+	bool round_robin_mode;
+	struct vdo_thread *owner;
+	/* Life cycle functions, etc */
+	const struct vdo_work_queue_type *type;
+};
+
+struct simple_work_queue {
+	struct vdo_work_queue common;
+	struct funnel_queue *priority_lists[VDO_WORK_Q_MAX_PRIORITY + 1];
+	void *private;
+
+	/*
+	 * The fields above are unchanged after setup but often read, and are good candidates for
+	 * caching -- and if the max priority is 2, just fit in one x86-64 cache line if aligned.
+	 * The fields below are often modified as we sleep and wake, so we want a separate cache
+	 * line for performance.
+	 */
+
+	/* Any (0 or 1) worker threads waiting for new work to do */
+	wait_queue_head_t waiting_worker_threads ____cacheline_aligned;
+	/* Hack to reduce wakeup calls if the worker thread is running */
+	atomic_t idle;
+
+	/* These are infrequently used so in terms of performance we don't care where they land. */
+	struct task_struct *thread;
+	/* Notify creator once worker has initialized */
+	struct completion *started;
+};
+
+struct round_robin_work_queue {
+	struct vdo_work_queue common;
+	struct simple_work_queue **service_queues;
+	unsigned int num_service_queues;
+};
+
+static inline struct simple_work_queue *as_simple_work_queue(struct vdo_work_queue *queue)
+{
+	return ((queue == NULL) ? NULL : container_of(queue, struct simple_work_queue, common));
+}
+
+static inline struct round_robin_work_queue *
+as_round_robin_work_queue(struct vdo_work_queue *queue)
+{
+	return ((queue == NULL) ?
+		 NULL :
+		 container_of(queue, struct round_robin_work_queue, common));
+}
+
+/* Processing normal completions. */
+
+/*
+ * Dequeue and return the next waiting completion, if any.
+ *
+ * We scan the funnel queues from highest priority to lowest, once; there is therefore a race
+ * condition where a high-priority completion can be enqueued followed by a lower-priority one, and
+ * we'll grab the latter (but we'll catch the high-priority item on the next call). If strict
+ * enforcement of priorities becomes necessary, this function will need fixing.
+ */
+static struct vdo_completion *poll_for_completion(struct simple_work_queue *queue)
+{
+	int i;
+
+	for (i = queue->common.type->max_priority; i >= 0; i--) {
+		struct funnel_queue_entry *link = uds_funnel_queue_poll(queue->priority_lists[i]);
+
+		if (link != NULL)
+			return container_of(link, struct vdo_completion, work_queue_entry_link);
+	}
+
+	return NULL;
+}
+
+static void
+enqueue_work_queue_completion(struct simple_work_queue *queue, struct vdo_completion *completion)
+{
+	ASSERT_LOG_ONLY(completion->my_queue == NULL,
+			"completion %px (fn %px) to enqueue (%px) is not already queued (%px)",
+			completion,
+			completion->callback,
+			queue,
+			completion->my_queue);
+	if (completion->priority == VDO_WORK_Q_DEFAULT_PRIORITY)
+		completion->priority = queue->common.type->default_priority;
+
+	if (ASSERT(completion->priority <= queue->common.type->max_priority,
+		   "priority is in range for queue") != VDO_SUCCESS)
+		completion->priority = 0;
+
+	completion->my_queue = &queue->common;
+
+	/* Funnel queue handles the synchronization for the put. */
+	uds_funnel_queue_put(queue->priority_lists[completion->priority],
+			     &completion->work_queue_entry_link);
+
+	/*
+	 * Due to how funnel queue synchronization is handled (just atomic operations), the
+	 * simplest safe implementation here would be to wake-up any waiting threads after
+	 * enqueueing each item. Even if the funnel queue is not empty at the time of adding an
+	 * item to the queue, the consumer thread may not see this since it is not guaranteed to
+	 * have the same view of the queue as a producer thread.
+	 *
+	 * However, the above is wasteful so instead we attempt to minimize the number of thread
+	 * wakeups. Using an idle flag, and careful ordering using memory barriers, we should be
+	 * able to determine when the worker thread might be asleep or going to sleep. We use
+	 * cmpxchg to try to take ownership (vs other producer threads) of the responsibility for
+	 * waking the worker thread, so multiple wakeups aren't tried at once.
+	 *
+	 * This was tuned for some x86 boxes that were handy; it's untested whether doing the read
+	 * first is any better or worse for other platforms, even other x86 configurations.
+	 */
+	smp_mb();
+	if ((atomic_read(&queue->idle) != 1) || (atomic_cmpxchg(&queue->idle, 1, 0) != 1))
+		return;
+
+	/* There's a maximum of one thread in this list. */
+	wake_up(&queue->waiting_worker_threads);
+}
+
+static void run_start_hook(struct simple_work_queue *queue)
+{
+	if (queue->common.type->start != NULL)
+		queue->common.type->start(queue->private);
+}
+
+static void run_finish_hook(struct simple_work_queue *queue)
+{
+	if (queue->common.type->finish != NULL)
+		queue->common.type->finish(queue->private);
+}
+
+/*
+ * Wait for the next completion to process, or until kthread_should_stop indicates that it's time
+ * for us to shut down.
+ *
+ * If kthread_should_stop says it's time to stop but we have pending completions return a
+ * completion.
+ *
+ * Also update statistics relating to scheduler interactions.
+ */
+static struct vdo_completion *wait_for_next_completion(struct simple_work_queue *queue)
+{
+	struct vdo_completion *completion;
+	DEFINE_WAIT(wait);
+
+	while (true) {
+		prepare_to_wait(&queue->waiting_worker_threads, &wait, TASK_INTERRUPTIBLE);
+		/*
+		 * Don't set the idle flag until a wakeup will not be lost.
+		 *
+		 * Force synchronization between setting the idle flag and checking the funnel
+		 * queue; the producer side will do them in the reverse order. (There's still a
+		 * race condition we've chosen to allow, because we've got a timeout below that
+		 * unwedges us if we hit it, but this may narrow the window a little.)
+		 */
+		atomic_set(&queue->idle, 1);
+		smp_mb(); /* store-load barrier between "idle" and funnel queue */
+
+		completion = poll_for_completion(queue);
+		if (completion != NULL)
+			break;
+
+		/*
+		 * We need to check for thread-stop after setting TASK_INTERRUPTIBLE state up
+		 * above. Otherwise, schedule() will put the thread to sleep and might miss a
+		 * wakeup from kthread_stop() call in vdo_finish_work_queue().
+		 */
+		if (kthread_should_stop())
+			break;
+
+		schedule();
+
+		/*
+		 * Most of the time when we wake, it should be because there's work to do. If it
+		 * was a spurious wakeup, continue looping.
+		 */
+		completion = poll_for_completion(queue);
+		if (completion != NULL)
+			break;
+	}
+
+	finish_wait(&queue->waiting_worker_threads, &wait);
+	atomic_set(&queue->idle, 0);
+
+	return completion;
+}
+
+static void process_completion(struct simple_work_queue *queue, struct vdo_completion *completion)
+{
+	if (ASSERT(completion->my_queue == &queue->common,
+		   "completion %px from queue %px marked as being in this queue (%px)",
+		   completion,
+		   queue,
+		   completion->my_queue) == UDS_SUCCESS)
+		completion->my_queue = NULL;
+
+	vdo_run_completion(completion);
+}
+
+static void service_work_queue(struct simple_work_queue *queue)
+{
+	run_start_hook(queue);
+
+	while (true) {
+		struct vdo_completion *completion = poll_for_completion(queue);
+
+		if (completion == NULL)
+			completion = wait_for_next_completion(queue);
+
+		if (completion == NULL)
+			/* No completions but kthread_should_stop() was triggered. */
+			break;
+
+		process_completion(queue, completion);
+
+		/*
+		 * Be friendly to a CPU that has other work to do, if the kernel has told us to.
+		 * This speeds up some performance tests; that "other work" might include other VDO
+		 * threads.
+		 */
+		if (need_resched())
+			cond_resched();
+	}
+
+	run_finish_hook(queue);
+}
+
+static int work_queue_runner(void *ptr)
+{
+	struct simple_work_queue *queue = ptr;
+
+	complete(queue->started);
+	service_work_queue(queue);
+	return 0;
+}
+
+/* Creation & teardown */
+
+static void free_simple_work_queue(struct simple_work_queue *queue)
+{
+	unsigned int i;
+
+	for (i = 0; i <= VDO_WORK_Q_MAX_PRIORITY; i++)
+		uds_free_funnel_queue(queue->priority_lists[i]);
+	UDS_FREE(queue->common.name);
+	UDS_FREE(queue);
+}
+
+static void free_round_robin_work_queue(struct round_robin_work_queue *queue)
+{
+	struct simple_work_queue **queue_table = queue->service_queues;
+	unsigned int count = queue->num_service_queues;
+	unsigned int i;
+
+	queue->service_queues = NULL;
+
+	for (i = 0; i < count; i++)
+		free_simple_work_queue(queue_table[i]);
+	UDS_FREE(queue_table);
+	UDS_FREE(queue->common.name);
+	UDS_FREE(queue);
+}
+
+void vdo_free_work_queue(struct vdo_work_queue *queue)
+{
+	if (queue == NULL)
+		return;
+
+	vdo_finish_work_queue(queue);
+
+	if (queue->round_robin_mode)
+		free_round_robin_work_queue(as_round_robin_work_queue(queue));
+	else
+		free_simple_work_queue(as_simple_work_queue(queue));
+}
+
+static int make_simple_work_queue(const char *thread_name_prefix,
+				  const char *name,
+				  struct vdo_thread *owner,
+				  void *private,
+				  const struct vdo_work_queue_type *type,
+				  struct simple_work_queue **queue_ptr)
+{
+	DECLARE_COMPLETION_ONSTACK(started);
+	struct simple_work_queue *queue;
+	int i;
+	struct task_struct *thread = NULL;
+	int result;
+
+	ASSERT_LOG_ONLY((type->max_priority <= VDO_WORK_Q_MAX_PRIORITY),
+			"queue priority count %u within limit %u",
+			type->max_priority,
+			VDO_WORK_Q_MAX_PRIORITY);
+
+	result = UDS_ALLOCATE(1, struct simple_work_queue, "simple work queue", &queue);
+	if (result != UDS_SUCCESS)
+		return result;
+
+	queue->private = private;
+	queue->started = &started;
+	queue->common.type = type;
+	queue->common.owner = owner;
+	init_waitqueue_head(&queue->waiting_worker_threads);
+
+	result = uds_duplicate_string(name, "queue name", &queue->common.name);
+	if (result != VDO_SUCCESS) {
+		UDS_FREE(queue);
+		return -ENOMEM;
+	}
+
+	for (i = 0; i <= type->max_priority; i++) {
+		result = uds_make_funnel_queue(&queue->priority_lists[i]);
+		if (result != UDS_SUCCESS) {
+			free_simple_work_queue(queue);
+			return result;
+		}
+	}
+
+	thread = kthread_run(work_queue_runner,
+			     queue,
+			     "%s:%s",
+			     thread_name_prefix,
+			     queue->common.name);
+	if (IS_ERR(thread)) {
+		free_simple_work_queue(queue);
+		return (int) PTR_ERR(thread);
+	}
+
+	queue->thread = thread;
+
+	/*
+	 * If we don't wait to ensure the thread is running VDO code, a quick kthread_stop (due to
+	 * errors elsewhere) could cause it to never get as far as running VDO, skipping the
+	 * cleanup code.
+	 *
+	 * Eventually we should just make that path safe too, and then we won't need this
+	 * synchronization.
+	 */
+	wait_for_completion(&started);
+
+	*queue_ptr = queue;
+	return UDS_SUCCESS;
+}
+
+/**
+ * vdo_make_work_queue() - Create a work queue; if multiple threads are requested, completions will
+ *                         be distributed to them in round-robin fashion.
+ *
+ * Each queue is associated with a struct vdo_thread which has a single vdo thread id. Regardless
+ * of the actual number of queues and threads allocated here, code outside of the queue
+ * implementation will treat this as a single zone.
+ */
+int vdo_make_work_queue(const char *thread_name_prefix,
+			const char *name,
+			struct vdo_thread *owner,
+			const struct vdo_work_queue_type *type,
+			unsigned int thread_count,
+			void *thread_privates[],
+			struct vdo_work_queue **queue_ptr)
+{
+	struct round_robin_work_queue *queue;
+	int result;
+	char thread_name[TASK_COMM_LEN];
+	unsigned int i;
+
+	if (thread_count == 1) {
+		struct simple_work_queue *simple_queue;
+		void *context = ((thread_privates != NULL) ? thread_privates[0] : NULL);
+
+		result = make_simple_work_queue(thread_name_prefix,
+						name,
+						owner,
+						context,
+						type,
+						&simple_queue);
+		if (result == VDO_SUCCESS)
+			*queue_ptr = &simple_queue->common;
+		return result;
+	}
+
+	result = UDS_ALLOCATE(1, struct round_robin_work_queue, "round-robin work queue", &queue);
+	if (result != UDS_SUCCESS)
+		return result;
+
+	result = UDS_ALLOCATE(thread_count,
+			      struct simple_work_queue *,
+			      "subordinate work queues",
+			      &queue->service_queues);
+	if (result != UDS_SUCCESS) {
+		UDS_FREE(queue);
+		return result;
+	}
+
+	queue->num_service_queues = thread_count;
+	queue->common.round_robin_mode = true;
+	queue->common.owner = owner;
+
+	result = uds_duplicate_string(name, "queue name", &queue->common.name);
+	if (result != VDO_SUCCESS) {
+		UDS_FREE(queue->service_queues);
+		UDS_FREE(queue);
+		return -ENOMEM;
+	}
+
+	*queue_ptr = &queue->common;
+
+	for (i = 0; i < thread_count; i++) {
+		void *context = ((thread_privates != NULL) ? thread_privates[i] : NULL);
+
+		snprintf(thread_name, sizeof(thread_name), "%s%u", name, i);
+		result = make_simple_work_queue(thread_name_prefix,
+						thread_name,
+						owner,
+						context,
+						type,
+						&queue->service_queues[i]);
+		if (result != VDO_SUCCESS) {
+			queue->num_service_queues = i;
+			/* Destroy previously created subordinates. */
+			vdo_free_work_queue(UDS_FORGET(*queue_ptr));
+			return result;
+		}
+	}
+
+	return VDO_SUCCESS;
+}
+
+static void finish_simple_work_queue(struct simple_work_queue *queue)
+{
+	if (queue->thread == NULL)
+		return;
+
+	/* Tells the worker thread to shut down and waits for it to exit. */
+	kthread_stop(queue->thread);
+	queue->thread = NULL;
+}
+
+static void finish_round_robin_work_queue(struct round_robin_work_queue *queue)
+{
+	struct simple_work_queue **queue_table = queue->service_queues;
+	unsigned int count = queue->num_service_queues;
+	unsigned int i;
+
+	for (i = 0; i < count; i++)
+		finish_simple_work_queue(queue_table[i]);
+}
+
+/* No enqueueing of completions should be done once this function is called. */
+void vdo_finish_work_queue(struct vdo_work_queue *queue)
+{
+	if (queue == NULL)
+		return;
+
+	if (queue->round_robin_mode)
+		finish_round_robin_work_queue(as_round_robin_work_queue(queue));
+	else
+		finish_simple_work_queue(as_simple_work_queue(queue));
+}
+
+/* Debugging dumps */
+
+static void dump_simple_work_queue(struct simple_work_queue *queue)
+{
+	const char *thread_status = "no threads";
+	char task_state_report = '-';
+
+	if (queue->thread != NULL) {
+		task_state_report = task_state_to_char(queue->thread);
+		thread_status = atomic_read(&queue->idle) ? "idle" : "running";
+	}
+
+	uds_log_info("workQ %px (%s) %s (%c)",
+		     &queue->common,
+		     queue->common.name,
+		     thread_status,
+		     task_state_report);
+
+	/* ->waiting_worker_threads wait queue status? anyone waiting? */
+}
+
+/*
+ * Write to the buffer some info about the completion, for logging. Since the common use case is
+ * dumping info about a lot of completions to syslog all at once, the format favors brevity over
+ * readability.
+ */
+void vdo_dump_work_queue(struct vdo_work_queue *queue)
+{
+	if (queue->round_robin_mode) {
+		struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);
+		unsigned int i;
+
+		for (i = 0; i < round_robin->num_service_queues; i++)
+			dump_simple_work_queue(round_robin->service_queues[i]);
+	} else {
+		dump_simple_work_queue(as_simple_work_queue(queue));
+	}
+}
+
+static void get_function_name(void *pointer, char *buffer, size_t buffer_length)
+{
+	if (pointer == NULL) {
+		/*
+		 * Format "%ps" logs a null pointer as "(null)" with a bunch of leading spaces. We
+		 * sometimes use this when logging lots of data; don't be so verbose.
+		 */
+		strncpy(buffer, "-", buffer_length);
+	} else {
+		/*
+		 * Use a pragma to defeat gcc's format checking, which doesn't understand that
+		 * "%ps" actually does support a precision spec in Linux kernel code.
+		 */
+		char *space;
+
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wformat"
+		snprintf(buffer, buffer_length, "%.*ps", buffer_length - 1, pointer);
+#pragma GCC diagnostic pop
+
+		space = strchr(buffer, ' ');
+		if (space != NULL)
+			*space = '\0';
+	}
+}
+
+void vdo_dump_completion_to_buffer(struct vdo_completion *completion, char *buffer, size_t length)
+{
+	size_t current_length =
+		scnprintf(buffer,
+			  length,
+			  "%.*s/",
+			  TASK_COMM_LEN,
+			  (completion->my_queue == NULL ? "-" : completion->my_queue->name));
+
+	if (current_length < length)
+		get_function_name((void *) completion->callback,
+				  buffer + current_length,
+				  length - current_length);
+}
+
+/* Completion submission */
+/*
+ * If the completion has a timeout that has already passed, the timeout handler function may be
+ * invoked by this function.
+ */
+void vdo_enqueue_work_queue(struct vdo_work_queue *queue, struct vdo_completion *completion)
+{
+	/*
+	 * Convert the provided generic vdo_work_queue to the simple_work_queue to actually queue
+	 * on.
+	 */
+	struct simple_work_queue *simple_queue = NULL;
+
+	if (!queue->round_robin_mode) {
+		simple_queue = as_simple_work_queue(queue);
+	} else {
+		struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);
+
+		/*
+		 * It shouldn't be a big deal if the same rotor gets used for multiple work queues.
+		 * Any patterns that might develop are likely to be disrupted by random ordering of
+		 * multiple completions and migration between cores, unless the load is so light as
+		 * to be regular in ordering of tasks and the threads are confined to individual
+		 * cores; with a load that light we won't care.
+		 */
+		unsigned int rotor = this_cpu_inc_return(service_queue_rotor);
+		unsigned int index = rotor % round_robin->num_service_queues;
+
+		simple_queue = round_robin->service_queues[index];
+	}
+
+	enqueue_work_queue_completion(simple_queue, completion);
+}
+
+/* Misc */
+
+/*
+ * Return the work queue pointer recorded at initialization time in the work-queue stack handle
+ * initialized on the stack of the current thread, if any.
+ */
+static struct simple_work_queue *get_current_thread_work_queue(void)
+{
+	/*
+	 * In interrupt context, if a vdo thread is what got interrupted, the calls below will find
+	 * the queue for the thread which was interrupted. However, the interrupted thread may have
+	 * been processing a completion, in which case starting to process another would violate
+	 * our concurrency assumptions.
+	 */
+	if (in_interrupt())
+		return NULL;
+	if (kthread_func(current) != work_queue_runner)
+		/* Not a VDO work queue thread. */
+		return NULL;
+	return kthread_data(current);
+}
+
+struct vdo_work_queue *vdo_get_current_work_queue(void)
+{
+	struct simple_work_queue *queue = get_current_thread_work_queue();
+
+	return (queue == NULL) ? NULL : &queue->common;
+}
+
+struct vdo_thread *vdo_get_work_queue_owner(struct vdo_work_queue *queue)
+{
+	return queue->owner;
+}
+
+/**
+ * vdo_get_work_queue_private_data() - Returns the private data for the current thread's work
+ *                                     queue, or NULL if none or if the current thread is not a
+ *                                     work queue thread.
+ */
+void *vdo_get_work_queue_private_data(void)
+{
+	struct simple_work_queue *queue = get_current_thread_work_queue();
+
+	return (queue != NULL) ? queue->private : NULL;
+}
+
+bool vdo_work_queue_type_is(struct vdo_work_queue *queue, const struct vdo_work_queue_type *type)
+{
+	return (queue->type == type);
+}
diff --git a/drivers/md/dm-vdo/work-queue.h b/drivers/md/dm-vdo/work-queue.h
new file mode 100644
index 00000000000..d1e05f8901d
--- /dev/null
+++ b/drivers/md/dm-vdo/work-queue.h
@@ -0,0 +1,53 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * Copyright Red Hat
+ */
+
+#ifndef VDO_WORK_QUEUE_H
+#define VDO_WORK_QUEUE_H
+
+#include <linux/sched.h> /* for TASK_COMM_LEN */
+
+#include "types.h"
+
+enum {
+	MAX_VDO_WORK_QUEUE_NAME_LEN = TASK_COMM_LEN,
+};
+
+struct vdo_work_queue_type {
+	void (*start)(void *context);
+	void (*finish)(void *context);
+	enum vdo_completion_priority max_priority;
+	enum vdo_completion_priority default_priority;
+};
+
+struct vdo_completion;
+struct vdo_thread;
+struct vdo_work_queue;
+
+int vdo_make_work_queue(const char *thread_name_prefix,
+			const char *name,
+			struct vdo_thread *owner,
+			const struct vdo_work_queue_type *type,
+			unsigned int thread_count,
+			void *thread_privates[],
+			struct vdo_work_queue **queue_ptr);
+
+void vdo_enqueue_work_queue(struct vdo_work_queue *queue, struct vdo_completion *completion);
+
+void vdo_finish_work_queue(struct vdo_work_queue *queue);
+
+void vdo_free_work_queue(struct vdo_work_queue *queue);
+
+void vdo_dump_work_queue(struct vdo_work_queue *queue);
+
+void vdo_dump_completion_to_buffer(struct vdo_completion *completion, char *buffer, size_t length);
+
+void *vdo_get_work_queue_private_data(void);
+struct vdo_work_queue *vdo_get_current_work_queue(void);
+struct vdo_thread *vdo_get_work_queue_owner(struct vdo_work_queue *queue);
+
+bool __must_check
+vdo_work_queue_type_is(struct vdo_work_queue *queue, const struct vdo_work_queue_type *type);
+
+#endif /* VDO_WORK_QUEUE_H */
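
For orientation, here is a minimal sketch of how a caller might exercise the interface declared
above. It is illustrative only and not part of the patch: the callback, queue names, priority
values, and the assumption that the completion's other fields are initialized elsewhere are all
placeholders rather than conventions taken from dm-vdo.

	/* Illustrative sketch; names and priority values are assumptions. */
	static void example_callback(struct vdo_completion *completion)
	{
		/* Work for this completion runs here, on the queue's worker thread. */
	}

	static int example(struct vdo_thread *owner, struct vdo_completion *completion)
	{
		static const struct vdo_work_queue_type example_type = {
			.max_priority = 2,
			.default_priority = 1,
		};
		struct vdo_work_queue *queue;
		int result;

		/* One worker thread, no per-thread private data. */
		result = vdo_make_work_queue("vdo", "example", owner, &example_type,
					     1, NULL, &queue);
		if (result != VDO_SUCCESS)
			return result;

		completion->callback = example_callback;
		vdo_enqueue_work_queue(queue, completion);

		vdo_finish_work_queue(queue);	/* assumed to drain and stop the worker */
		vdo_free_work_queue(queue);
		return VDO_SUCCESS;
	}

For a round-robin queue backed by multiple worker threads, vdo_enqueue_work_queue() instead
distributes completions across the underlying service queues using the per-CPU rotor shown in
work-queue.c.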
-- 
2.40.1

