All of lore.kernel.org
 help / color / mirror / Atom feed
From: Mike Snitzer <snitzer@kernel.org>
To: dm-devel@redhat.com
Cc: Matthew Sakai <msakai@redhat.com>
Subject: [dm-devel] [PATCH v3 22/39] dm vdo: add hash locks and hash zones
Date: Thu, 14 Sep 2023 15:16:18 -0400	[thread overview]
Message-ID: <20230914191635.39805-23-snitzer@kernel.org> (raw)
In-Reply-To: <20230914191635.39805-1-snitzer@kernel.org>

From: Matthew Sakai <msakai@redhat.com>

In order to deduplicate concurrent writes of the same data (to
different locations), data_vios which are writing the same data are
grouped together in a "hash lock," named for and keyed by the hash of
the data being written. Each hash lock is assigned to a hash zone
based on a portion of its hash.

Co-developed-by: J. corwin Coburn <corwin@hurlbutnet.net>
Signed-off-by: J. corwin Coburn <corwin@hurlbutnet.net>
Co-developed-by: Michael Sclafani <vdo-devel@redhat.com>
Signed-off-by: Michael Sclafani <vdo-devel@redhat.com>
Co-developed-by: Sweet Tea Dorminy <sweettea-kernel@dorminy.me>
Signed-off-by: Sweet Tea Dorminy <sweettea-kernel@dorminy.me>
Signed-off-by: Matthew Sakai <msakai@redhat.com>
Signed-off-by: Mike Snitzer <snitzer@kernel.org>
---
 drivers/md/dm-vdo/dedupe.c | 2450 ++++++++++++++++++++++++++++++++++++
 drivers/md/dm-vdo/dedupe.h |   93 ++
 2 files changed, 2543 insertions(+)
 create mode 100644 drivers/md/dm-vdo/dedupe.c
 create mode 100644 drivers/md/dm-vdo/dedupe.h

diff --git a/drivers/md/dm-vdo/dedupe.c b/drivers/md/dm-vdo/dedupe.c
new file mode 100644
index 000000000000..1d6774e8e546
--- /dev/null
+++ b/drivers/md/dm-vdo/dedupe.c
@@ -0,0 +1,2450 @@
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ * Copyright 2023 Red Hat
+ */
+
+/**
+ * DOC:
+ *
+ * Hash Locks:
+ *
+ * A hash_lock controls and coordinates writing, index access, and dedupe among groups of data_vios
+ * concurrently writing identical blocks, allowing them to deduplicate not only against advice but
+ * also against each other. This saves on index queries and allows those data_vios to concurrently
+ * deduplicate against a single block instead of being serialized through a PBN read lock. Only one
+ * index query is needed for each hash_lock, instead of one for every data_vio.
+ *
+ * A hash_lock acts like a state machine perhaps more than as a lock. Other than the starting and
+ * ending states INITIALIZING and BYPASSING, every state represents and is held for the duration of
+ * an asynchronous operation. All state transitions are performed on the thread of the hash_zone
+ * containing the lock. An asynchronous operation is almost always performed upon entering a state,
+ * and the callback from that operation triggers exiting the state and entering a new state.
+ *
+ * In all states except DEDUPING, there is a single data_vio, called the lock agent, performing the
+ * asynchronous operations on behalf of the lock. The agent will change during the lifetime of the
+ * lock if the lock is shared by more than one data_vio. data_vios waiting to deduplicate are kept
+ * on a wait queue. Viewed a different way, the agent holds the lock exclusively until the lock
+ * enters the DEDUPING state, at which point it becomes a shared lock that all the waiters (and any
+ * new data_vios that arrive) use to share a PBN lock. In state DEDUPING, there is no agent. When
+ * the last data_vio in the lock calls back in DEDUPING, it becomes the agent and the lock becomes
+ * exclusive again. New data_vios that arrive in the lock will also go on the wait queue.
+ *
+ * The existence of lock waiters is a key factor controlling which state the lock transitions to
+ * next. When the lock is new or has waiters, it will always try to reach DEDUPING, and when it
+ * doesn't, it will try to clean up and exit.
+ *
+ * Deduping requires holding a PBN lock on a block that is known to contain data identical to the
+ * data_vios in the lock, so the lock will send the agent to the duplicate zone to acquire the PBN
+ * lock (LOCKING), to the kernel I/O threads to read and verify the data (VERIFYING), or to write a
+ * new copy of the data to a full data block or a slot in a compressed block (WRITING).
+ *
+ * Cleaning up consists of updating the index when the data location is different from the initial
+ * index query (UPDATING, triggered by stale advice, compression, and rollover), releasing the PBN
+ * lock on the duplicate block (UNLOCKING), and if the agent is the last data_vio referencing the
+ * lock, releasing the hash_lock itself back to the hash zone (BYPASSING).
+ *
+ * The shortest sequence of states is for non-concurrent writes of new data:
+ *   INITIALIZING -> QUERYING -> WRITING -> BYPASSING
+ * This sequence is short because no PBN read lock or index update is needed.
+ *
+ * Non-concurrent, finding valid advice looks like this (endpoints elided):
+ *   -> QUERYING -> LOCKING -> VERIFYING -> DEDUPING -> UNLOCKING ->
+ * Or with stale advice (endpoints elided):
+ *   -> QUERYING -> LOCKING -> VERIFYING -> UNLOCKING -> WRITING -> UPDATING ->
+ *
+ * When there are not enough available reference count increments available on a PBN for a data_vio
+ * to deduplicate, a new lock is forked and the excess waiters roll over to the new lock (which
+ * goes directly to WRITING). The new lock takes the place of the old lock in the lock map so new
+ * data_vios will be directed to it. The two locks will proceed independently, but only the new
+ * lock will have the right to update the index (unless it also forks).
+ *
+ * Since rollover happens in a lock instance, once a valid data location has been selected, it will
+ * not change. QUERYING and WRITING are only performed once per lock lifetime. All other
+ * non-endpoint states can be re-entered.
+ *
+ * The function names in this module follow a convention referencing the states and transitions in
+ * the state machine. For example, for the LOCKING state, there are start_locking() and
+ * finish_locking() functions.  start_locking() is invoked by the finish function of the state (or
+ * states) that transition to LOCKING. It performs the actual lock state change and must be invoked
+ * on the hash zone thread.  finish_locking() is called by (or continued via callback from) the
+ * code actually obtaining the lock. It does any bookkeeping or decision-making required and
+ * invokes the appropriate start function of the state being transitioned to after LOCKING.
+ *
+ * ----------------------------------------------------------------------
+ *
+ * Index Queries:
+ *
+ * A query to the UDS index is handled asynchronously by the index's threads. When the query is
+ * complete, a callback supplied with the query will be called from one of the those threads. Under
+ * heavy system load, the index may be slower to respond then is desirable for reasonable I/O
+ * throughput. Since deduplication of writes is not necessary for correct operation of a VDO
+ * device, it is acceptable to timeout out slow index queries and proceed to fulfill a write
+ * request without deduplicating. However, because the uds_request struct itself is supplied by the
+ * caller, we can not simply reuse a uds_request object which we have chosen to timeout. Hence,
+ * each hash_zone maintains a pool of dedupe_contexts which each contain a uds_request along with a
+ * reference to the data_vio on behalf of which they are performing a query.
+ *
+ * When a hash_lock needs to query the index, it attempts to acquire an unused dedupe_context from
+ * its hash_zone's pool. If one is available, that context is prepared, associated with the
+ * hash_lock's agent, added to the list of pending contexts, and then sent to the index. The
+ * context's state will be transitioned from DEDUPE_CONTEXT_IDLE to DEDUPE_CONTEXT_PENDING. If all
+ * goes well, the dedupe callback will be called by the index which will change the context's state
+ * to DEDUPE_CONTEXT_COMPLETE, and the associated data_vio will be enqueued to run back in the hash
+ * zone where the query results will be processed and the context will be put back in the idle
+ * state and returned to the hash_zone's available list.
+ *
+ * The first time an index query is launched from a given hash_zone, a timer is started. When the
+ * timer fires, the hash_zone's completion is enqueued to run in the hash_zone where the zone's
+ * pending list will be searched for any contexts in the pending state which have been running for
+ * too long. Those contexts are transitioned to the DEDUPE_CONTEXT_TIMED_OUT state and moved to the
+ * zone's timed_out list where they won't be examined again if there is a subsequent time out). The
+ * data_vios associated with timed out contexts are sent to continue processing their write
+ * operation without deduplicating. The timer is also restarted.
+ *
+ * When the dedupe callback is run for a context which is in the timed out state, that context is
+ * moved to the DEDUPE_CONTEXT_TIMED_OUT_COMPLETE state. No other action need be taken as the
+ * associated data_vios have already been dispatched.
+ *
+ * If a hash_lock needs a dedupe context, and the available list is empty, the timed_out list will
+ * be searched for any contexts which are timed out and complete. One of these will be used
+ * immediately, and the rest will be returned to the available list and marked idle.
+ */
+
+#include "dedupe.h"
+
+#include <linux/atomic.h>
+#include <linux/jiffies.h>
+#include <linux/kernel.h>
+#include <linux/kobject.h>
+#include <linux/list.h>
+#include <linux/ratelimit.h>
+#include <linux/spinlock.h>
+#include <linux/timer.h>
+
+#include "logger.h"
+#include "memory-alloc.h"
+#include "numeric.h"
+#include "permassert.h"
+#include "string-utils.h"
+#include "uds.h"
+
+#include "action-manager.h"
+#include "admin-state.h"
+#include "completion.h"
+#include "constants.h"
+#include "data-vio.h"
+#include "io-submitter.h"
+#include "packer.h"
+#include "physical-zone.h"
+#include "pointer-map.h"
+#include "slab-depot.h"
+#include "statistics.h"
+#include "types.h"
+#include "vdo.h"
+#include "wait-queue.h"
+
+enum hash_lock_state {
+	/* State for locks that are not in use or are being initialized. */
+	VDO_HASH_LOCK_INITIALIZING,
+
+	/* This is the sequence of states typically used on the non-dedupe path. */
+	VDO_HASH_LOCK_QUERYING,
+	VDO_HASH_LOCK_WRITING,
+	VDO_HASH_LOCK_UPDATING,
+
+	/* The remaining states are typically used on the dedupe path in this order. */
+	VDO_HASH_LOCK_LOCKING,
+	VDO_HASH_LOCK_VERIFYING,
+	VDO_HASH_LOCK_DEDUPING,
+	VDO_HASH_LOCK_UNLOCKING,
+
+	/*
+	 * Terminal state for locks returning to the pool. Must be last both because it's the final
+	 * state, and also because it's used to count the states.
+	 */
+	VDO_HASH_LOCK_BYPASSING,
+};
+
+static const char * const LOCK_STATE_NAMES[] = {
+	[VDO_HASH_LOCK_BYPASSING] = "BYPASSING",
+	[VDO_HASH_LOCK_DEDUPING] = "DEDUPING",
+	[VDO_HASH_LOCK_INITIALIZING] = "INITIALIZING",
+	[VDO_HASH_LOCK_LOCKING] = "LOCKING",
+	[VDO_HASH_LOCK_QUERYING] = "QUERYING",
+	[VDO_HASH_LOCK_UNLOCKING] = "UNLOCKING",
+	[VDO_HASH_LOCK_UPDATING] = "UPDATING",
+	[VDO_HASH_LOCK_VERIFYING] = "VERIFYING",
+	[VDO_HASH_LOCK_WRITING] = "WRITING",
+};
+
+struct hash_lock {
+	/* The block hash covered by this lock */
+	struct uds_record_name hash;
+
+	/* When the lock is unused, this list entry allows the lock to be pooled */
+	struct list_head pool_node;
+
+	/*
+	 * A list containing the data VIOs sharing this lock, all having the same record name and
+	 * data block contents, linked by their hash_lock_node fields.
+	 */
+	struct list_head duplicate_ring;
+
+	/* The number of data_vios sharing this lock instance */
+	data_vio_count_t reference_count;
+
+	/* The maximum value of reference_count in the lifetime of this lock */
+	data_vio_count_t max_references;
+
+	/* The current state of this lock */
+	enum hash_lock_state state;
+
+	/* True if the UDS index should be updated with new advice */
+	bool update_advice;
+
+	/* True if the advice has been verified to be a true duplicate */
+	bool verified;
+
+	/* True if the lock has already accounted for an initial verification */
+	bool verify_counted;
+
+	/* True if this lock is registered in the lock map (cleared on rollover) */
+	bool registered;
+
+	/*
+	 * If verified is false, this is the location of a possible duplicate. If verified is true,
+	 * it is the verified location of a true duplicate.
+	 */
+	struct zoned_pbn duplicate;
+
+	/* The PBN lock on the block containing the duplicate data */
+	struct pbn_lock *duplicate_lock;
+
+	/* The data_vio designated to act on behalf of the lock */
+	struct data_vio *agent;
+
+	/*
+	 * Other data_vios with data identical to the agent who are currently waiting for the agent
+	 * to get the information they all need to deduplicate--either against each other, or
+	 * against an existing duplicate on disk.
+	 */
+	struct wait_queue waiters;
+};
+
+enum {
+	LOCK_POOL_CAPACITY = MAXIMUM_VDO_USER_VIOS,
+};
+
+struct hash_zones {
+	struct action_manager *manager;
+	struct kobject dedupe_directory;
+	struct uds_parameters parameters;
+	struct uds_index_session *index_session;
+	struct ratelimit_state ratelimiter;
+	atomic64_t timeouts;
+	atomic64_t dedupe_context_busy;
+
+	/* This spinlock protects the state fields and the starting of dedupe requests. */
+	spinlock_t lock;
+
+	/* The fields in the next block are all protected by the lock */
+	struct vdo_completion completion;
+	enum index_state index_state;
+	enum index_state index_target;
+	struct admin_state state;
+	bool changing;
+	bool create_flag;
+	bool dedupe_flag;
+	bool error_flag;
+	u64 reported_timeouts;
+
+	/* The number of zones */
+	zone_count_t zone_count;
+	/* The hash zones themselves */
+	struct hash_zone zones[];
+};
+
+static inline struct hash_zone *as_hash_zone(struct vdo_completion *completion)
+{
+	vdo_assert_completion_type(completion, VDO_HASH_ZONE_COMPLETION);
+	return container_of(completion, struct hash_zone, completion);
+}
+
+static inline struct hash_zones *as_hash_zones(struct vdo_completion *completion)
+{
+	vdo_assert_completion_type(completion, VDO_HASH_ZONES_COMPLETION);
+	return container_of(completion, struct hash_zones, completion);
+}
+
+static inline void assert_in_hash_zone(struct hash_zone *zone, const char *name)
+{
+	ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == zone->thread_id),
+			"%s called on hash zone thread",
+			name);
+}
+
+/**
+ * return_hash_lock_to_pool() - (Re)initialize a hash lock and return it to its pool.
+ * @zone: The zone from which the lock was borrowed.
+ * @lock: The lock that is no longer in use.
+ */
+static void return_hash_lock_to_pool(struct hash_zone *zone, struct hash_lock *lock)
+{
+	memset(lock, 0, sizeof(*lock));
+	INIT_LIST_HEAD(&lock->pool_node);
+	INIT_LIST_HEAD(&lock->duplicate_ring);
+	vdo_initialize_wait_queue(&lock->waiters);
+	list_add_tail(&lock->pool_node, &zone->lock_pool);
+}
+
+/**
+ * vdo_get_duplicate_lock() - Get the PBN lock on the duplicate data location for a data_vio from
+ *                            the hash_lock the data_vio holds (if there is one).
+ * @data_vio: The data_vio to query.
+ *
+ * Return: The PBN lock on the data_vio's duplicate location.
+ */
+struct pbn_lock *vdo_get_duplicate_lock(struct data_vio *data_vio)
+{
+	if (data_vio->hash_lock == NULL)
+		return NULL;
+	return data_vio->hash_lock->duplicate_lock;
+}
+
+/**
+ * get_hash_lock_state_name() - Get the string representation of a hash lock state.
+ * @state: The hash lock state.
+ *
+ * Return: The short string representing the state
+ */
+static const char *get_hash_lock_state_name(enum hash_lock_state state)
+{
+	/* Catch if a state has been added without updating the name array. */
+	STATIC_ASSERT((VDO_HASH_LOCK_BYPASSING + 1) == ARRAY_SIZE(LOCK_STATE_NAMES));
+	return (state < ARRAY_SIZE(LOCK_STATE_NAMES)) ? LOCK_STATE_NAMES[state] : "INVALID";
+}
+
+/**
+ * assert_hash_lock_agent() - Assert that a data_vio is the agent of its hash lock, and that this
+ *                            is being called in the hash zone.
+ * @data_vio: The data_vio expected to be the lock agent.
+ * @where: A string describing the function making the assertion.
+ */
+static void assert_hash_lock_agent(struct data_vio *data_vio, const char *where)
+{
+	/* Not safe to access the agent field except from the hash zone. */
+	assert_data_vio_in_hash_zone(data_vio);
+	ASSERT_LOG_ONLY(data_vio == data_vio->hash_lock->agent,
+			"%s must be for the hash lock agent", where);
+}
+
+/**
+ * set_duplicate_lock() - Set the duplicate lock held by a hash lock. May only be called in the
+ *                        physical zone of the PBN lock.
+ * @hash_lock: The hash lock to update.
+ * @pbn_lock: The PBN read lock to use as the duplicate lock.
+ */
+static void set_duplicate_lock(struct hash_lock *hash_lock, struct pbn_lock *pbn_lock)
+{
+	ASSERT_LOG_ONLY((hash_lock->duplicate_lock == NULL),
+			"hash lock must not already hold a duplicate lock");
+
+	pbn_lock->holder_count += 1;
+	hash_lock->duplicate_lock = pbn_lock;
+}
+
+/**
+ * dequeue_lock_waiter() - Remove the first data_vio from the lock's wait queue and return it.
+ * @lock: The lock containing the wait queue.
+ *
+ * Return: The first (oldest) waiter in the queue, or NULL if the queue is empty.
+ */
+static inline struct data_vio *dequeue_lock_waiter(struct hash_lock *lock)
+{
+	return waiter_as_data_vio(vdo_dequeue_next_waiter(&lock->waiters));
+}
+
+/**
+ * set_hash_lock() - Set, change, or clear the hash lock a data_vio is using.
+ * @data_vio: The data_vio to update.
+ * @new_lock: The hash lock the data_vio is joining.
+ *
+ * Updates the hash lock (or locks) to reflect the change in membership.
+ */
+static void set_hash_lock(struct data_vio *data_vio, struct hash_lock *new_lock)
+{
+	struct hash_lock *old_lock = data_vio->hash_lock;
+
+	if (old_lock != NULL) {
+		ASSERT_LOG_ONLY(data_vio->hash_zone != NULL,
+				"must have a hash zone when holding a hash lock");
+		ASSERT_LOG_ONLY(!list_empty(&data_vio->hash_lock_entry),
+				"must be on a hash lock ring when holding a hash lock");
+		ASSERT_LOG_ONLY(old_lock->reference_count > 0,
+				"hash lock reference must be counted");
+
+		if ((old_lock->state != VDO_HASH_LOCK_BYPASSING) &&
+		    (old_lock->state != VDO_HASH_LOCK_UNLOCKING))
+			/*
+			 * If the reference count goes to zero in a non-terminal state, we're most
+			 * likely leaking this lock.
+			 */
+			ASSERT_LOG_ONLY(old_lock->reference_count > 1,
+					"hash locks should only become unreferenced in a terminal state, not state %s",
+					get_hash_lock_state_name(old_lock->state));
+
+		list_del_init(&data_vio->hash_lock_entry);
+		old_lock->reference_count -= 1;
+
+		data_vio->hash_lock = NULL;
+	}
+
+	if (new_lock != NULL) {
+		/*
+		 * Keep all data_vios sharing the lock on a ring since they can complete in any
+		 * order and we'll always need a pointer to one to compare data.
+		 */
+		list_move_tail(&data_vio->hash_lock_entry, &new_lock->duplicate_ring);
+		new_lock->reference_count += 1;
+		if (new_lock->max_references < new_lock->reference_count)
+			new_lock->max_references = new_lock->reference_count;
+
+		data_vio->hash_lock = new_lock;
+	}
+}
+
+/* There are loops in the state diagram, so some forward decl's are needed. */
+static void start_deduping(struct hash_lock *lock, struct data_vio *agent, bool agent_is_done);
+static void start_locking(struct hash_lock *lock, struct data_vio *agent);
+static void start_writing(struct hash_lock *lock, struct data_vio *agent);
+static void unlock_duplicate_pbn(struct vdo_completion *completion);
+static void transfer_allocation_lock(struct data_vio *data_vio);
+
+/**
+ * exit_hash_lock() - Bottleneck for data_vios that have written or deduplicated and that are no
+ *                    longer needed to be an agent for the hash lock.
+ * @data_vio: The data_vio to complete and send to be cleaned up.
+ */
+static void exit_hash_lock(struct data_vio *data_vio)
+{
+	/* Release the hash lock now, saving a thread transition in cleanup. */
+	vdo_release_hash_lock(data_vio);
+
+	/* Complete the data_vio and start the clean-up path to release any locks it still holds. */
+	data_vio->vio.completion.callback = complete_data_vio;
+
+	continue_data_vio(data_vio);
+}
+
+/**
+ * set_duplicate_location() - Set the location of the duplicate block for data_vio, updating the
+ *                            is_duplicate and duplicate fields from a zoned_pbn.
+ * @data_vio: The data_vio to modify.
+ * @source: The location of the duplicate.
+ */
+static void set_duplicate_location(struct data_vio *data_vio, const struct zoned_pbn source)
+{
+	data_vio->is_duplicate = (source.pbn != VDO_ZERO_BLOCK);
+	data_vio->duplicate = source;
+}
+
+/**
+ * retire_lock_agent() - Retire the active lock agent, replacing it with the first lock waiter, and
+ *                       make the retired agent exit the hash lock.
+ * @lock: The hash lock to update.
+ *
+ * Return: The new lock agent (which will be NULL if there was no waiter)
+ */
+static struct data_vio *retire_lock_agent(struct hash_lock *lock)
+{
+	struct data_vio *old_agent = lock->agent;
+	struct data_vio *new_agent = dequeue_lock_waiter(lock);
+
+	lock->agent = new_agent;
+	exit_hash_lock(old_agent);
+	if (new_agent != NULL)
+		set_duplicate_location(new_agent, lock->duplicate);
+	return new_agent;
+}
+
+/**
+ * wait_on_hash_lock() - Add a data_vio to the lock's queue of waiters.
+ * @lock: The hash lock on which to wait.
+ * @data_vio: The data_vio to add to the queue.
+ */
+static void wait_on_hash_lock(struct hash_lock *lock, struct data_vio *data_vio)
+{
+	vdo_enqueue_waiter(&lock->waiters, &data_vio->waiter);
+
+	/*
+	 * Make sure the agent doesn't block indefinitely in the packer since it now has at least
+	 * one other data_vio waiting on it.
+	 */
+	if ((lock->state != VDO_HASH_LOCK_WRITING) || !cancel_data_vio_compression(lock->agent))
+		return;
+
+	/*
+	 * Even though we're waiting, we also have to send ourselves as a one-way message to the
+	 * packer to ensure the agent continues executing. This is safe because
+	 * cancel_vio_compression() guarantees the agent won't continue executing until this
+	 * message arrives in the packer, and because the wait queue link isn't used for sending
+	 * the message.
+	 */
+	data_vio->compression.lock_holder = lock->agent;
+	launch_data_vio_packer_callback(data_vio, vdo_remove_lock_holder_from_packer);
+}
+
+/**
+ * abort_waiter() - waiter_callback function that shunts waiters to write their blocks without
+ *                  optimization.
+ * @waiter: The data_vio's waiter link.
+ * @context: Not used.
+ */
+static void abort_waiter(struct waiter *waiter, void *context __always_unused)
+{
+	write_data_vio(waiter_as_data_vio(waiter));
+}
+
+/**
+ * start_bypassing() - Stop using the hash lock.
+ * @lock: The hash lock.
+ * @agent: The data_vio acting as the agent for the lock.
+ *
+ * Stops using the hash lock. This is the final transition for hash locks which did not get an
+ * error.
+ */
+static void start_bypassing(struct hash_lock *lock, struct data_vio *agent)
+{
+	lock->state = VDO_HASH_LOCK_BYPASSING;
+	exit_hash_lock(agent);
+}
+
+void vdo_clean_failed_hash_lock(struct data_vio *data_vio)
+{
+	struct hash_lock *lock = data_vio->hash_lock;
+
+	if (lock->state == VDO_HASH_LOCK_BYPASSING) {
+		exit_hash_lock(data_vio);
+		return;
+	}
+
+	if (lock->agent == NULL) {
+		lock->agent = data_vio;
+	} else if (data_vio != lock->agent) {
+		exit_hash_lock(data_vio);
+		return;
+	}
+
+	lock->state = VDO_HASH_LOCK_BYPASSING;
+
+	/* Ensure we don't attempt to update advice when cleaning up. */
+	lock->update_advice = false;
+
+	vdo_notify_all_waiters(&lock->waiters, abort_waiter, NULL);
+
+	if (lock->duplicate_lock != NULL) {
+		/* The agent must reference the duplicate zone to launch it. */
+		data_vio->duplicate = lock->duplicate;
+		launch_data_vio_duplicate_zone_callback(data_vio, unlock_duplicate_pbn);
+		return;
+	}
+
+	lock->agent = NULL;
+	data_vio->is_duplicate = false;
+	exit_hash_lock(data_vio);
+}
+
+/**
+ * finish_unlocking() - Handle the result of the agent for the lock releasing a read lock on
+ *                      duplicate candidate.
+ * @completion: The completion of the data_vio acting as the lock's agent.
+ *
+ * This continuation is registered in unlock_duplicate_pbn().
+ */
+static void finish_unlocking(struct vdo_completion *completion)
+{
+	struct data_vio *agent = as_data_vio(completion);
+	struct hash_lock *lock = agent->hash_lock;
+
+	assert_hash_lock_agent(agent, __func__);
+
+	ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
+			"must have released the duplicate lock for the hash lock");
+
+	if (!lock->verified) {
+		/*
+		 * UNLOCKING -> WRITING transition: The lock we released was on an unverified
+		 * block, so it must have been a lock on advice we were verifying, not on a
+		 * location that was used for deduplication. Go write (or compress) the block to
+		 * get a location to dedupe against.
+		 */
+		start_writing(lock, agent);
+		return;
+	}
+
+	/*
+	 * With the lock released, the verified duplicate block may already have changed and will
+	 * need to be re-verified if a waiter arrived.
+	 */
+	lock->verified = false;
+
+	if (vdo_has_waiters(&lock->waiters)) {
+		/*
+		 * UNLOCKING -> LOCKING transition: A new data_vio entered the hash lock while the
+		 * agent was releasing the PBN lock. The current agent exits and the waiter has to
+		 * re-lock and re-verify the duplicate location.
+		 *
+		 * TODO: If we used the current agent to re-acquire the PBN lock we wouldn't need
+		 * to re-verify.
+		 */
+		agent = retire_lock_agent(lock);
+		start_locking(lock, agent);
+		return;
+	}
+
+	/*
+	 * UNLOCKING -> BYPASSING transition: The agent is done with the lock and no other
+	 * data_vios reference it, so remove it from the lock map and return it to the pool.
+	 */
+	start_bypassing(lock, agent);
+}
+
+/**
+ * unlock_duplicate_pbn() - Release a read lock on the PBN of the block that may or may not have
+ *                          contained duplicate data.
+ * @completion: The completion of the data_vio acting as the lock's agent.
+ *
+ * This continuation is launched by start_unlocking(), and calls back to finish_unlocking() on the
+ * hash zone thread.
+ */
+static void unlock_duplicate_pbn(struct vdo_completion *completion)
+{
+	struct data_vio *agent = as_data_vio(completion);
+	struct hash_lock *lock = agent->hash_lock;
+
+	assert_data_vio_in_duplicate_zone(agent);
+	ASSERT_LOG_ONLY(lock->duplicate_lock != NULL, "must have a duplicate lock to release");
+
+	vdo_release_physical_zone_pbn_lock(agent->duplicate.zone,
+					   agent->duplicate.pbn,
+					   UDS_FORGET(lock->duplicate_lock));
+	if (lock->state == VDO_HASH_LOCK_BYPASSING) {
+		complete_data_vio(completion);
+		return;
+	}
+
+	launch_data_vio_hash_zone_callback(agent, finish_unlocking);
+}
+
+/**
+ * start_unlocking() - Release a read lock on the PBN of the block that may or may not have
+ *                     contained duplicate data.
+ * @lock: The hash lock.
+ * @agent: The data_vio currently acting as the agent for the lock.
+ */
+static void start_unlocking(struct hash_lock *lock, struct data_vio *agent)
+{
+	lock->state = VDO_HASH_LOCK_UNLOCKING;
+	launch_data_vio_duplicate_zone_callback(agent, unlock_duplicate_pbn);
+}
+
+static void release_context(struct dedupe_context *context)
+{
+	struct hash_zone *zone = context->zone;
+
+	WRITE_ONCE(zone->active, zone->active - 1);
+	list_move(&context->list_entry, &zone->available);
+}
+
+static void process_update_result(struct data_vio *agent)
+{
+	struct dedupe_context *context = agent->dedupe_context;
+
+	if ((context == NULL) ||
+	    !change_context_state(context, DEDUPE_CONTEXT_COMPLETE, DEDUPE_CONTEXT_IDLE))
+		return;
+
+	release_context(context);
+}
+
+/**
+ * finish_updating() - Process the result of a UDS update performed by the agent for the lock.
+ * @completion: The completion of the data_vio that performed the update
+ *
+ * This continuation is registered in start_querying().
+ */
+static void finish_updating(struct vdo_completion *completion)
+{
+	struct data_vio *agent = as_data_vio(completion);
+	struct hash_lock *lock = agent->hash_lock;
+
+	assert_hash_lock_agent(agent, __func__);
+
+	process_update_result(agent);
+
+	/*
+	 * UDS was updated successfully, so don't update again unless the duplicate location
+	 * changes due to rollover.
+	 */
+	lock->update_advice = false;
+
+	if (vdo_has_waiters(&lock->waiters)) {
+		/*
+		 * UPDATING -> DEDUPING transition: A new data_vio arrived during the UDS update.
+		 * Send it on the verified dedupe path. The agent is done with the lock, but the
+		 * lock may still need to use it to clean up after rollover.
+		 */
+		start_deduping(lock, agent, true);
+		return;
+	}
+
+	if (lock->duplicate_lock != NULL) {
+		/*
+		 * UPDATING -> UNLOCKING transition: No one is waiting to dedupe, but we hold a
+		 * duplicate PBN lock, so go release it.
+		 */
+		start_unlocking(lock, agent);
+		return;
+	}
+
+	/*
+	 * UPDATING -> BYPASSING transition: No one is waiting to dedupe and there's no lock to
+	 * release.
+	 */
+	start_bypassing(lock, agent);
+}
+
+static void query_index(struct data_vio *data_vio, enum uds_request_type operation);
+
+/**
+ * start_updating() - Continue deduplication with the last step, updating UDS with the location of
+ *                    the duplicate that should be returned as advice in the future.
+ * @lock: The hash lock.
+ * @agent: The data_vio currently acting as the agent for the lock.
+ */
+static void start_updating(struct hash_lock *lock, struct data_vio *agent)
+{
+	lock->state = VDO_HASH_LOCK_UPDATING;
+
+	ASSERT_LOG_ONLY(lock->verified, "new advice should have been verified");
+	ASSERT_LOG_ONLY(lock->update_advice, "should only update advice if needed");
+
+	agent->last_async_operation = VIO_ASYNC_OP_UPDATE_DEDUPE_INDEX;
+	set_data_vio_hash_zone_callback(agent, finish_updating);
+	query_index(agent, UDS_UPDATE);
+}
+
+/**
+ * finish_deduping() - Handle a data_vio that has finished deduplicating against the block locked
+ *                     by the hash lock.
+ * @lock: The hash lock.
+ * @data_vio: The lock holder that has finished deduplicating.
+ *
+ * If there are other data_vios still sharing the lock, this will just release the data_vio's share
+ * of the lock and finish processing the data_vio. If this is the last data_vio holding the lock,
+ * this makes the data_vio the lock agent and uses it to advance the state of the lock so it can
+ * eventually be released.
+ */
+static void finish_deduping(struct hash_lock *lock, struct data_vio *data_vio)
+{
+	struct data_vio *agent = data_vio;
+
+	ASSERT_LOG_ONLY(lock->agent == NULL, "shouldn't have an agent in DEDUPING");
+	ASSERT_LOG_ONLY(!vdo_has_waiters(&lock->waiters),
+			"shouldn't have any lock waiters in DEDUPING");
+
+	/* Just release the lock reference if other data_vios are still deduping. */
+	if (lock->reference_count > 1) {
+		exit_hash_lock(data_vio);
+		return;
+	}
+
+	/* The hash lock must have an agent for all other lock states. */
+	lock->agent = agent;
+	if (lock->update_advice)
+		/*
+		 * DEDUPING -> UPDATING transition: The location of the duplicate block changed
+		 * since the initial UDS query because of compression, rollover, or because the
+		 * query agent didn't have an allocation. The UDS update was delayed in case there
+		 * was another change in location, but with only this data_vio using the hash lock,
+		 * it's time to update the advice.
+		 */
+		start_updating(lock, agent);
+	else
+		/*
+		 * DEDUPING -> UNLOCKING transition: Release the PBN read lock on the duplicate
+		 * location so the hash lock itself can be released (contingent on no new data_vios
+		 * arriving in the lock before the agent returns).
+		 */
+		start_unlocking(lock, agent);
+}
+
+/**
+ * acquire_lock() - Get the lock for a record name.
+ * @zone: The zone responsible for the hash.
+ * @hash: The hash to lock.
+ * @replace_lock: If non-NULL, the lock already registered for the hash which should be replaced by
+ *                the new lock.
+ * @lock_ptr: A pointer to receive the hash lock.
+ *
+ * Gets the lock for the hash (record name) of the data in a data_vio, or if one does not exist (or
+ * if we are explicitly rolling over), initialize a new lock for the hash and register it in the
+ * zone. This must only be called in the correct thread for the zone.
+ *
+ * Return: VDO_SUCCESS or an error code.
+ */
+static int __must_check acquire_lock(struct hash_zone *zone,
+				     const struct uds_record_name *hash,
+				     struct hash_lock *replace_lock,
+				     struct hash_lock **lock_ptr)
+{
+	struct hash_lock *lock, *new_lock;
+	int result;
+
+	/*
+	 * Borrow and prepare a lock from the pool so we don't have to do two pointer_map accesses
+	 * in the common case of no lock contention.
+	 */
+	result = ASSERT(!list_empty(&zone->lock_pool), "never need to wait for a free hash lock");
+	if (result != VDO_SUCCESS)
+		return result;
+
+	new_lock = list_entry(zone->lock_pool.prev, struct hash_lock, pool_node);
+	list_del_init(&new_lock->pool_node);
+
+	/*
+	 * Fill in the hash of the new lock so we can map it, since we have to use the hash as the
+	 * map key.
+	 */
+	new_lock->hash = *hash;
+
+	result = vdo_pointer_map_put(zone->hash_lock_map,
+				     &new_lock->hash,
+				     new_lock,
+				     (replace_lock != NULL),
+				     (void **) &lock);
+	if (result != VDO_SUCCESS) {
+		return_hash_lock_to_pool(zone, UDS_FORGET(new_lock));
+		return result;
+	}
+
+	if (replace_lock != NULL) {
+		/* On mismatch put the old lock back and return a severe error */
+		ASSERT_LOG_ONLY(lock == replace_lock, "old lock must have been in the lock map");
+		/* TODO: Check earlier and bail out? */
+		ASSERT_LOG_ONLY(replace_lock->registered,
+				"old lock must have been marked registered");
+		replace_lock->registered = false;
+	}
+
+	if (lock == replace_lock) {
+		lock = new_lock;
+		lock->registered = true;
+	} else {
+		/* There's already a lock for the hash, so we don't need the borrowed lock. */
+		return_hash_lock_to_pool(zone, UDS_FORGET(new_lock));
+	}
+
+	*lock_ptr = lock;
+	return VDO_SUCCESS;
+}
+
+/**
+ * enter_forked_lock() - Bind the data_vio to a new hash lock.
+ *
+ * Implements waiter_callback. Binds the data_vio that was waiting to a new hash lock and waits on
+ * that lock.
+ */
+static void enter_forked_lock(struct waiter *waiter, void *context)
+{
+	struct data_vio *data_vio = waiter_as_data_vio(waiter);
+	struct hash_lock *new_lock = (struct hash_lock *) context;
+
+	set_hash_lock(data_vio, new_lock);
+	wait_on_hash_lock(new_lock, data_vio);
+}
+
+/**
+ * fork_hash_lock() - Fork a hash lock because it has run out of increments on the duplicate PBN.
+ * @old_lock: The hash lock to fork.
+ * @new_agent: The data_vio that will be the agent for the new lock.
+ *
+ * Transfers the new agent and any lock waiters to a new hash lock instance which takes the place
+ * of the old lock in the lock map. The old lock remains active, but will not update advice.
+ */
+static void fork_hash_lock(struct hash_lock *old_lock, struct data_vio *new_agent)
+{
+	struct hash_lock *new_lock;
+	int result;
+
+	result = acquire_lock(new_agent->hash_zone, &new_agent->record_name, old_lock, &new_lock);
+	if (result != VDO_SUCCESS) {
+		continue_data_vio_with_error(new_agent, result);
+		return;
+	}
+
+	/*
+	 * Only one of the two locks should update UDS. The old lock is out of references, so it
+	 * would be poor dedupe advice in the short term.
+	 */
+	old_lock->update_advice = false;
+	new_lock->update_advice = true;
+
+	set_hash_lock(new_agent, new_lock);
+	new_lock->agent = new_agent;
+
+	vdo_notify_all_waiters(&old_lock->waiters, enter_forked_lock, new_lock);
+
+	new_agent->is_duplicate = false;
+	start_writing(new_lock, new_agent);
+}
+
+/**
+ * launch_dedupe() - Reserve a reference count increment for a data_vio and launch it on the dedupe
+ *                   path.
+ * @lock: The hash lock.
+ * @data_vio: The data_vio to deduplicate using the hash lock.
+ * @has_claim: true if the data_vio already has claimed an increment from the duplicate lock.
+ *
+ * If no increments are available, this will roll over to a new hash lock and launch the data_vio
+ * as the writing agent for that lock.
+ */
+static void launch_dedupe(struct hash_lock *lock, struct data_vio *data_vio, bool has_claim)
+{
+	if (!has_claim && !vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
+		/* Out of increments, so must roll over to a new lock. */
+		fork_hash_lock(lock, data_vio);
+		return;
+	}
+
+	/* Deduplicate against the lock's verified location. */
+	set_duplicate_location(data_vio, lock->duplicate);
+	data_vio->new_mapped = data_vio->duplicate;
+	update_metadata_for_data_vio_write(data_vio, lock->duplicate_lock);
+}
+
+/**
+ * start_deduping() - Enter the hash lock state where data_vios deduplicate in parallel against a
+ *                    true copy of their data on disk.
+ * @lock: The hash lock.
+ * @agent: The data_vio acting as the agent for the lock.
+ * @agent_is_done: true only if the agent has already written or deduplicated against its data.
+ *
+ * If the agent itself needs to deduplicate, an increment for it must already have been claimed
+ * from the duplicate lock, ensuring the hash lock will still have a data_vio holding it.
+ */
+static void start_deduping(struct hash_lock *lock, struct data_vio *agent, bool agent_is_done)
+{
+	lock->state = VDO_HASH_LOCK_DEDUPING;
+
+	/*
+	 * We don't take the downgraded allocation lock from the agent unless we actually need to
+	 * deduplicate against it.
+	 */
+	if (lock->duplicate_lock == NULL) {
+		ASSERT_LOG_ONLY(!vdo_is_state_compressed(agent->new_mapped.state),
+				"compression must have shared a lock");
+		ASSERT_LOG_ONLY(agent_is_done, "agent must have written the new duplicate");
+		transfer_allocation_lock(agent);
+	}
+
+	ASSERT_LOG_ONLY(vdo_is_pbn_read_lock(lock->duplicate_lock),
+			"duplicate_lock must be a PBN read lock");
+
+	/*
+	 * This state is not like any of the other states. There is no designated agent--the agent
+	 * transitioning to this state and all the waiters will be launched to deduplicate in
+	 * parallel.
+	 */
+	lock->agent = NULL;
+
+	/*
+	 * Launch the agent (if not already deduplicated) and as many lock waiters as we have
+	 * available increments for on the dedupe path. If we run out of increments, rollover will
+	 * be triggered and the remaining waiters will be transferred to the new lock.
+	 */
+	if (!agent_is_done) {
+		launch_dedupe(lock, agent, true);
+		agent = NULL;
+	}
+	while (vdo_has_waiters(&lock->waiters))
+		launch_dedupe(lock, dequeue_lock_waiter(lock), false);
+
+	if (agent_is_done)
+		/*
+		 * In the degenerate case where all the waiters rolled over to a new lock, this
+		 * will continue to use the old agent to clean up this lock, and otherwise it just
+		 * lets the agent exit the lock.
+		 */
+		finish_deduping(lock, agent);
+}
+
+/**
+ * increment_stat() - Increment a statistic counter in a non-atomic yet thread-safe manner.
+ * @stat: The statistic field to increment.
+ */
+static void increment_stat(u64 *stat)
+{
+	/*
+	 * Must only be mutated on the hash zone thread. Prevents any compiler shenanigans from
+	 * affecting other threads reading stats.
+	 */
+	WRITE_ONCE(*stat, *stat + 1);
+}
+
+/**
+ * finish_verifying() - Handle the result of the agent for the lock comparing its data to the
+ *                      duplicate candidate.
+ * @completion: The completion of the data_vio used to verify dedupe
+ *
+ * This continuation is registered in start_verifying().
+ */
+static void finish_verifying(struct vdo_completion *completion)
+{
+	struct data_vio *agent = as_data_vio(completion);
+	struct hash_lock *lock = agent->hash_lock;
+
+	assert_hash_lock_agent(agent, __func__);
+
+	lock->verified = agent->is_duplicate;
+
+	/*
+	 * Only count the result of the initial verification of the advice as valid or stale, and
+	 * not any re-verifications due to PBN lock releases.
+	 */
+	if (!lock->verify_counted) {
+		lock->verify_counted = true;
+		if (lock->verified)
+			increment_stat(&agent->hash_zone->statistics.dedupe_advice_valid);
+		else
+			increment_stat(&agent->hash_zone->statistics.dedupe_advice_stale);
+	}
+
+	/*
+	 * Even if the block is a verified duplicate, we can't start to deduplicate unless we can
+	 * claim a reference count increment for the agent.
+	 */
+	if (lock->verified && !vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
+		agent->is_duplicate = false;
+		lock->verified = false;
+	}
+
+	if (lock->verified) {
+		/*
+		 * VERIFYING -> DEDUPING transition: The advice is for a true duplicate, so start
+		 * deduplicating against it, if references are available.
+		 */
+		start_deduping(lock, agent, false);
+	} else {
+		/*
+		 * VERIFYING -> UNLOCKING transition: Either the verify failed or we'd try to
+		 * dedupe and roll over immediately, which would fail because it would leave the
+		 * lock without an agent to release the PBN lock. In both cases, the data will have
+		 * to be written or compressed, but first the advice PBN must be unlocked by the
+		 * VERIFYING agent.
+		 */
+		lock->update_advice = true;
+		start_unlocking(lock, agent);
+	}
+}
+
+static bool blocks_equal(char *block1, char *block2)
+{
+	int i;
+
+
+	for (i = 0; i < VDO_BLOCK_SIZE; i += sizeof(u64))
+		if (*((u64 *) &block1[i]) != *((u64 *) &block2[i]))
+			return false;
+
+	return true;
+}
+
+static void verify_callback(struct vdo_completion *completion)
+{
+	struct data_vio *agent = as_data_vio(completion);
+
+	agent->is_duplicate = blocks_equal(agent->vio.data, agent->scratch_block);
+	launch_data_vio_hash_zone_callback(agent, finish_verifying);
+}
+
+static void uncompress_and_verify(struct vdo_completion *completion)
+{
+	struct data_vio *agent = as_data_vio(completion);
+	int result;
+
+	result = uncompress_data_vio(agent, agent->duplicate.state, agent->scratch_block);
+	if (result == VDO_SUCCESS) {
+		verify_callback(completion);
+		return;
+	}
+
+	agent->is_duplicate = false;
+	launch_data_vio_hash_zone_callback(agent, finish_verifying);
+}
+
+static void verify_endio(struct bio *bio)
+{
+	struct data_vio *agent = vio_as_data_vio(bio->bi_private);
+	int result = blk_status_to_errno(bio->bi_status);
+
+	vdo_count_completed_bios(bio);
+	if (result != VDO_SUCCESS) {
+		agent->is_duplicate = false;
+		launch_data_vio_hash_zone_callback(agent, finish_verifying);
+		return;
+	}
+
+	if (vdo_is_state_compressed(agent->duplicate.state)) {
+		launch_data_vio_cpu_callback(agent,
+					     uncompress_and_verify,
+					     CPU_Q_COMPRESS_BLOCK_PRIORITY);
+		return;
+	}
+
+	launch_data_vio_cpu_callback(agent, verify_callback, CPU_Q_COMPLETE_READ_PRIORITY);
+}
+
+/**
+ * start_verifying() - Begin the data verification phase.
+ * @lock: The hash lock (must be LOCKING).
+ * @agent: The data_vio to use to read and compare candidate data.
+ *
+ * Continue the deduplication path for a hash lock by using the agent to read (and possibly
+ * decompress) the data at the candidate duplicate location, comparing it to the data in the agent
+ * to verify that the candidate is identical to all the data_vios sharing the hash. If so, it can
+ * be deduplicated against, otherwise a data_vio allocation will have to be written to and used for
+ * dedupe.
+ */
+static void start_verifying(struct hash_lock *lock, struct data_vio *agent)
+{
+	int result;
+	struct vio *vio = &agent->vio;
+	char *buffer = (vdo_is_state_compressed(agent->duplicate.state) ?
+			(char *) agent->compression.block :
+			agent->scratch_block);
+
+	lock->state = VDO_HASH_LOCK_VERIFYING;
+	ASSERT_LOG_ONLY(!lock->verified, "hash lock only verifies advice once");
+
+	agent->last_async_operation = VIO_ASYNC_OP_VERIFY_DUPLICATION;
+	result = vio_reset_bio(vio, buffer, verify_endio, REQ_OP_READ, agent->duplicate.pbn);
+	if (result != VDO_SUCCESS) {
+		set_data_vio_hash_zone_callback(agent, finish_verifying);
+		continue_data_vio_with_error(agent, result);
+		return;
+	}
+
+	set_data_vio_bio_zone_callback(agent, process_vio_io);
+	vdo_launch_completion_with_priority(&vio->completion, BIO_Q_VERIFY_PRIORITY);
+}
+
+/**
+ * finish_locking() - Handle the result of the agent for the lock attempting to obtain a PBN read
+ *                    lock on the candidate duplicate block.
+ * @completion: The completion of the data_vio that attempted to get the read lock.
+ *
+ * This continuation is registered in lock_duplicate_pbn().
+ */
+static void finish_locking(struct vdo_completion *completion)
+{
+	struct data_vio *agent = as_data_vio(completion);
+	struct hash_lock *lock = agent->hash_lock;
+
+	assert_hash_lock_agent(agent, __func__);
+
+	if (!agent->is_duplicate) {
+		ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
+				"must not hold duplicate_lock if not flagged as a duplicate");
+		/*
+		 * LOCKING -> WRITING transition: The advice block is being modified or has no
+		 * available references, so try to write or compress the data, remembering to
+		 * update UDS later with the new advice.
+		 */
+		increment_stat(&agent->hash_zone->statistics.dedupe_advice_stale);
+		lock->update_advice = true;
+		start_writing(lock, agent);
+		return;
+	}
+
+	ASSERT_LOG_ONLY(lock->duplicate_lock != NULL,
+			"must hold duplicate_lock if flagged as a duplicate");
+
+	if (!lock->verified) {
+		/*
+		 * LOCKING -> VERIFYING transition: Continue on the unverified dedupe path, reading
+		 * the candidate duplicate and comparing it to the agent's data to decide whether
+		 * it is a true duplicate or stale advice.
+		 */
+		start_verifying(lock, agent);
+		return;
+	}
+
+	if (!vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
+		/*
+		 * LOCKING -> UNLOCKING transition: The verified block was re-locked, but has no
+		 * available increments left. Must first release the useless PBN read lock before
+		 * rolling over to a new copy of the block.
+		 */
+		agent->is_duplicate = false;
+		lock->verified = false;
+		lock->update_advice = true;
+		start_unlocking(lock, agent);
+		return;
+	}
+
+	/*
+	 * LOCKING -> DEDUPING transition: Continue on the verified dedupe path, deduplicating
+	 * against a location that was previously verified or written to.
+	 */
+	start_deduping(lock, agent, false);
+}
+
+static bool acquire_provisional_reference(struct data_vio *agent,
+					  struct pbn_lock *lock,
+					  struct slab_depot *depot)
+{
+	/* Ensure that the newly-locked block is referenced. */
+	struct vdo_slab *slab = vdo_get_slab(depot, agent->duplicate.pbn);
+	int result = vdo_acquire_provisional_reference(slab, agent->duplicate.pbn, lock);
+
+	if (result == VDO_SUCCESS)
+		return true;
+
+	uds_log_warning_strerror(result,
+				 "Error acquiring provisional reference for dedupe candidate; aborting dedupe");
+	agent->is_duplicate = false;
+	vdo_release_physical_zone_pbn_lock(agent->duplicate.zone, agent->duplicate.pbn, lock);
+	continue_data_vio_with_error(agent, result);
+	return false;
+}
+
+/**
+ * lock_duplicate_pbn() - Acquire a read lock on the PBN of the block containing candidate
+ *                        duplicate data (compressed or uncompressed).
+ * @completion: The completion of the data_vio attempting to acquire the physical block lock on
+ *              behalf of its hash lock.
+ *
+ * If the PBN is already locked for writing, the lock attempt is abandoned and is_duplicate will be
+ * cleared before calling back. this continuation is launched from start_locking(), and calls back
+ * to finish_locking() on the hash zone thread.
+ */
+static void lock_duplicate_pbn(struct vdo_completion *completion)
+{
+	unsigned int increment_limit;
+	struct pbn_lock *lock;
+	int result;
+
+	struct data_vio *agent = as_data_vio(completion);
+	struct slab_depot *depot = vdo_from_data_vio(agent)->depot;
+	struct physical_zone *zone = agent->duplicate.zone;
+
+	assert_data_vio_in_duplicate_zone(agent);
+
+	set_data_vio_hash_zone_callback(agent, finish_locking);
+
+	/*
+	 * While in the zone that owns it, find out how many additional references can be made to
+	 * the block if it turns out to truly be a duplicate.
+	 */
+	increment_limit = vdo_get_increment_limit(depot, agent->duplicate.pbn);
+	if (increment_limit == 0) {
+		/*
+		 * We could deduplicate against it later if a reference happened to be released
+		 * during verification, but it's probably better to bail out now.
+		 */
+		agent->is_duplicate = false;
+		continue_data_vio(agent);
+		return;
+	}
+
+	result = vdo_attempt_physical_zone_pbn_lock(zone,
+						    agent->duplicate.pbn,
+						    VIO_READ_LOCK,
+						    &lock);
+	if (result != VDO_SUCCESS) {
+		continue_data_vio_with_error(agent, result);
+		return;
+	}
+
+	if (!vdo_is_pbn_read_lock(lock)) {
+		/*
+		 * There are three cases of write locks: uncompressed data block writes, compressed
+		 * (packed) block writes, and block map page writes. In all three cases, we give up
+		 * on trying to verify the advice and don't bother to try deduplicate against the
+		 * data in the write lock holder.
+		 *
+		 * 1) We don't ever want to try to deduplicate against a block map page.
+		 *
+		 * 2a) It's very unlikely we'd deduplicate against an entire packed block, both
+		 * because of the chance of matching it, and because we don't record advice for it,
+		 * but for the uncompressed representation of all the fragments it contains. The
+		 * only way we'd be getting lock contention is if we've written the same
+		 * representation coincidentally before, had it become unreferenced, and it just
+		 * happened to be packed together from compressed writes when we go to verify the
+		 * lucky advice. Giving up is a minuscule loss of potential dedupe.
+		 *
+		 * 2b) If the advice is for a slot of a compressed block, it's about to get
+		 * smashed, and the write smashing it cannot contain our data--it would have to be
+		 * writing on behalf of our hash lock, but that's impossible since we're the lock
+		 * agent.
+		 *
+		 * 3a) If the lock is held by a data_vio with different data, the advice is already
+		 * stale or is about to become stale.
+		 *
+		 * 3b) If the lock is held by a data_vio that matches us, we may as well either
+		 * write it ourselves (or reference the copy we already wrote) instead of
+		 * potentially having many duplicates wait for the lock holder to write, journal,
+		 * hash, and finally arrive in the hash lock. We lose a chance to avoid a UDS
+		 * update in the very rare case of advice for a free block that just happened to be
+		 * allocated to a data_vio with the same hash. There's also a chance to save on a
+		 * block write, at the cost of a block verify. Saving on a full block compare in
+		 * all stale advice cases almost certainly outweighs saving a UDS update and
+		 * trading a write for a read in a lucky case where advice would have been saved
+		 * from becoming stale.
+		 */
+		agent->is_duplicate = false;
+		continue_data_vio(agent);
+		return;
+	}
+
+	if (lock->holder_count == 0) {
+		if (!acquire_provisional_reference(agent, lock, depot))
+			return;
+
+		/*
+		 * The increment limit we grabbed earlier is still valid. The lock now holds the
+		 * rights to acquire all those references. Those rights will be claimed by hash
+		 * locks sharing this read lock.
+		 */
+		lock->increment_limit = increment_limit;
+	}
+
+	/*
+	 * We've successfully acquired a read lock on behalf of the hash lock, so mark it as such.
+	 */
+	set_duplicate_lock(agent->hash_lock, lock);
+
+	/*
+	 * TODO: Optimization: We could directly launch the block verify, then switch to a hash
+	 * thread.
+	 */
+	continue_data_vio(agent);
+}
+
+/**
+ * start_locking() - Continue deduplication for a hash lock that has obtained valid advice of a
+ *                   potential duplicate through its agent.
+ * @lock: The hash lock (currently must be QUERYING).
+ * @agent: The data_vio bearing the dedupe advice.
+ */
+static void start_locking(struct hash_lock *lock, struct data_vio *agent)
+{
+	ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
+			"must not acquire a duplicate lock when already holding it");
+
+	lock->state = VDO_HASH_LOCK_LOCKING;
+
+	/*
+	 * TODO: Optimization: If we arrange to continue on the duplicate zone thread when
+	 * accepting the advice, and don't explicitly change lock states (or use an agent-local
+	 * state, or an atomic), we can avoid a thread transition here.
+	 */
+	agent->last_async_operation = VIO_ASYNC_OP_LOCK_DUPLICATE_PBN;
+	launch_data_vio_duplicate_zone_callback(agent, lock_duplicate_pbn);
+}
+
+/**
+ * finish_writing() - Re-entry point for the lock agent after it has finished writing or
+ *                    compressing its copy of the data block.
+ * @lock: The hash lock, which must be in state WRITING.
+ * @agent: The data_vio that wrote its data for the lock.
+ *
+ * The agent will never need to dedupe against anything, so it's done with the lock, but the lock
+ * may not be finished with it, as a UDS update might still be needed.
+ *
+ * If there are other lock holders, the agent will hand the job to one of them and exit, leaving
+ * the lock to deduplicate against the just-written block. If there are no other lock holders, the
+ * agent either exits (and later tears down the hash lock), or it remains the agent and updates
+ * UDS.
+ */
+static void finish_writing(struct hash_lock *lock, struct data_vio *agent)
+{
+	/*
+	 * Dedupe against the data block or compressed block slot the agent wrote. Since we know
+	 * the write succeeded, there's no need to verify it.
+	 */
+	lock->duplicate = agent->new_mapped;
+	lock->verified = true;
+
+	if (vdo_is_state_compressed(lock->duplicate.state) &&
+	    lock->registered)
+		/*
+		 * Compression means the location we gave in the UDS query is not the location
+		 * we're using to deduplicate.
+		 */
+		lock->update_advice = true;
+
+	/* If there are any waiters, we need to start deduping them. */
+	if (vdo_has_waiters(&lock->waiters)) {
+		/*
+		 * WRITING -> DEDUPING transition: an asynchronously-written block failed to
+		 * compress, so the PBN lock on the written copy was already transferred. The agent
+		 * is done with the lock, but the lock may still need to use it to clean up after
+		 * rollover.
+		 */
+		start_deduping(lock, agent, true);
+		return;
+	}
+
+	/*
+	 * There are no waiters and the agent has successfully written, so take a step towards
+	 * being able to release the hash lock (or just release it).
+	 */
+	if (lock->update_advice) {
+		/*
+		 * WRITING -> UPDATING transition: There's no waiter and a UDS update is needed, so
+		 * retain the WRITING agent and use it to launch the update. The happens on
+		 * compression, rollover, or the QUERYING agent not having an allocation.
+		 */
+		start_updating(lock, agent);
+	} else if (lock->duplicate_lock != NULL) {
+		/*
+		 * WRITING -> UNLOCKING transition: There's no waiter and no update needed, but the
+		 * compressed write gave us a shared duplicate lock that we must release.
+		 */
+		set_duplicate_location(agent, lock->duplicate);
+		start_unlocking(lock, agent);
+	} else {
+		/*
+		 * WRITING -> BYPASSING transition: There's no waiter, no update needed, and no
+		 * duplicate lock held, so both the agent and lock have no more work to do. The
+		 * agent will release its allocation lock in cleanup.
+		 */
+		start_bypassing(lock, agent);
+	}
+}
+
+/**
+ * select_writing_agent() - Search through the lock waiters for a data_vio that has an allocation.
+ * @lock: The hash lock to modify.
+ *
+ * If an allocation is found, swap agents, put the old agent at the head of the wait queue, then
+ * return the new agent. Otherwise, just return the current agent.
+ */
+static struct data_vio *select_writing_agent(struct hash_lock *lock)
+{
+	struct wait_queue temp_queue;
+	struct data_vio *data_vio;
+
+	vdo_initialize_wait_queue(&temp_queue);
+
+	/*
+	 * Move waiters to the temp queue one-by-one until we find an allocation. Not ideal to
+	 * search, but it only happens when nearly out of space.
+	 */
+	while (((data_vio = dequeue_lock_waiter(lock)) != NULL) &&
+	       !data_vio_has_allocation(data_vio)) {
+		/* Use the lower-level enqueue since we're just moving waiters around. */
+		vdo_enqueue_waiter(&temp_queue, &data_vio->waiter);
+	}
+
+	if (data_vio != NULL) {
+		/*
+		 * Move the rest of the waiters over to the temp queue, preserving the order they
+		 * arrived at the lock.
+		 */
+		vdo_transfer_all_waiters(&lock->waiters, &temp_queue);
+
+		/*
+		 * The current agent is being replaced and will have to wait to dedupe; make it the
+		 * first waiter since it was the first to reach the lock.
+		 */
+		vdo_enqueue_waiter(&lock->waiters, &lock->agent->waiter);
+		lock->agent = data_vio;
+	} else {
+		/* No one has an allocation, so keep the current agent. */
+		data_vio = lock->agent;
+	}
+
+	/* Swap all the waiters back onto the lock's queue. */
+	vdo_transfer_all_waiters(&temp_queue, &lock->waiters);
+	return data_vio;
+}
+
+/**
+ * start_writing() - Begin the non-duplicate write path.
+ * @lock: The hash lock (currently must be QUERYING).
+ * @agent: The data_vio currently acting as the agent for the lock.
+ *
+ * Begins the non-duplicate write path for a hash lock that had no advice, selecting a data_vio
+ * with an allocation as a new agent, if necessary, then resuming the agent on the data_vio write
+ * path.
+ */
+static void start_writing(struct hash_lock *lock, struct data_vio *agent)
+{
+	lock->state = VDO_HASH_LOCK_WRITING;
+
+	/*
+	 * The agent might not have received an allocation and so can't be used for writing, but
+	 * it's entirely possible that one of the waiters did.
+	 */
+	if (!data_vio_has_allocation(agent)) {
+		agent = select_writing_agent(lock);
+		/* If none of the waiters had an allocation, the writes all have to fail. */
+		if (!data_vio_has_allocation(agent)) {
+			/*
+			 * TODO: Should we keep a variant of BYPASSING that causes new arrivals to
+			 * fail immediately if they don't have an allocation? It might be possible
+			 * that on some path there would be non-waiters still referencing the lock,
+			 * so it would remain in the map as everything is currently spelled, even
+			 * if the agent and all waiters release.
+			 */
+			continue_data_vio_with_error(agent, VDO_NO_SPACE);
+			return;
+		}
+	}
+
+	/*
+	 * If the agent compresses, it might wait indefinitely in the packer, which would be bad if
+	 * there are any other data_vios waiting.
+	 */
+	if (vdo_has_waiters(&lock->waiters))
+		cancel_data_vio_compression(agent);
+
+	/*
+	 * Send the agent to the compress/pack/write path in vioWrite. If it succeeds, it will
+	 * return to the hash lock via vdo_continue_hash_lock() and call finish_writing().
+	 */
+	launch_compress_data_vio(agent);
+}
+
+/*
+ * Decode VDO duplicate advice from the old_metadata field of a UDS request.
+ * Returns true if valid advice was found and decoded
+ */
+static bool decode_uds_advice(struct dedupe_context *context)
+{
+	const struct uds_request *request = &context->request;
+	struct data_vio *data_vio = context->requestor;
+	size_t offset = 0;
+	const struct uds_record_data *encoding = &request->old_metadata;
+	struct vdo *vdo = vdo_from_data_vio(data_vio);
+	struct zoned_pbn *advice = &data_vio->duplicate;
+	u8 version;
+	int result;
+
+	if ((request->status != UDS_SUCCESS) || !request->found)
+		return false;
+
+	version = encoding->data[offset++];
+	if (version != UDS_ADVICE_VERSION) {
+		uds_log_error("invalid UDS advice version code %u", version);
+		return false;
+	}
+
+	advice->state = encoding->data[offset++];
+	advice->pbn = get_unaligned_le64(&encoding->data[offset]);
+	offset += sizeof(u64);
+	BUG_ON(offset != UDS_ADVICE_SIZE);
+
+	/* Don't use advice that's clearly meaningless. */
+	if ((advice->state == VDO_MAPPING_STATE_UNMAPPED) || (advice->pbn == VDO_ZERO_BLOCK)) {
+		uds_log_debug("Invalid advice from deduplication server: pbn %llu, state %u. Giving up on deduplication of logical block %llu",
+			      (unsigned long long) advice->pbn,
+			      advice->state,
+			      (unsigned long long) data_vio->logical.lbn);
+		atomic64_inc(&vdo->stats.invalid_advice_pbn_count);
+		return false;
+	}
+
+	result = vdo_get_physical_zone(vdo, advice->pbn, &advice->zone);
+	if ((result != VDO_SUCCESS) || (advice->zone == NULL)) {
+		uds_log_debug("Invalid physical block number from deduplication server: %llu, giving up on deduplication of logical block %llu",
+			      (unsigned long long) advice->pbn,
+			      (unsigned long long) data_vio->logical.lbn);
+		atomic64_inc(&vdo->stats.invalid_advice_pbn_count);
+		return false;
+	}
+
+	return true;
+}
+
+static void process_query_result(struct data_vio *agent)
+{
+	struct dedupe_context *context = agent->dedupe_context;
+
+	if (context == NULL)
+		return;
+
+	if (change_context_state(context, DEDUPE_CONTEXT_COMPLETE, DEDUPE_CONTEXT_IDLE)) {
+		agent->is_duplicate = decode_uds_advice(context);
+		release_context(context);
+	}
+}
+
+/**
+ * finish_querying() - Process the result of a UDS query performed by the agent for the lock.
+ * @completion: The completion of the data_vio that performed the query.
+ *
+ * This continuation is registered in start_querying().
+ */
+static void finish_querying(struct vdo_completion *completion)
+{
+	struct data_vio *agent = as_data_vio(completion);
+	struct hash_lock *lock = agent->hash_lock;
+
+	assert_hash_lock_agent(agent, __func__);
+
+	process_query_result(agent);
+
+	if (agent->is_duplicate) {
+		lock->duplicate = agent->duplicate;
+		/*
+		 * QUERYING -> LOCKING transition: Valid advice was obtained from UDS. Use the
+		 * QUERYING agent to start the hash lock on the unverified dedupe path, verifying
+		 * that the advice can be used.
+		 */
+		start_locking(lock, agent);
+	} else {
+		/*
+		 * The agent will be used as the duplicate if has an allocation; if it does, that
+		 * location was posted to UDS, so no update will be needed.
+		 */
+		lock->update_advice = !data_vio_has_allocation(agent);
+		/*
+		 * QUERYING -> WRITING transition: There was no advice or the advice wasn't valid,
+		 * so try to write or compress the data.
+		 */
+		start_writing(lock, agent);
+	}
+}
+
+/**
+ * start_querying() - Start deduplication for a hash lock.
+ * @lock: The initialized hash lock.
+ * @data_vio: The data_vio that has just obtained the new lock.
+ *
+ * Starts deduplication for a hash lock that has finished initializing by making the data_vio that
+ * requested it the agent, entering the QUERYING state, and using the agent to perform the UDS
+ * query on behalf of the lock.
+ */
+static void start_querying(struct hash_lock *lock, struct data_vio *data_vio)
+{
+	lock->agent = data_vio;
+	lock->state = VDO_HASH_LOCK_QUERYING;
+	data_vio->last_async_operation = VIO_ASYNC_OP_CHECK_FOR_DUPLICATION;
+	set_data_vio_hash_zone_callback(data_vio, finish_querying);
+	query_index(data_vio, (data_vio_has_allocation(data_vio) ? UDS_POST : UDS_QUERY));
+}
+
+/**
+ * report_bogus_lock_state() - Complain that a data_vio has entered a hash_lock that is in an
+ *                             unimplemented or unusable state and continue the data_vio with an
+ *                             error.
+ * @lock: The hash lock.
+ * @data_vio: The data_vio attempting to enter the lock.
+ */
+static void report_bogus_lock_state(struct hash_lock *lock, struct data_vio *data_vio)
+{
+	ASSERT_LOG_ONLY(false,
+			"hash lock must not be in unimplemented state %s",
+			get_hash_lock_state_name(lock->state));
+	continue_data_vio_with_error(data_vio, VDO_LOCK_ERROR);
+}
+
+/**
+ * vdo_continue_hash_lock() - Continue the processing state after writing, compressing, or
+ *                            deduplicating.
+ * @data_vio: The data_vio to continue processing in its hash lock.
+ *
+ * Asynchronously continue processing a data_vio in its hash lock after it has finished writing,
+ * compressing, or deduplicating, so it can share the result with any data_vios waiting in the hash
+ * lock, or update the UDS index, or simply release its share of the lock.
+ *
+ * Context: This must only be called in the correct thread for the hash zone.
+ */
+void vdo_continue_hash_lock(struct vdo_completion *completion)
+{
+	struct data_vio *data_vio = as_data_vio(completion);
+	struct hash_lock *lock = data_vio->hash_lock;
+
+	switch (lock->state) {
+	case VDO_HASH_LOCK_WRITING:
+		ASSERT_LOG_ONLY(data_vio == lock->agent,
+				"only the lock agent may continue the lock");
+		finish_writing(lock, data_vio);
+		break;
+
+	case VDO_HASH_LOCK_DEDUPING:
+		finish_deduping(lock, data_vio);
+		break;
+
+	case VDO_HASH_LOCK_BYPASSING:
+		/* This data_vio has finished the write path and the lock doesn't need it. */
+		exit_hash_lock(data_vio);
+		break;
+
+	case VDO_HASH_LOCK_INITIALIZING:
+	case VDO_HASH_LOCK_QUERYING:
+	case VDO_HASH_LOCK_UPDATING:
+	case VDO_HASH_LOCK_LOCKING:
+	case VDO_HASH_LOCK_VERIFYING:
+	case VDO_HASH_LOCK_UNLOCKING:
+		/* A lock in this state should never be re-entered. */
+		report_bogus_lock_state(lock, data_vio);
+		break;
+
+	default:
+		report_bogus_lock_state(lock, data_vio);
+	}
+}
+
+/**
+ * is_hash_collision() - Check to see if a hash collision has occurred.
+ * @lock: The lock to check.
+ * @candidate: The data_vio seeking to share the lock.
+ *
+ * Check whether the data in data_vios sharing a lock is different than in a data_vio seeking to
+ * share the lock, which should only be possible in the extremely unlikely case of a hash
+ * collision.
+ *
+ * Return: true if the given data_vio must not share the lock because it doesn't have the same data
+ *         as the lock holders.
+ */
+static bool is_hash_collision(struct hash_lock *lock, struct data_vio *candidate)
+{
+	struct data_vio *lock_holder;
+	struct hash_zone *zone;
+	bool collides;
+
+	if (list_empty(&lock->duplicate_ring))
+		return false;
+
+	lock_holder = list_first_entry(&lock->duplicate_ring, struct data_vio, hash_lock_entry);
+	zone = candidate->hash_zone;
+	collides = !blocks_equal(lock_holder->vio.data, candidate->vio.data);
+	if (collides)
+		increment_stat(&zone->statistics.concurrent_hash_collisions);
+	else
+		increment_stat(&zone->statistics.concurrent_data_matches);
+
+	return collides;
+}
+
+static inline int assert_hash_lock_preconditions(const struct data_vio *data_vio)
+{
+	int result;
+
+	/* FIXME: BUG_ON() and/or enter read-only mode? */
+	result = ASSERT(data_vio->hash_lock == NULL, "must not already hold a hash lock");
+	if (result != VDO_SUCCESS)
+		return result;
+
+	result = ASSERT(list_empty(&data_vio->hash_lock_entry),
+			"must not already be a member of a hash lock ring");
+	if (result != VDO_SUCCESS)
+		return result;
+
+	return ASSERT(data_vio->recovery_sequence_number == 0,
+		      "must not hold a recovery lock when getting a hash lock");
+}
+
+/**
+ * vdo_acquire_hash_lock() - Acquire or share a lock on a record name.
+ * @data_vio: The data_vio acquiring a lock on its record name.
+ *
+ * Acquire or share a lock on the hash (record name) of the data in a data_vio, updating the
+ * data_vio to reference the lock. This must only be called in the correct thread for the zone. In
+ * the unlikely case of a hash collision, this function will succeed, but the data_vio will not get
+ * a lock reference.
+ */
+void vdo_acquire_hash_lock(struct vdo_completion *completion)
+{
+	struct data_vio *data_vio = as_data_vio(completion);
+	struct hash_lock *lock;
+	int result;
+
+	assert_data_vio_in_hash_zone(data_vio);
+
+	result = assert_hash_lock_preconditions(data_vio);
+	if (result != VDO_SUCCESS) {
+		continue_data_vio_with_error(data_vio, result);
+		return;
+	}
+
+	result = acquire_lock(data_vio->hash_zone, &data_vio->record_name, NULL, &lock);
+	if (result != VDO_SUCCESS) {
+		continue_data_vio_with_error(data_vio, result);
+		return;
+	}
+
+	if (is_hash_collision(lock, data_vio)) {
+		/*
+		 * Hash collisions are extremely unlikely, but the bogus dedupe would be a data
+		 * corruption. Bypass optimization entirely. We can't compress a data_vio without
+		 * a hash_lock as the compressed write depends on the hash_lock to manage the
+		 * references for the compressed block.
+		 */
+		write_data_vio(data_vio);
+		return;
+	}
+
+	set_hash_lock(data_vio, lock);
+	switch (lock->state) {
+	case VDO_HASH_LOCK_INITIALIZING:
+		start_querying(lock, data_vio);
+		return;
+
+	case VDO_HASH_LOCK_QUERYING:
+	case VDO_HASH_LOCK_WRITING:
+	case VDO_HASH_LOCK_UPDATING:
+	case VDO_HASH_LOCK_LOCKING:
+	case VDO_HASH_LOCK_VERIFYING:
+	case VDO_HASH_LOCK_UNLOCKING:
+		/* The lock is busy, and can't be shared yet. */
+		wait_on_hash_lock(lock, data_vio);
+		return;
+
+	case VDO_HASH_LOCK_BYPASSING:
+		/* We can't use this lock, so bypass optimization entirely. */
+		vdo_release_hash_lock(data_vio);
+		write_data_vio(data_vio);
+		return;
+
+	case VDO_HASH_LOCK_DEDUPING:
+		launch_dedupe(lock, data_vio, false);
+		return;
+
+	default:
+		/* A lock in this state should not be acquired by new VIOs. */
+		report_bogus_lock_state(lock, data_vio);
+	}
+}
+
+/**
+ * vdo_release_hash_lock() - Release a data_vio's share of a hash lock, if held, and null out the
+ *                           data_vio's reference to it.
+ * @data_vio: The data_vio releasing its hash lock.
+ *
+ * If the data_vio is the only one holding the lock, this also releases any resources or locks used
+ * by the hash lock (such as a PBN read lock on a block containing data with the same hash) and
+ * returns the lock to the hash zone's lock pool.
+ *
+ * Context: This must only be called in the correct thread for the hash zone.
+ */
+void vdo_release_hash_lock(struct data_vio *data_vio)
+{
+	struct hash_lock *lock = data_vio->hash_lock;
+	struct hash_zone *zone = data_vio->hash_zone;
+
+	if (lock == NULL)
+		return;
+
+	set_hash_lock(data_vio, NULL);
+
+	if (lock->reference_count > 0)
+		/* The lock is still in use by other data_vios. */
+		return;
+
+	if (lock->registered) {
+		struct hash_lock *removed;
+
+		removed = vdo_pointer_map_remove(zone->hash_lock_map, &lock->hash);
+		ASSERT_LOG_ONLY(lock == removed, "hash lock being released must have been mapped");
+	} else {
+		ASSERT_LOG_ONLY(lock != vdo_pointer_map_get(zone->hash_lock_map, &lock->hash),
+				"unregistered hash lock must not be in the lock map");
+	}
+
+	ASSERT_LOG_ONLY(!vdo_has_waiters(&lock->waiters),
+			"hash lock returned to zone must have no waiters");
+	ASSERT_LOG_ONLY((lock->duplicate_lock == NULL),
+			"hash lock returned to zone must not reference a PBN lock");
+	ASSERT_LOG_ONLY((lock->state == VDO_HASH_LOCK_BYPASSING),
+			"returned hash lock must not be in use with state %s",
+			get_hash_lock_state_name(lock->state));
+	ASSERT_LOG_ONLY(list_empty(&lock->pool_node),
+			"hash lock returned to zone must not be in a pool ring");
+	ASSERT_LOG_ONLY(list_empty(&lock->duplicate_ring),
+			"hash lock returned to zone must not reference DataVIOs");
+
+	return_hash_lock_to_pool(zone, lock);
+}
+
+/**
+ * transfer_allocation_lock() - Transfer a data_vio's downgraded allocation PBN lock to the
+ *                              data_vio's hash lock, converting it to a duplicate PBN lock.
+ * @data_vio: The data_vio holding the allocation lock to transfer.
+ */
+static void transfer_allocation_lock(struct data_vio *data_vio)
+{
+	struct allocation *allocation = &data_vio->allocation;
+	struct hash_lock *hash_lock = data_vio->hash_lock;
+
+	ASSERT_LOG_ONLY(data_vio->new_mapped.pbn == allocation->pbn,
+			"transferred lock must be for the block written");
+
+	allocation->pbn = VDO_ZERO_BLOCK;
+
+	ASSERT_LOG_ONLY(vdo_is_pbn_read_lock(allocation->lock),
+			"must have downgraded the allocation lock before transfer");
+
+	hash_lock->duplicate = data_vio->new_mapped;
+	data_vio->duplicate = data_vio->new_mapped;
+
+	/*
+	 * Since the lock is being transferred, the holder count doesn't change (and isn't even
+	 * safe to examine on this thread).
+	 */
+	hash_lock->duplicate_lock = UDS_FORGET(allocation->lock);
+}
+
+/**
+ * vdo_share_compressed_write_lock() - Make a data_vio's hash lock a shared holder of the PBN lock
+ *                                     on the compressed block to which its data was just written.
+ * @data_vio: The data_vio which was just compressed.
+ * @pbn_lock: The PBN lock on the compressed block.
+ *
+ * If the lock is still a write lock (as it will be for the first share), it will be converted to a
+ * read lock. This also reserves a reference count increment for the data_vio.
+ */
+void vdo_share_compressed_write_lock(struct data_vio *data_vio, struct pbn_lock *pbn_lock)
+{
+	bool claimed;
+
+	ASSERT_LOG_ONLY(vdo_get_duplicate_lock(data_vio) == NULL,
+			"a duplicate PBN lock should not exist when writing");
+	ASSERT_LOG_ONLY(vdo_is_state_compressed(data_vio->new_mapped.state),
+			"lock transfer must be for a compressed write");
+	assert_data_vio_in_new_mapped_zone(data_vio);
+
+	/* First sharer downgrades the lock. */
+	if (!vdo_is_pbn_read_lock(pbn_lock))
+		vdo_downgrade_pbn_write_lock(pbn_lock, true);
+
+	/*
+	 * Get a share of the PBN lock, ensuring it cannot be released until after this data_vio
+	 * has had a chance to journal a reference.
+	 */
+	data_vio->duplicate = data_vio->new_mapped;
+	data_vio->hash_lock->duplicate = data_vio->new_mapped;
+	set_duplicate_lock(data_vio->hash_lock, pbn_lock);
+
+	/*
+	 * Claim a reference for this data_vio. Necessary since another hash_lock might start
+	 * deduplicating against it before our incRef.
+	 */
+	claimed = vdo_claim_pbn_lock_increment(pbn_lock);
+	ASSERT_LOG_ONLY(claimed, "impossible to fail to claim an initial increment");
+}
+
+/** compare_keys() - Implements pointer_key_comparator. */
+static bool compare_keys(const void *this_key, const void *that_key)
+{
+	/* Null keys are not supported. */
+	return (memcmp(this_key, that_key, sizeof(struct uds_record_name)) == 0);
+}
+
+/** hash_key() - Implements pointer_key_comparator. */
+static u32 hash_key(const void *key)
+{
+	const struct uds_record_name *name = key;
+
+	/* Use a fragment of the record name as a hash code. */
+	return get_unaligned_le32(&name->name[4]);
+}
+
+static int __must_check
+initialize_zone(struct vdo *vdo, struct hash_zones *zones, zone_count_t zone_number)
+{
+	int result;
+	data_vio_count_t i;
+	struct hash_zone *zone = &zones->zones[zone_number];
+
+	result = vdo_make_pointer_map(VDO_LOCK_MAP_CAPACITY,
+				      0,
+				      compare_keys,
+				      hash_key,
+				      &zone->hash_lock_map);
+	if (result != VDO_SUCCESS)
+		return result;
+
+	vdo_set_admin_state_code(&zone->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
+	zone->zone_number = zone_number;
+	zone->thread_id = vdo->thread_config.hash_zone_threads[zone_number];
+	vdo_initialize_completion(&zone->completion, vdo, VDO_HASH_ZONE_COMPLETION);
+	vdo_set_completion_callback(&zone->completion,
+				    timeout_index_operations_callback,
+				    zone->thread_id);
+	INIT_LIST_HEAD(&zone->lock_pool);
+	result = UDS_ALLOCATE(LOCK_POOL_CAPACITY,
+			      struct hash_lock,
+			      "hash_lock array",
+			      &zone->lock_array);
+	if (result != VDO_SUCCESS)
+		return result;
+
+	for (i = 0; i < LOCK_POOL_CAPACITY; i++)
+		return_hash_lock_to_pool(zone, &zone->lock_array[i]);
+
+	INIT_LIST_HEAD(&zone->available);
+	INIT_LIST_HEAD(&zone->pending);
+	result = uds_make_funnel_queue(&zone->timed_out_complete);
+	if (result != VDO_SUCCESS)
+		return result;
+
+	timer_setup(&zone->timer, timeout_index_operations, 0);
+
+	for (i = 0; i < MAXIMUM_VDO_USER_VIOS; i++) {
+		struct dedupe_context *context = &zone->contexts[i];
+
+		context->zone = zone;
+		context->request.callback = finish_index_operation;
+		context->request.session = zones->index_session;
+		list_add(&context->list_entry, &zone->available);
+	}
+
+	return vdo_make_default_thread(vdo, zone->thread_id);
+}
+
+/** get_thread_id_for_zone() - Implements vdo_zone_thread_getter. */
+static thread_id_t get_thread_id_for_zone(void *context, zone_count_t zone_number)
+{
+	struct hash_zones *zones = context;
+
+	return zones->zones[zone_number].thread_id;
+}
+
+/**
+ * vdo_make_hash_zones() - Create the hash zones.
+ *
+ * @vdo: The vdo to which the zone will belong.
+ * @zones_ptr: A pointer to hold the zones.
+ *
+ * Return: VDO_SUCCESS or an error code.
+ */
+int vdo_make_hash_zones(struct vdo *vdo, struct hash_zones **zones_ptr)
+{
+	int result;
+	struct hash_zones *zones;
+	zone_count_t z;
+	zone_count_t zone_count = vdo->thread_config.hash_zone_count;
+
+	if (zone_count == 0)
+		return VDO_SUCCESS;
+
+	result = UDS_ALLOCATE_EXTENDED(struct hash_zones,
+				       zone_count,
+				       struct hash_zone,
+				       __func__,
+				       &zones);
+	if (result != VDO_SUCCESS)
+		return result;
+
+	result = initialize_index(vdo, zones);
+	if (result != VDO_SUCCESS) {
+		UDS_FREE(zones);
+		return result;
+	}
+
+	vdo_set_admin_state_code(&zones->state, VDO_ADMIN_STATE_NEW);
+
+	zones->zone_count = zone_count;
+	for (z = 0; z < zone_count; z++) {
+		result = initialize_zone(vdo, zones, z);
+		if (result != VDO_SUCCESS) {
+			vdo_free_hash_zones(zones);
+			return result;
+		}
+	}
+
+	result = vdo_make_action_manager(zones->zone_count,
+					 get_thread_id_for_zone,
+					 vdo->thread_config.admin_thread,
+					 zones,
+					 NULL,
+					 vdo,
+					 &zones->manager);
+	if (result != VDO_SUCCESS) {
+		vdo_free_hash_zones(zones);
+		return result;
+	}
+
+	*zones_ptr = zones;
+	return VDO_SUCCESS;
+}
+
+void vdo_finish_dedupe_index(struct hash_zones *zones)
+{
+	if (zones == NULL)
+		return;
+
+	uds_destroy_index_session(UDS_FORGET(zones->index_session));
+}
+
+/**
+ * vdo_free_hash_zones() - Free the hash zones.
+ * @zones: The zone to free.
+ */
+void vdo_free_hash_zones(struct hash_zones *zones)
+{
+	zone_count_t i;
+
+	if (zones == NULL)
+		return;
+
+	UDS_FREE(UDS_FORGET(zones->manager));
+
+	for (i = 0; i < zones->zone_count; i++) {
+		struct hash_zone *zone = &zones->zones[i];
+
+		uds_free_funnel_queue(UDS_FORGET(zone->timed_out_complete));
+		vdo_free_pointer_map(UDS_FORGET(zone->hash_lock_map));
+		UDS_FREE(UDS_FORGET(zone->lock_array));
+	}
+
+	if (zones->index_session != NULL)
+		vdo_finish_dedupe_index(zones);
+
+	ratelimit_state_exit(&zones->ratelimiter);
+	if (vdo_get_admin_state_code(&zones->state) == VDO_ADMIN_STATE_NEW)
+		UDS_FREE(zones);
+	else
+		kobject_put(&zones->dedupe_directory);
+}
+
+static void initiate_suspend_index(struct admin_state *state)
+{
+	struct hash_zones *zones = container_of(state, struct hash_zones, state);
+	enum index_state index_state;
+
+	spin_lock(&zones->lock);
+	index_state = zones->index_state;
+	spin_unlock(&zones->lock);
+
+	if (index_state != IS_CLOSED) {
+		bool save = vdo_is_state_saving(&zones->state);
+		int result;
+
+		result = uds_suspend_index_session(zones->index_session, save);
+		if (result != UDS_SUCCESS)
+			uds_log_error_strerror(result, "Error suspending dedupe index");
+	}
+
+	vdo_finish_draining(state);
+}
+
+/**
+ * suspend_index() - Suspend the UDS index prior to draining hash zones.
+ *
+ * Implements vdo_action_preamble
+ */
+static void suspend_index(void *context, struct vdo_completion *completion)
+{
+	struct hash_zones *zones = context;
+
+	vdo_start_draining(&zones->state,
+			   vdo_get_current_manager_operation(zones->manager),
+			   completion,
+			   initiate_suspend_index);
+}
+
+/**
+ * initiate_drain() - Initiate a drain.
+ *
+ * Implements vdo_admin_initiator.
+ */
+static void initiate_drain(struct admin_state *state)
+{
+	check_for_drain_complete(container_of(state, struct hash_zone, state));
+}
+
+/**
+ * drain_hash_zone() - Drain a hash zone.
+ *
+ * Implements vdo_zone_action.
+ */
+static void drain_hash_zone(void *context, zone_count_t zone_number, struct vdo_completion *parent)
+{
+	struct hash_zones *zones = context;
+
+	vdo_start_draining(&zones->zones[zone_number].state,
+			   vdo_get_current_manager_operation(zones->manager),
+			   parent,
+			   initiate_drain);
+}
+
+/** vdo_drain_hash_zones() - Drain all hash zones. */
+void vdo_drain_hash_zones(struct hash_zones *zones, struct vdo_completion *parent)
+{
+	vdo_schedule_operation(zones->manager,
+			       parent->vdo->suspend_type,
+			       suspend_index,
+			       drain_hash_zone,
+			       NULL,
+			       parent);
+}
+
+static void launch_dedupe_state_change(struct hash_zones *zones)
+{
+	/* ASSERTION: We enter with the lock held. */
+	if (zones->changing || !vdo_is_state_normal(&zones->state))
+		/* Either a change is already in progress, or changes are not allowed. */
+		return;
+
+	if (zones->create_flag || (zones->index_state != zones->index_target)) {
+		zones->changing = true;
+		vdo_launch_completion(&zones->completion);
+		return;
+	}
+
+	/* ASSERTION: We exit with the lock held. */
+}
+
+/**
+ * resume_index() - Resume the UDS index prior to resuming hash zones.
+ *
+ * Implements vdo_action_preamble
+ */
+static void resume_index(void *context, struct vdo_completion *parent)
+{
+	struct hash_zones *zones = context;
+	struct device_config *config = parent->vdo->device_config;
+	int result;
+
+	zones->parameters.name = config->parent_device_name;
+	result = uds_resume_index_session(zones->index_session, zones->parameters.name);
+	if (result != UDS_SUCCESS)
+		uds_log_error_strerror(result, "Error resuming dedupe index");
+
+	spin_lock(&zones->lock);
+	vdo_resume_if_quiescent(&zones->state);
+
+	if (config->deduplication) {
+		zones->index_target = IS_OPENED;
+		WRITE_ONCE(zones->dedupe_flag, true);
+	} else {
+		zones->index_target = IS_CLOSED;
+	}
+
+	launch_dedupe_state_change(zones);
+	spin_unlock(&zones->lock);
+
+	vdo_finish_completion(parent);
+}
+
+/**
+ * resume_hash_zone() - Resume a hash zone.
+ *
+ * Implements vdo_zone_action.
+ */
+static void
+resume_hash_zone(void *context, zone_count_t zone_number, struct vdo_completion *parent)
+{
+	struct hash_zone *zone = &(((struct hash_zones *) context)->zones[zone_number]);
+
+	vdo_fail_completion(parent, vdo_resume_if_quiescent(&zone->state));
+}
+
+/**
+ * vdo_resume_hash_zones() - Resume a set of hash zones.
+ * @zones: The hash zones to resume.
+ * @parent: The object to notify when the zones have resumed.
+ */
+void vdo_resume_hash_zones(struct hash_zones *zones, struct vdo_completion *parent)
+{
+	if (vdo_is_read_only(parent->vdo)) {
+		vdo_launch_completion(parent);
+		return;
+	}
+
+	vdo_schedule_operation(zones->manager,
+			       VDO_ADMIN_STATE_RESUMING,
+			       resume_index,
+			       resume_hash_zone,
+			       NULL,
+			       parent);
+}
+
+/**
+ * get_hash_zone_statistics() - Add the statistics for this hash zone to the tally for all zones.
+ * @zone: The hash zone to query.
+ * @tally: The tally
+ */
+static void
+get_hash_zone_statistics(const struct hash_zone *zone, struct hash_lock_statistics *tally)
+{
+	const struct hash_lock_statistics *stats = &zone->statistics;
+
+	tally->dedupe_advice_valid += READ_ONCE(stats->dedupe_advice_valid);
+	tally->dedupe_advice_stale += READ_ONCE(stats->dedupe_advice_stale);
+	tally->concurrent_data_matches += READ_ONCE(stats->concurrent_data_matches);
+	tally->concurrent_hash_collisions += READ_ONCE(stats->concurrent_hash_collisions);
+	tally->curr_dedupe_queries += READ_ONCE(zone->active);
+}
+
+static void get_index_statistics(struct hash_zones *zones, struct index_statistics *stats)
+{
+	enum index_state state;
+	struct uds_index_stats index_stats;
+	int result;
+
+	spin_lock(&zones->lock);
+	state = zones->index_state;
+	spin_unlock(&zones->lock);
+
+	if (state != IS_OPENED)
+		return;
+
+	result = uds_get_index_session_stats(zones->index_session, &index_stats);
+	if (result != UDS_SUCCESS) {
+		uds_log_error_strerror(result, "Error reading index stats");
+		return;
+	}
+
+	stats->entries_indexed = index_stats.entries_indexed;
+	stats->posts_found = index_stats.posts_found;
+	stats->posts_not_found = index_stats.posts_not_found;
+	stats->queries_found = index_stats.queries_found;
+	stats->queries_not_found = index_stats.queries_not_found;
+	stats->updates_found = index_stats.updates_found;
+	stats->updates_not_found = index_stats.updates_not_found;
+	stats->entries_discarded = index_stats.entries_discarded;
+}
+
+/**
+ * vdo_get_dedupe_statistics() - Tally the statistics from all the hash zones and the UDS index.
+ * @hash_zones: The hash zones to query
+ *
+ * Return: The sum of the hash lock statistics from all hash zones plus the statistics from the UDS
+ *         index
+ */
+void vdo_get_dedupe_statistics(struct hash_zones *zones, struct vdo_statistics *stats)
+
+{
+	zone_count_t zone;
+
+	for (zone = 0; zone < zones->zone_count; zone++)
+		get_hash_zone_statistics(&zones->zones[zone], &stats->hash_lock);
+
+	get_index_statistics(zones, &stats->index);
+
+	/*
+	 * zones->timeouts gives the number of timeouts, and dedupe_context_busy gives the number
+	 * of queries not made because of earlier timeouts.
+	 */
+	stats->dedupe_advice_timeouts =
+		(atomic64_read(&zones->timeouts) + atomic64_read(&zones->dedupe_context_busy));
+}
+
+/**
+ * vdo_select_hash_zone() - Select the hash zone responsible for locking a given record name.
+ * @zones: The hash_zones from which to select.
+ * @name: The record name.
+ *
+ * Return: The hash zone responsible for the record name.
+ */
+struct hash_zone *
+vdo_select_hash_zone(struct hash_zones *zones, const struct uds_record_name *name)
+{
+	/*
+	 * Use a fragment of the record name as a hash code. Eight bits of hash should suffice
+	 * since the number of hash zones is small.
+	 * TODO: Verify that the first byte is independent enough.
+	 */
+	u32 hash = name->name[0];
+
+	/*
+	 * Scale the 8-bit hash fragment to a zone index by treating it as a binary fraction and
+	 * multiplying that by the zone count. If the hash is uniformly distributed over [0 ..
+	 * 2^8-1], then (hash * count / 2^8) should be uniformly distributed over [0 .. count-1].
+	 * The multiply and shift is much faster than a divide (modulus) on X86 CPUs.
+	 */
+	hash = (hash * zones->zone_count) >> 8;
+	return &zones->zones[hash];
+}
+
+/**
+ * dump_hash_lock() - Dump a compact description of hash_lock to the log if the lock is not on the
+ *                    free list.
+ * @lock: The hash lock to dump.
+ */
+static void dump_hash_lock(const struct hash_lock *lock)
+{
+	const char *state;
+
+	if (!list_empty(&lock->pool_node))
+		/* This lock is on the free list. */
+		return;
+
+	/*
+	 * Necessarily cryptic since we can log a lot of these. First three chars of state is
+	 * unambiguous. 'U' indicates a lock not registered in the map.
+	 */
+	state = get_hash_lock_state_name(lock->state);
+	uds_log_info("  hl %px: %3.3s %c%llu/%u rc=%u wc=%zu agt=%px",
+		     (const void *) lock, state, (lock->registered ? 'D' : 'U'),
+		     (unsigned long long) lock->duplicate.pbn,
+		     lock->duplicate.state, lock->reference_count,
+		     vdo_count_waiters(&lock->waiters), (void *) lock->agent);
+}
+
+static const char *index_state_to_string(struct hash_zones *zones, enum index_state state)
+{
+	if (!vdo_is_state_normal(&zones->state))
+		return SUSPENDED;
+
+	switch (state) {
+	case IS_CLOSED:
+		return zones->error_flag ? ERROR : CLOSED;
+	case IS_CHANGING:
+		return zones->index_target == IS_OPENED ? OPENING : CLOSING;
+	case IS_OPENED:
+		return READ_ONCE(zones->dedupe_flag) ? ONLINE : OFFLINE;
+	default:
+		return UNKNOWN;
+	}
+}
+
+/**
+ * vdo_dump_hash_zone() - Dump information about a hash zone to the log for debugging.
+ * @zone: The zone to dump.
+ */
+static void dump_hash_zone(const struct hash_zone *zone)
+{
+	data_vio_count_t i;
+
+	if (zone->hash_lock_map == NULL) {
+		uds_log_info("struct hash_zone %u: NULL map", zone->zone_number);
+		return;
+	}
+
+	uds_log_info("struct hash_zone %u: mapSize=%zu",
+		     zone->zone_number,
+		     vdo_pointer_map_size(zone->hash_lock_map));
+	for (i = 0; i < LOCK_POOL_CAPACITY; i++)
+		dump_hash_lock(&zone->lock_array[i]);
+}
+
+/**
+ * vdo_dump_hash_zones() - Dump information about the hash zones to the log for debugging.
+ * @zones: The zones to dump.
+ */
+void vdo_dump_hash_zones(struct hash_zones *zones)
+{
+	const char *state, *target;
+	zone_count_t zone;
+
+	spin_lock(&zones->lock);
+	state = index_state_to_string(zones, zones->index_state);
+	target = (zones->changing ? index_state_to_string(zones, zones->index_target) : NULL);
+	spin_unlock(&zones->lock);
+
+	uds_log_info("UDS index: state: %s", state);
+	if (target != NULL)
+		uds_log_info("UDS index: changing to state: %s", target);
+
+	for (zone = 0; zone < zones->zone_count; zone++)
+		dump_hash_zone(&zones->zones[zone]);
+}
diff --git a/drivers/md/dm-vdo/dedupe.h b/drivers/md/dm-vdo/dedupe.h
new file mode 100644
index 000000000000..af329fb0fa68
--- /dev/null
+++ b/drivers/md/dm-vdo/dedupe.h
@@ -0,0 +1,93 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * Copyright 2023 Red Hat
+ */
+
+#ifndef VDO_DEDUPE_H
+#define VDO_DEDUPE_H
+
+#include <linux/list.h>
+#include <linux/timer.h>
+
+#include "uds.h"
+
+#include "admin-state.h"
+#include "constants.h"
+#include "statistics.h"
+#include "types.h"
+#include "wait-queue.h"
+
+struct dedupe_context {
+	struct hash_zone *zone;
+	struct uds_request request;
+	struct list_head list_entry;
+	struct funnel_queue_entry queue_entry;
+	u64 submission_jiffies;
+	struct data_vio *requestor;
+	atomic_t state;
+};
+
+struct hash_lock;
+
+struct hash_zone {
+	/* Which hash zone this is */
+	zone_count_t zone_number;
+
+	/* The administrative state of the zone */
+	struct admin_state state;
+
+	/* The thread ID for this zone */
+	thread_id_t thread_id;
+
+	/* Mapping from record name fields to hash_locks */
+	struct pointer_map *hash_lock_map;
+
+	/* List containing all unused hash_locks */
+	struct list_head lock_pool;
+
+	/*
+	 * Statistics shared by all hash locks in this zone. Only modified on the hash zone thread,
+	 * but queried by other threads.
+	 */
+	struct hash_lock_statistics statistics;
+
+	/* Array of all hash_locks */
+	struct hash_lock *lock_array;
+
+	/* These fields are used to manage the dedupe contexts */
+	struct list_head available;
+	struct list_head pending;
+	struct funnel_queue *timed_out_complete;
+	struct timer_list timer;
+	struct vdo_completion completion;
+	unsigned int active;
+	atomic_t timer_state;
+
+	/* The dedupe contexts for querying the index from this zone */
+	struct dedupe_context contexts[MAXIMUM_VDO_USER_VIOS];
+};
+
+struct hash_zones;
+
+struct pbn_lock * __must_check vdo_get_duplicate_lock(struct data_vio *data_vio);
+
+void vdo_acquire_hash_lock(struct vdo_completion *completion);
+void vdo_continue_hash_lock(struct vdo_completion *completion);
+void vdo_release_hash_lock(struct data_vio *data_vio);
+void vdo_clean_failed_hash_lock(struct data_vio *data_vio);
+void vdo_share_compressed_write_lock(struct data_vio *data_vio, struct pbn_lock *pbn_lock);
+
+int __must_check vdo_make_hash_zones(struct vdo *vdo, struct hash_zones **zones_ptr);
+
+void vdo_free_hash_zones(struct hash_zones *zones);
+
+void vdo_drain_hash_zones(struct hash_zones *zones, struct vdo_completion *parent);
+
+void vdo_get_dedupe_statistics(struct hash_zones *zones, struct vdo_statistics *stats);
+
+struct hash_zone * __must_check
+vdo_select_hash_zone(struct hash_zones *zones, const struct uds_record_name *name);
+
+void vdo_dump_hash_zones(struct hash_zones *zones);
+
+#endif /* VDO_DEDUPE_H */
-- 
2.40.0

--
dm-devel mailing list
dm-devel@redhat.com
https://listman.redhat.com/mailman/listinfo/dm-devel


  parent reply	other threads:[~2023-09-14 19:18 UTC|newest]

Thread overview: 40+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2023-09-14 19:15 [dm-devel] [PATCH v3 00/39] dm: baseline for the VDO target Mike Snitzer
2023-09-14 19:15 ` [dm-devel] [PATCH v3 01/39] dm: add documentation for dm-vdo target Mike Snitzer
2023-09-14 19:15 ` [dm-devel] [PATCH v3 02/39] dm vdo: add the MurmurHash3 fast hashing algorithm Mike Snitzer
2023-09-14 19:15 ` [dm-devel] [PATCH v3 03/39] dm vdo: add memory allocation utilities Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 04/39] dm vdo: add basic logging and support utilities Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 05/39] dm vdo: add type declarations, constants, and simple data structures Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 06/39] dm vdo: add thread and synchronization utilities Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 07/39] dm vdo: add specialized request queueing functionality Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 08/39] dm vdo: add basic hash map data structures Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 09/39] dm vdo: add deduplication configuration structures Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 10/39] dm vdo: add deduplication index storage interface Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 11/39] dm vdo: implement the delta index Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 12/39] dm vdo: implement the volume index Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 13/39] dm vdo: implement the open chapter and chapter indexes Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 14/39] dm vdo: implement the chapter volume store Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 15/39] dm vdo: implement top-level deduplication index Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 16/39] dm vdo: implement external deduplication index interface Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 17/39] dm vdo: add administrative state and action manager Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 18/39] dm vdo: add vio, the request object for vdo metadata Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 19/39] dm vdo: add data_vio, the request object which services incoming bios Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 20/39] dm vdo: add flush support Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 21/39] dm vdo: add the io_submitter Mike Snitzer
2023-09-14 19:16 ` Mike Snitzer [this message]
2023-09-14 19:16 ` [dm-devel] [PATCH v3 23/39] dm vdo: add use of the deduplication index in hash zones Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 24/39] dm vdo: add the compressed block bin packer Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 25/39] dm vdo: add slab structure, slab journal and reference counters Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 26/39] dm vdo: add the slab summary Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 27/39] dm vdo: add the block allocators and physical zones Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 28/39] dm vdo: add the slab depot Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 29/39] dm vdo: add the block map Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 30/39] dm vdo: implement the vdo block map page cache Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 31/39] dm vdo: add the vdo recovery journal Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 32/39] dm vdo: add repair of damanged vdo volumes Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 33/39] dm vdo: add the vdo structure itself Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 34/39] dm vdo: add the on-disk formats and marshalling of vdo structures Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 35/39] dm vdo: add statistics reporting Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 36/39] dm vdo: add sysfs support for setting vdo params and reading stats Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 37/39] dm vdo: add debugging support Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 38/39] dm vdo: add the top-level DM target Mike Snitzer
2023-09-14 19:16 ` [dm-devel] [PATCH v3 39/39] dm vdo: enable configuration and building of dm-vdo Mike Snitzer

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20230914191635.39805-23-snitzer@kernel.org \
    --to=snitzer@kernel.org \
    --cc=dm-devel@redhat.com \
    --cc=msakai@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.