From: heinzm@redhat.com
To: dm-devel@redhat.com
Cc: Heinz Mauelshagen <heinzm@redhat.com>
Subject: [PATCH v6 2/4] dm-replicator: replication log and site link handler interfaces and main replicator module
Date: Fri, 18 Dec 2009 16:44:31 +0100	[thread overview]
Message-ID: <1261151073-25962-3-git-send-email-heinzm@redhat.com> (raw)
In-Reply-To: <1261151073-25962-2-git-send-email-heinzm@redhat.com>

From: Heinz Mauelshagen <heinzm@redhat.com>

These are the interface definitions for the replication log and the site link
handler, plus the main replicator module, which plugs into the dm core
interface to construct/destruct/... replication control ("replicator" target)
and data ("replicator-dev" target) devices.

The "replicator" control target handles the replication log and
the site link properties (eg. log size or asynchronous replication),
while the "replicator-dev" target handles all local and remote device
properties (eg. their paths and dirty log parameters).
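
For illustration, a minimal mapping could stack the two targets like this
(device paths, sizes and site link parameters are arbitrary):

  0 1 replicator ringbuffer 4 /dev/mapper/repl_log 0 auto 2097152 \
                 blockdev 1 0 blockdev 4 1 async ios 10000

  0 2097152 replicator-dev /dev/mapper/repl_ctrl 0 \
            0 1 /dev/mapper/local_dev nolog 0 \
            1 1 /dev/mapper/remote_dev core 2 2048 nosync

See the constructor documentation in dm-repl.c below for the full
parameter descriptions.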
 
 
Signed-off-by: Heinz Mauelshagen <heinzm@redhat.com>
Reviewed-by: Jon Brassow <jbrassow@redhat.com> 
Tested-by: Jon Brassow <jbrassow@redhat.com>
---
 drivers/md/Makefile        |    4 +-
 drivers/md/dm-repl-log.h   |  120 +++
 drivers/md/dm-repl-slink.h |  313 +++++++
 drivers/md/dm-repl.c       | 2004 ++++++++++++++++++++++++++++++++++++++++++++
 drivers/md/dm-repl.h       |  127 +++
 drivers/md/dm.c            |    1 +
 6 files changed, 2568 insertions(+), 1 deletions(-)
 create mode 100644 drivers/md/dm-repl-log.h
 create mode 100644 drivers/md/dm-repl-slink.h
 create mode 100644 drivers/md/dm-repl.c
 create mode 100644 drivers/md/dm-repl.h

diff --git 2.6.33-rc1.orig/drivers/md/Makefile 2.6.33-rc1/drivers/md/Makefile
index be05b39..832d547 100644
--- 2.6.33-rc1.orig/drivers/md/Makefile
+++ 2.6.33-rc1/drivers/md/Makefile
@@ -8,6 +8,7 @@ dm-multipath-y	+= dm-path-selector.o dm-mpath.o
 dm-snapshot-y	+= dm-snap.o dm-exception-store.o dm-snap-transient.o \
 		    dm-snap-persistent.o
 dm-mirror-y	+= dm-raid1.o
+dm-replicator-y	+= dm-repl.o
 dm-log-userspace-y \
 		+= dm-log-userspace-base.o dm-log-userspace-transfer.o
 md-mod-y	+= md.o bitmap.o
@@ -44,7 +45,8 @@ obj-$(CONFIG_DM_SNAPSHOT)	+= dm-snapshot.o
 obj-$(CONFIG_DM_MIRROR)		+= dm-mirror.o dm-log.o dm-region-hash.o
 obj-$(CONFIG_DM_LOG_USERSPACE)	+= dm-log-userspace.o
 obj-$(CONFIG_DM_ZERO)		+= dm-zero.o
-obj-$(CONFIG_DM_REPLICATOR)	+= dm-log.o dm-registry.o
+obj-$(CONFIG_DM_REPLICATOR)	+= dm-replicator.o \
+				   dm-log.o dm-registry.o
 
 quiet_cmd_unroll = UNROLL  $@
       cmd_unroll = $(AWK) -f$(srctree)/$(src)/unroll.awk -vN=$(UNROLL) \
diff --git 2.6.33-rc1.orig/drivers/md/dm-repl-log.h 2.6.33-rc1/drivers/md/dm-repl-log.h
new file mode 100644
index 0000000..cff74ed
--- /dev/null
+++ 2.6.33-rc1/drivers/md/dm-repl-log.h
@@ -0,0 +1,120 @@
+/*
+ * Copyright (C) 2009 Red Hat, Inc. All rights reserved.
+ *
+ * Module Author: Heinz Mauelshagen (heinzm@redhat.com)
+ *
+ * This file is released under the GPL.
+ */
+
+/*
+ * API calling convention to create a replication mapping:
+ *
+ * 1. get a replicator log handle, hence creating a new persistent
+ *    log or accessing an existing one
+ * 2. get an slink handle, hence creating a new transient
+ *    slink or accessing an existing one
+ * 2(cont). repeat the previous step for multiple slinks (e.g. one for
+ *    local and one for remote device access)
+ * 3. bind a (remote) device to a particular slink created in a previous step
+ * 3(cont). repeat the device binding for any additional devices on that slink
+ * 4. bind the created slink which has device(s) bound to it to the replog
+ * 4(cont). repeat the slink binding to the replog for all created slinks
+ * 5. call the replog io function for each IO.
+ *
+ * Reverse steps 1-4 to tear a replication mapping down, hence freeing all
+ * transient resources allocated to it.
+ */
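+
+/*
+ * A rough sketch of the above convention (error handling omitted; the
+ * replog and slink handles are assumed to have been obtained from their
+ * respective handlers in steps 1 and 2, all other names are illustrative):
+ *
+ *	struct dm_repl_log *log;	// Step 1: replication log handle.
+ *	struct dm_repl_slink *sl0, *sl1;// Step 2: local and remote site link.
+ *
+ *	// Step 3: bind a device to each site link.
+ *	sl0->ops->dev_add(sl0, dev_nr, ti, dev_argc, dev_argv);
+ *	sl1->ops->dev_add(sl1, dev_nr, ti, dev_argc, dev_argv);
+ *
+ *	// Step 4: tie the site links to the replication log.
+ *	log->ops->slink_add(log, sl0);
+ *	log->ops->slink_add(log, sl1);
+ *
+ *	// Step 5: hand each bio to the replication log with a unique tag.
+ *	log->ops->io(log, bio, tag);
+ */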
+
+#ifndef _DM_REPL_LOG_H
+#define _DM_REPL_LOG_H
+
+#include "dm-repl.h"
+#include "dm-registry.h"
+#include "dm-repl-slink.h"
+
+/* Handle to access a replicator log. */
+struct dm_repl_log {
+	struct dm_repl_log_type *ops;
+	void *context;
+};
+
+/* List of site links hanging off of each replicator log. */
+struct dm_repl_log_slink_list {
+	rwlock_t lock;
+	struct list_head list; /* List of site links hanging off of this log. */
+	void *context; /* Caller context. */
+};
+
+struct dm_repl_log_type {
+	struct dm_registry_type type;
+
+	/* Construct/destruct a replicator log. */
+	int (*ctr)(struct dm_repl_log *, struct dm_target *,
+		   unsigned argc, char **argv);
+	void (*dtr)(struct dm_repl_log *, struct dm_target *);
+
+	/*
+	 * There are times when we want the log to be quiet.
+	 * I.e. no entries of the log will be copied across site links.
+	 */
+	int (*postsuspend)(struct dm_repl_log *log, int dev_number);
+	int (*resume)(struct dm_repl_log *log, int dev_number);
+
+	/* Flush the current log contents. This function may block. */
+	int (*flush)(struct dm_repl_log *log);
+
+	/*
+	 * Read a bio either from a replicator log's backing store
+	 * (if supported) or from the replicated device if there is
+	 * no buffer entry
+	 * - or -
+	 * write a bio to a replicator log's backing store buffer.
+	 *
+	 * This includes buffer allocation in case of a write and
+	 * initiation of copies across one or multiple site links.
+	 *
+	 * In case of a read with (partial) writes in the buffer,
+	 * the replog may postpone the read until the buffer content has
+	 * been copied across the local site link *or* optimize by reading
+	 * (parts of) the bio off the buffer.
+	 *
+	 * Tag is a unique tag identifying a data set.
+	 */
+	int (*io)(struct dm_repl_log *, struct bio *, unsigned long long tag);
+
+	/* Endio function to call from dm_repl core on bio endio processing. */
+	int (*endio) (struct dm_repl_log *, struct bio *bio, int error,
+		      union map_info *map_context);
+
+	/* Set global I/O completion notification function and context. */
+	void (*io_notify_fn_set)(struct dm_repl_log *,
+				 dm_repl_notify_fn, void *context);
+
+	/*
+	 * Add (tie) a site link to a replication
+	 * log for site link copy processing.
+	 */
+	int (*slink_add)(struct dm_repl_log *, struct dm_repl_slink *);
+
+	/* Remove (untie) a site link from a replication log. */
+	int (*slink_del)(struct dm_repl_log *, struct dm_repl_slink *);
+
+	/*
+	 * Return list of site links added to a replication log.
+	 *
+	 * This method eases slink handler coding to
+	 * keep such a replication log site link list.
+	 */
+	struct dm_repl_log_slink_list *(*slinks)(struct dm_repl_log *);
+
+	/* Return maximum number of supported site links. */
+	int (*slink_max)(struct dm_repl_log *);
+
+	/* REPLOG messages. */
+	int (*message)(struct dm_repl_log *, unsigned argc, char **argv);
+
+	/* Support function for replicator log status requests. */
+	int (*status)(struct dm_repl_log *, int dev_number, status_type_t,
+		      char *result, unsigned maxlen);
+};
+
+#endif /* #ifndef _DM_REPL_LOG_H */
diff --git 2.6.33-rc1.orig/drivers/md/dm-repl-slink.h 2.6.33-rc1/drivers/md/dm-repl-slink.h
new file mode 100644
index 0000000..ddf4ef7
--- /dev/null
+++ 2.6.33-rc1/drivers/md/dm-repl-slink.h
@@ -0,0 +1,313 @@
+/*
+ * Copyright (C) 2009 Red Hat, Inc. All rights reserved.
+ *
+ * Module Author: Heinz Mauelshagen (heinzm@redhat.com)
+ *
+ * This file is released under the GPL.
+ */
+
+/*
+ * API calling convention to create a replication mapping:
+ *
+ * 1. get a replicator log handle, hence creating a new persistent
+ *    log or accessing an existing one
+ * 2. get an slink handle, hence creating a new transient
+ *    slink or accessing an existing one
+ * 2(cont). repeat the previous step for multiple slinks (e.g. one for
+ *    local and one for remote device access)
+ * 3. bind a (remote) device to a particular slink created in a previous step
+ * 3(cont). repeat the device binding for any additional devices on that slink
+ * 4. bind the created slink which has device(s) bound to it to the replog
+ * 4(cont). repeat the slink binding to the replog for all created slinks
+ * 5. call the replog write function for each write IO and the replog hit
+ *    function for each read IO.
+ *
+ * Reverse steps 1-4 to tear a replication mapping down, hence freeing all
+ * transient resources allocated to it.
+ */
+
+#ifndef _DM_REPL_SLINK_IO_H
+#define _DM_REPL_SLINK_IO_H
+
+#include "dm.h"
+#include "dm-repl.h"
+#include "dm-registry.h"
+
+#include <linux/dm-io.h>
+
+/* Handle to access a site link. */
+struct dm_repl_slink {
+	struct dm_repl_slink_type *ops;
+	void *context;	/* Private slink (callee) context. */
+	void *caller;	/* Caller context to (optionally) tie to slink. */
+};
+
+/*
+ * Start copy function parameters.
+ */
+/* Copy device address union content type. */
+enum dm_repl_slink_dev_type {
+	DM_REPL_SLINK_BLOCK_DEVICE,	/* Copying from/to block_device. */
+	DM_REPL_SLINK_DEV_NUMBER,	/* Copying from/to device number. */
+};
+
+/* Copy device address. */
+struct dm_repl_slink_copy_addr {
+	/* Union content type. */
+	enum dm_repl_slink_dev_type type;
+
+	/* Either address is block_device or slink/device # pair. */
+	union {
+		struct block_device *bdev;
+		struct {
+			unsigned slink;
+			unsigned dev;
+		} number;
+	} dev;
+
+	/* Sector offset on device to copy to/from. */
+	sector_t sector;
+};
+
+/* Copy notification callback parameters. */
+struct dm_repl_slink_notify_ctx {
+	dm_repl_notify_fn fn;
+	void *context;
+};
+
+/* Copy function structure to pass in from caller. */
+struct dm_repl_slink_copy {
+	struct dm_repl_slink_copy_addr src; /* Source address of copy. */
+	struct dm_repl_slink_copy_addr dst; /* Destination address of copy. */
+	unsigned size;			    /* Size of copy [bytes]. */
+
+	/* Notification callback for data transferred to (remote) RAM. */
+	struct dm_repl_slink_notify_ctx ram;
+	/* Notification callback for data transferred to (remote) disk. */
+	struct dm_repl_slink_notify_ctx disk;
+};
+/*
+ * End copy function parameters.
+ */
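+
+/*
+ * A minimal sketch of filling in a copy parameter block on the stack
+ * (all concrete values, devices and callbacks are illustrative); the
+ * slink copy() method below takes copies of the members before it
+ * returns, so stack allocation is safe:
+ *
+ *	struct dm_repl_slink_copy copy = {
+ *		.src = {
+ *			.type = DM_REPL_SLINK_BLOCK_DEVICE,
+ *			.dev.bdev = log_bdev,
+ *			.sector = log_sector,
+ *		},
+ *		.dst = {
+ *			.type = DM_REPL_SLINK_DEV_NUMBER,
+ *			.dev.number = { .slink = 1, .dev = 0 },
+ *			.sector = dev_sector,
+ *		},
+ *		.size = 4096,				// Bytes.
+ *		.ram  = { .fn = ram_callback,  .context = entry },
+ *		.disk = { .fn = disk_callback, .context = entry },
+ *	};
+ */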
+
+/* SLINK policies */
+enum dm_repl_slink_policy_type {
+	DM_REPL_SLINK_ASYNC,
+	DM_REPL_SLINK_SYNC,
+	DM_REPL_SLINK_STALL,
+};
+
+/* SLINK states */
+enum dm_repl_slink_state_type {
+	DM_REPL_SLINK_DOWN,
+	DM_REPL_SLINK_READ_ERROR,
+	DM_REPL_SLINK_WRITE_ERROR,
+};
+
+/* SLINK fallbehind information. */
+/* Definition of fall behind values. */
+enum dm_repl_slink_fallbehind_type {
+	DM_REPL_SLINK_FB_IOS,		/* Number of IOs. */
+	DM_REPL_SLINK_FB_SIZE,		/* In sectors unless a unit is given. */
+	DM_REPL_SLINK_FB_TIMEOUT,	/* In ms unless a unit is given. */
+};
+struct dm_repl_slink_fallbehind {
+	enum dm_repl_slink_fallbehind_type type;
+	sector_t value;
+	sector_t multiplier;
+	char unit;
+};
+
+struct dm_repl_log;
+
+/* SLINK handler interface type. */
+struct dm_repl_slink_type {
+	/* Must be first to allow for registry abstraction! */
+	struct dm_registry_type type;
+
+	/* Construct/destruct a site link. */
+	int (*ctr)(struct dm_repl_slink *, struct dm_repl_log *,
+		   unsigned argc, char **argv);
+	void (*dtr)(struct dm_repl_slink *);
+
+	/*
+	 * There are times when we want the slink to be quiet.
+	 * I.e. no checks will run on slinks and no initial
+	 * resynchronization will be performed.
+	 */
+	int (*postsuspend)(struct dm_repl_slink *slink, int dev_number);
+	int (*resume)(struct dm_repl_slink *slink, int dev_number);
+
+	/* Add a device to a site link. */
+	int (*dev_add)(struct dm_repl_slink *, int dev_number,
+		       struct dm_target *ti, unsigned argc, char **argv);
+
+	/* Delete a device from a site link. */
+	int (*dev_del)(struct dm_repl_slink *, int dev_number);
+
+	/*
+	 * Initiate data copy across a site link.
+	 *
+	 * This function may be used to copy a buffer entry *or*
+	 * for resynchronizing regions initially or when an SLINK
+	 * has fallen back to dirty log (bitmap) mode.
+	 *
+	 * The dm_repl_slink_copy can be allocated on the stack,
+	 * because copies of its members are taken before the function returns.
+	 *
+	 * The function will call 2 callbacks, one to report data in (remote)
+	 * RAM and another one to report data on (remote) disk
+	 * (see dm_repl_slink_copy structure for details).
+	 *
+	 * Tag is a unique tag to identify a data set.
+	 *
+	 * The return codes are defined as follows:
+	 *
+	 * o -EAGAIN in case I/O is prohibited because
+	 *    of device inaccessibility/suspension
+	 *    or device I/O errors
+	 *    (i.e. link temporarily down) ->
+	 *    caller is allowed to retry the I/O later once
+	 *    it has received a callback.
+	 *
+	 * o -EACCES in case a region is being resynchronized
+	 *    and the source region is being read to copy data
+	 *    across to the same region of the replica (RD) ->
+	 *    caller is allowed to retry the I/O later once
+	 *    it has received a callback.
+	 *
+	 * o -ENODEV in case a device is not configured ->
+	 *    caller must drop the I/O to the device/slink pair.
+	 *
+	 * o -EPERM in case a region is out of sync ->
+	 *    caller must drop the I/O to the device/slink pair.
+	 */
+	int (*copy)(struct dm_repl_slink *, struct dm_repl_slink_copy *,
+		    unsigned long long tag);
+
+	/* Submit bio to underlying transport. */
+	int (*io)(struct dm_repl_slink *, struct bio *,
+		  unsigned long long tag);
+
+	/* Endio function to call from dm_repl core on bio endio processing. */
+	int (*endio) (struct dm_repl_slink *, struct bio *bio, int error,
+		      union map_info *map_context);
+
+	/* Unplug request queues on all devices on slink. */
+	int (*unplug)(struct dm_repl_slink *);
+
+	/* Set global recovery notification function and context. */
+	void (*recover_notify_fn_set)(struct dm_repl_slink *,
+				      dm_repl_notify_fn, void *context);
+
+	/* Set/clear sync status of sector. */
+	int (*set_sync)(struct dm_repl_slink *, int dev_number,
+			sector_t sector, int in_sync);
+
+	/* Flush any dirty logs on slink. */
+	int (*flush_sync)(struct dm_repl_slink *);
+
+	/* Trigger resynchronization of devices on slink. */
+	int (*resync)(struct dm_repl_slink *slink, int resync);
+
+	/* Return > 0 if region is in sync on all slinks. */
+	int (*in_sync)(struct dm_repl_slink *slink, int dev_number,
+		       sector_t region);
+
+	/* Site link policies. */
+	enum dm_repl_slink_policy_type (*policy)(struct dm_repl_slink *);
+
+	/* Site link state. */
+	enum dm_repl_slink_state_type (*state)(struct dm_repl_slink *);
+
+	/* Return reference to fallbehind information. */
+	struct dm_repl_slink_fallbehind *(*fallbehind)(struct dm_repl_slink *);
+
+	/* Return device number for block_device on slink if any. */
+	int (*dev_number)(struct dm_repl_slink *, struct block_device *);
+
+	/* Return # of the SLINK. */
+	int (*slink_number)(struct dm_repl_slink *);
+
+	/* Return SLINK by number. */
+	struct dm_repl_slink *(*slink)(struct dm_repl_log *,
+				       unsigned slink_number);
+
+	/* SLINK status requests. */
+	int (*status)(struct dm_repl_slink *, int dev_number,
+		      status_type_t, char *result, unsigned int maxlen);
+
+	/* SLINK messages (e.g. change policy). */
+	int (*message)(struct dm_repl_slink *, unsigned argc, char **argv);
+};
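+
+/*
+ * A minimal sketch of a caller reacting to the copy() return codes
+ * documented above (the buffer entry and its helpers are illustrative):
+ *
+ *	int r = slink->ops->copy(slink, &copy, tag);
+ *
+ *	switch (r) {
+ *	case 0:				// Copy initiated; callbacks follow.
+ *		break;
+ *	case -EAGAIN:			// Link temporarily inaccessible.
+ *	case -EACCES:			// Region being resynchronized.
+ *		entry_requeue(entry);	// Retry after the notify callback.
+ *		break;
+ *	case -ENODEV:			// Device not configured.
+ *	case -EPERM:			// Region out of sync.
+ *		entry_drop(entry);	// Drop the I/O for this dev/slink pair.
+ *		break;
+ *	default:
+ *		entry_fail(entry, r);	// Any other transport error.
+ *	}
+ */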
+
+/* Policy and state access inlines. */
+/* Policy synchronous. */
+static inline int
+slink_policy_synchronous(enum dm_repl_slink_policy_type policy)
+{
+	return test_bit(DM_REPL_SLINK_SYNC, (unsigned long *) &policy);
+}
+
+/* Slink synchronous. */
+static inline int
+slink_synchronous(struct dm_repl_slink *slink)
+{
+	return slink_policy_synchronous(slink->ops->policy(slink));
+}
+
+/* Policy asynchronous. */
+static inline int
+slink_policy_asynchronous(enum dm_repl_slink_policy_type policy)
+{
+	return test_bit(DM_REPL_SLINK_ASYNC, (unsigned long *) &policy);
+}
+
+/* Slink asynchronous. */
+static inline int
+slink_asynchronous(struct dm_repl_slink *slink)
+{
+	return slink_policy_asynchronous(slink->ops->policy(slink));
+}
+
+/* Policy stall. */
+static inline int
+slink_policy_stall(enum dm_repl_slink_policy_type policy)
+{
+	return test_bit(DM_REPL_SLINK_STALL, (unsigned long *) &policy);
+}
+
+/* Slink stall. */
+static inline int
+slink_stall(struct dm_repl_slink *slink)
+{
+	return slink_policy_stall(slink->ops->policy(slink));
+}
+
+/* State down. */
+static inline int
+slink_state_down(enum dm_repl_slink_state_type state)
+{
+	return test_bit(DM_REPL_SLINK_DOWN, (unsigned long *) &state);
+}
+
+/* Slink down. */
+static inline int
+slink_down(struct dm_repl_slink *slink)
+{
+	return slink_state_down(slink->ops->state(slink));
+}
+
+/* Setup of site links. */
+/* Create/destroy a transient replicator site link */
+struct dm_repl_slink *
+dm_repl_slink_get(char *name, struct dm_repl_log *,
+		  unsigned argc, char **argv);
+void dm_repl_slink_put(struct dm_repl_slink *);
+
+/* init/exit functions. */
+int dm_repl_slink_init(void);
+void dm_repl_slink_exit(void);
+
+#endif
diff --git 2.6.33-rc1.orig/drivers/md/dm-repl.c 2.6.33-rc1/drivers/md/dm-repl.c
new file mode 100644
index 0000000..86d1b48
--- /dev/null
+++ 2.6.33-rc1/drivers/md/dm-repl.c
@@ -0,0 +1,2004 @@
+/*
+ * Copyright (C) 2009 Red Hat, Inc. All rights reserved.
+ *
+ * Module Author: Heinz Mauelshagen <HeinzM@redhat.com>
+ *
+ * This file is released under the GPL.
+ *
+ * Remote Replication target.
+ *
+ * Features:
+ * o Logs writes to circular buffer keeping persistent state metadata.
+ * o Writes data from log synchronously or asynchronously
+ *   to multiple (1-N) remote replicas.
+ * o stores CRCs with metadata for integrity checks
+ * o stores versions with metadata to support future metadata migration
+ *
+ *
+ * For disk layout of backing store see dm-repl-log implementation.
+ *
+ *
+ * This file is the control module of the replication target, which
+ * controls the construction/destruction and mapping of replication
+ * mappings, interfacing into separate log and site link (transport)
+ * handler modules.
+ *
+ * That architecture allows the control module to be log *and* transport
+ * implementation agnostic.
+ */
+
+static const char version[] = "v0.028";
+
+#include "dm.h"
+#include "dm-repl.h"
+#include "dm-repl-log.h"
+#include "dm-repl-slink.h"
+
+#include <stdarg.h>
+#include <linux/dm-dirty-log.h>
+#include <linux/bio.h>
+#include <linux/blkdev.h>
+#include <linux/crc32.h>
+#include <linux/init.h>
+#include <linux/module.h>
+#include <linux/namei.h>
+#include <linux/types.h>
+#include <linux/vmalloc.h>
+#include <linux/workqueue.h>
+
+#define	DM_MSG_PREFIX	"dm-repl"
+#define	DAEMON	DM_MSG_PREFIX	"d"
+
+/* Default local device read ahead pages. */
+#define	LD_RA_PAGES_DEFAULT	8
+
+/* Factor out to dm.[ch] */
+/* Return type for name. */
+int
+dm_descr_type(const struct dm_str_descr *descr, unsigned len, const char *name)
+{
+	while (len--) {
+		if (!strncmp(STR_LEN(name, descr[len].name)))
+			return descr[len].type;
+	}
+
+	return -ENOENT;
+}
+EXPORT_SYMBOL_GPL(dm_descr_type);
+
+/* Return name for type. */
+const char *
+dm_descr_name(const struct dm_str_descr *descr, unsigned len, const int type)
+{
+	while (len--) {
+		if (type == descr[len].type)
+			return descr[len].name;
+	}
+
+	return NULL;
+}
+EXPORT_SYMBOL_GPL(dm_descr_name);
+/* END Factor out to dm.[ch] */
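+
+/*
+ * Example use of the descriptor helpers above (table illustrative):
+ *
+ *	static const struct dm_str_descr policies[] = {
+ *		{ .type = DM_REPL_SLINK_SYNC,  .name = "sync"  },
+ *		{ .type = DM_REPL_SLINK_ASYNC, .name = "async" },
+ *	};
+ *
+ *	int type = dm_descr_type(policies, ARRAY_SIZE(policies), "async");
+ *	const char *name = dm_descr_name(policies, ARRAY_SIZE(policies),
+ *					 DM_REPL_SLINK_SYNC);
+ */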
+
+/* Global list of replication log contexts for ctr/dtr and lock. */
+static LIST_HEAD(replog_c_list);
+static struct mutex replog_c_list_mutex;
+
+/* Statistics. */
+struct stats {
+	atomic_t io[2];
+	atomic_t submitted_io[2];
+	atomic_t congested_fn[2];
+};
+
+/* Reset statistics variables. */
+static void
+stats_reset(struct stats *stats)
+{
+	int i = 2;
+
+	while (i--) {
+		atomic_set(stats->io + i, 0);
+		atomic_set(stats->submitted_io + i, 0);
+		atomic_set(stats->congested_fn + i, 0);
+	}
+}
+
+/* Per site link context. */
+struct slink_c {
+	struct {
+		struct list_head slink_c;
+		struct list_head dc; /* List of replication device contexts. */
+	} lists;
+
+	/* Reference count (i.e. number of devices on this site link) */
+	struct kref ref;
+
+	/* Slink handle. */
+	struct dm_repl_slink *slink;
+
+	/* Replog context. */
+	struct replog_c *replog_c;
+};
+
+/* Global context kept with replicator log. */
+enum replog_c_flags {
+	REPLOG_C_BLOCKED,
+	REPLOG_C_DEVEL_STATS,
+	REPLOG_C_IO_INFLIGHT
+};
+struct replog_c {
+	struct {
+		struct list_head replog_c;/* To add to global replog_c list. */
+		struct list_head slink_c; /* Site link context elements. */
+	} lists;
+
+	struct dm_target *ti;
+
+	/* Reference count (i.e. # of slinks * # of devices on this replog) */
+	struct kref ref;
+
+	/* Back pointer to replication log. */
+	struct dm_repl_log *replog;
+	dev_t dev;	/* Replicator control device major:minor. */
+
+	/* Global io housekeeping on site link 0. */
+	struct repl_io {
+		unsigned long flags;	/* I/O state flags. */
+
+		struct bio_list in;	/* Pending bios (central input list).*/
+		spinlock_t in_lock;	/* Protects central input list.*/
+		atomic_t in_flight;	/* In flight io counter. */
+
+		/* IO workqueue. */
+		struct workqueue_struct *wq;
+		struct work_struct ws;
+
+		/* Statistics. */
+		struct stats stats;
+
+		/* slink+I/O teardown synchronization. */
+		wait_queue_head_t waiters;
+	} io;
+};
+DM_BITOPS(ReplBlocked, replog_c, REPLOG_C_BLOCKED);
+DM_BITOPS(ReplDevelStats, replog_c, REPLOG_C_DEVEL_STATS);
+DM_BITOPS(ReplIoInflight, replog_c, REPLOG_C_IO_INFLIGHT);
+
+/*
+ * Per device replication context kept with any mapped device and
+ * any associated remote device, which doesn't have a local mapping.
+ */
+struct device_c {
+	struct list_head list; /* To add to slink_c dc list. */
+
+	/* Local device ti (i.e. head). */
+	struct dm_target *ti;
+
+	/* replicator control device reference. */
+	struct dm_dev *replicator_dev;
+
+	/* SLINK handle. */
+	struct slink_c *slink_c;
+
+	/* This device's number. */
+	int number;
+};
+
+/* IO in flight wait queue handling during suspension. */
+static void
+replog_c_io_get(struct replog_c *replog_c)
+{
+	SetReplIoInflight(replog_c);
+	atomic_inc(&replog_c->io.in_flight);
+}
+
+/* Drop io in flight reference. */
+static void
+replog_c_io_put(struct replog_c *replog_c)
+{
+	if (atomic_dec_and_test(&replog_c->io.in_flight)) {
+		ClearReplIoInflight(replog_c);
+		wake_up(&replog_c->io.waiters);
+	}
+}
+
+/* Get a handle on a replicator log. */
+static struct dm_repl_log *
+repl_log_ctr(const char *name, struct dm_target *ti,
+	     unsigned int argc, char **argv)
+{
+	int r;
+	struct dm_repl_log_type *type;
+	struct dm_repl_log *log;
+
+	log = kzalloc(sizeof(*log), GFP_KERNEL);
+	if (unlikely(!log))
+		return ERR_PTR(-ENOMEM);
+
+	/* Load requested replication log module. */
+	r = request_module("dm-repl-log-%s", name);
+	if (r < 0) {
+		DMERR("replication log module for \"%s\" not found", name);
+		kfree(log);
+		return ERR_PTR(-ENOENT);
+	}
+
+	type = dm_get_type(name, DM_REPLOG);
+	if (unlikely(IS_ERR(type))) {
+		DMERR("replication log registry type not found");
+		kfree(log);
+		return (struct dm_repl_log *) type;
+	}
+
+	log->ops = type;
+	r = type->ctr(log, ti, argc, argv);
+	if (unlikely(r < 0)) {
+		DMERR("%s: constructor failed", __func__);
+		dm_put_type(type, DM_REPLOG);
+		kfree(log);
+		log = ERR_PTR(r);
+	}
+
+	return log;
+}
+
+/* Put a handle on a replicator log. */
+static void
+repl_log_dtr(struct dm_repl_log *log, struct dm_target *ti)
+{
+	/* Frees log on last drop. */
+	log->ops->dtr(log, ti);
+	dm_put_type(log->ops, DM_REPLOG);
+	kfree(log);
+}
+
+/*
+ * Create/destroy a transient replicator site link on initial get/last out.
+ */
+static struct dm_repl_slink *
+repl_slink_ctr(char *name, struct dm_repl_log *replog,
+	       unsigned argc, char **argv)
+{
+	int r;
+	struct dm_repl_slink_type *type;
+	struct dm_repl_slink *slink;
+
+	slink = kzalloc(sizeof(*slink), GFP_KERNEL);
+	if (unlikely(!slink))
+		return ERR_PTR(-ENOMEM);
+
+	/* Load requested replication site link module. */
+	r = request_module("dm-repl-slink-%s", name);
+	if (r < 0) {
+		DMERR("replication slink module for \"%s\" not found", name);
+		kfree(slink);
+		return ERR_PTR(-ENOENT);
+	}
+
+	type = dm_get_type(name, DM_SLINK);
+	if (unlikely(IS_ERR(type))) {
+		DMERR("replication slink registry type not found");
+		kfree(slink);
+		return (struct dm_repl_slink *) type;
+	}
+
+	r = type->ctr(slink, replog, argc, argv);
+	if (unlikely(r < 0)) {
+		DMERR("%s: constructor failed", __func__);
+		dm_put_type(type, DM_SLINK);
+		kfree(slink);
+		return ERR_PTR(r);
+	}
+
+	slink->ops = type;
+	return slink;
+}
+
+static void
+slink_destroy(struct dm_repl_slink *slink)
+{
+	/* Frees slink on last reference drop. */
+	slink->ops->dtr(slink);
+	dm_put_type(slink->ops, DM_SLINK);
+	kfree(slink);
+}
+
+
+/* Wake worker. */
+static void do_repl(struct work_struct *ws);
+static void
+wake_do_repl(struct replog_c *replog_c)
+{
+	queue_work(replog_c->io.wq, &replog_c->io.ws);
+}
+
+/* Called from the replog in case we can queue more bios. */
+static void
+io_callback(int read_err, int write_err, void *context)
+{
+	struct replog_c *replog_c = context;
+
+	DMDEBUG_LIMIT("%s", __func__);
+	_BUG_ON_PTR(replog_c);
+	ClearReplBlocked(replog_c);
+	wake_do_repl(replog_c);
+}
+
+/* Get a reference on a replog_c by replog reference. */
+static struct replog_c *
+replog_c_get(struct replog_c *replog_c)
+{
+	kref_get(&replog_c->ref);
+	return replog_c;
+}
+
+/* Destroy replog_c object. */
+static int slink_c_put(struct slink_c *slink_c);
+static void
+replog_c_release(struct kref *ref)
+{
+	struct replog_c *replog_c = container_of(ref, struct replog_c, ref);
+
+	BUG_ON(!list_empty(&replog_c->lists.replog_c));
+	BUG_ON(!list_empty(&replog_c->lists.slink_c));
+	kfree(replog_c);
+}
+
+/* Release reference on replog_c, releasing resources on last drop. */
+static int
+replog_c_put(struct replog_c *replog_c)
+{
+	_BUG_ON_PTR(replog_c);
+	return kref_put(&replog_c->ref, replog_c_release);
+}
+
+/*
+ * Find a replog_c by its control device in the global replog context list.
+ *
+ * Call with replog_c_list_mutex held.
+ */
+static struct replog_c *
+replog_c_get_by_dev(dev_t dev)
+{
+	struct replog_c *replog_c;
+
+	list_for_each_entry(replog_c, &replog_c_list, lists.replog_c) {
+		if (dev == replog_c->dev)
+			return replog_c_get(replog_c);
+	}
+
+	return ERR_PTR(-ENOENT);
+}
+
+/* Get replicator control device major:minor. */
+static dev_t
+get_ctrl_dev(struct dm_target *ti)
+{
+	dev_t dev;
+	struct mapped_device *md = dm_table_get_md(ti->table);
+	struct block_device *bdev = bdget_disk(dm_disk(md), 0);
+
+	dev = bdev->bd_dev;
+	bdput(bdev);
+	dm_put(md);
+	return dev;
+}
+
+/* Allocate a replication control context. */
+static struct replog_c *
+replog_c_alloc(void)
+{
+	struct replog_c *replog_c = kzalloc(sizeof(*replog_c), GFP_KERNEL);
+	struct repl_io *io;
+
+	if (unlikely(!replog_c))
+		return ERR_PTR(-ENOMEM);
+
+	io = &replog_c->io;
+
+	/* Create singlethread workqueue for this replog's io. */
+	io->wq = create_singlethread_workqueue(DAEMON);
+	if (unlikely(!io->wq)) {
+		kfree(replog_c);
+		return ERR_PTR(-ENOMEM);
+	}
+
+	kref_init(&replog_c->ref);
+	INIT_LIST_HEAD(&replog_c->lists.slink_c);
+	ClearReplDevelStats(replog_c);
+	ClearReplBlocked(replog_c);
+	spin_lock_init(&io->in_lock);
+	bio_list_init(&io->in);
+	atomic_set(&io->in_flight, 0);
+	INIT_WORK(&io->ws, do_repl);
+	stats_reset(&io->stats);
+	init_waitqueue_head(&io->waiters);
+	return replog_c;
+}
+
+/* Create replog_c context. */
+static struct replog_c *
+replog_c_create(struct dm_target *ti, struct dm_repl_log *replog)
+{
+	dev_t replicator_dev;
+	struct replog_c *replog_c, *replog_c_tmp;
+
+	/* Get replicator control device major:minor. */
+	replicator_dev = get_ctrl_dev(ti);
+
+	/* Allocate and init replog_c object. */
+	replog_c = replog_c_alloc();
+	if (IS_ERR(replog_c))
+		return replog_c;
+
+	/* Add to global replog_c list. */
+	mutex_lock(&replog_c_list_mutex);
+	replog_c_tmp = replog_c_get_by_dev(replicator_dev);
+	if (likely(IS_ERR(replog_c_tmp))) {
+		/* We won any potential race. */
+		/* Set replog global I/O callback and context. */
+		replog->ops->io_notify_fn_set(replog, io_callback,
+					      replog_c);
+		replog_c->dev = replicator_dev;
+		replog_c->ti = ti;
+		replog_c->replog = replog;
+		list_add_tail(&replog_c->lists.replog_c,
+			      &replog_c_list);
+		mutex_unlock(&replog_c_list_mutex);
+	} else {
+		/* Lost a potential race. */
+		mutex_unlock(&replog_c_list_mutex);
+
+		destroy_workqueue(replog_c->io.wq);
+		kfree(replog_c);
+		replog_c = replog_c_tmp;
+	}
+
+	return replog_c;
+}
+
+/* Find dc on slink_c list by dev_nr. */
+static struct device_c *
+device_c_find(struct slink_c *slink_c, unsigned dev_nr)
+{
+	struct device_c *dc;
+
+	list_for_each_entry(dc, &slink_c->lists.dc, list) {
+		if (dev_nr == dc->number)
+			return dc;
+	}
+
+	return ERR_PTR(-ENOENT);
+}
+
+/* Get a reference on an slink_c by slink reference. */
+static struct slink_c *
+slink_c_get(struct slink_c *slink_c)
+{
+	kref_get(&slink_c->ref);
+	return slink_c;
+}
+
+/* Find an slink_c by slink number on the replog slink list. */
+static struct slink_c *
+slink_c_get_by_number(struct replog_c *replog_c, int slink_nr)
+{
+	struct slink_c *slink_c;
+
+	list_for_each_entry(slink_c, &replog_c->lists.slink_c, lists.slink_c) {
+		int slink_nr_tmp =
+			slink_c->slink->ops->slink_number(slink_c->slink);
+
+		if (slink_nr == slink_nr_tmp)
+			return slink_c_get(slink_c);
+	}
+
+	return ERR_PTR(-ENOENT);
+}
+
+/* Site link constructor helper to create a slink_c object. */
+static struct slink_c *
+slink_c_create(struct replog_c *replog_c, struct dm_repl_slink *slink)
+{
+	int r, slink_nr = slink->ops->slink_number(slink);
+	struct slink_c *slink_c, *slink_c_tmp;
+	struct dm_repl_log *replog = replog_c->replog;
+
+	BUG_ON(slink_nr < 0);
+	DMDEBUG("%s creating slink_c for site link=%d", __func__, slink_nr);
+
+	slink_c = kzalloc(sizeof(*slink_c), GFP_KERNEL);
+	if (unlikely(!slink_c))
+		return ERR_PTR(-ENOMEM);
+
+	r = replog->ops->slink_add(replog, slink);
+	if (unlikely(r < 0)) {
+		kfree(slink_c);
+		return ERR_PTR(r);
+	}
+
+	DMDEBUG("%s added site link=%d", __func__,
+		slink->ops->slink_number(slink));
+
+	kref_init(&slink_c->ref);
+	INIT_LIST_HEAD(&slink_c->lists.dc);
+	slink_c->replog_c = replog_c;
+	slink_c->slink = slink;
+
+	/* Check creation race and add to per replog_c slink_c list. */
+	mutex_lock(&replog_c_list_mutex);
+	slink_c_tmp = slink_c_get_by_number(replog_c, slink_nr);
+	if (likely(IS_ERR(slink_c_tmp)))
+		list_add_tail(&slink_c->lists.slink_c,
+			      &replog_c->lists.slink_c);
+	else {
+		kfree(slink_c);
+		slink_c = slink_c_tmp;
+	}
+
+	mutex_unlock(&replog_c_list_mutex);
+	return slink_c;
+}
+
+/*
+ * Release reference on slink_c, removing dc from
+ * it and releasing resources on last drop.
+ */
+static void
+slink_c_release(struct kref *ref)
+{
+	struct slink_c *slink_c = container_of(ref, struct slink_c, ref);
+
+	BUG_ON(!list_empty(&slink_c->lists.dc));
+	kfree(slink_c);
+}
+
+/*
+ * Release reference on slink_c, removing dc from
+ * it and releasing resources on last drop.
+ */
+static int
+slink_c_put(struct slink_c *slink_c)
+{
+	return kref_put(&slink_c->ref, slink_c_release);
+}
+
+/* Either set ti->error or call DMERR() depending on ctr call type. */
+enum ctr_call_type { CTR_CALL, MESSAGE_CALL };
+static void
+ti_or_dmerr(enum ctr_call_type call_type, struct dm_target *ti, char *msg)
+{
+	if (call_type == CTR_CALL)
+		ti->error = msg;
+	else
+		DMERR("%s", msg);
+}
+
+/*
+ * Check if @str is listed in a variable (const char *) list of strings.
+ *
+ * Returns 1 if found in the list and 0 otherwise.
+ */
+static int
+str_listed(const char *str, ...)
+{
+	int r = 0;
+	const char *s;
+	va_list str_list;
+
+	va_start(str_list, str);
+
+	while ((s = va_arg(str_list, const char *))) {
+		if (!strncmp(str, s, strlen(str))) {
+			r = 1;
+			break;
+		}
+	}
+
+	va_end(str_list);
+	return r;
+}
+
+/*
+ * Worker thread.
+ *
+ * o work on all new queued bios io'ing them to the REPLOG
+ * o break out if replog reports -EWOULDBLOCK until called back
+ */
+static void
+do_repl(struct work_struct *ws)
+{
+	struct replog_c *replog_c = container_of(ws, struct replog_c, io.ws);
+	struct dm_repl_log *replog = replog_c->replog;
+	struct bio *bio;
+	struct bio_list ios;
+
+	_BUG_ON_PTR(replog);
+
+	if (ReplBlocked(replog_c))
+		return;
+
+	bio_list_init(&ios);
+
+	/* Quickly grab all (new) input bios queued. */
+	spin_lock(&replog_c->io.in_lock);
+	bio_list_merge(&ios, &replog_c->io.in);
+	bio_list_init(&replog_c->io.in);
+	spin_unlock(&replog_c->io.in_lock);
+
+	/* Work all deferred or new bios on work list. */
+	while ((bio = bio_list_pop(&ios))) {
+		int r = replog->ops->io(replog, bio, 0);
+
+		if (r == -EWOULDBLOCK) {
+			SetReplBlocked(replog_c);
+			DMDEBUG_LIMIT("%s SetReplBlocked", __func__);
+
+			/* Push non-processed bio back to the work list. */
+			bio_list_push(&ios, bio);
+
+			/*
+			 * Merge non-processed bios
+			 * back to the input list head.
+			 */
+			spin_lock(&replog_c->io.in_lock);
+			bio_list_merge_head(&replog_c->io.in, &ios);
+			spin_unlock(&replog_c->io.in_lock);
+
+			break;
+		} else
+			BUG_ON(r);
+	}
+}
+
+/* Replication congested function. */
+static int
+repl_congested(void *congested_data, int bdi_bits)
+{
+	int r;
+	struct device_c *dc = congested_data;
+	struct replog_c *replog_c;
+
+	_BUG_ON_PTR(dc);
+	_BUG_ON_PTR(dc->slink_c);
+	replog_c = dc->slink_c->replog_c;
+	_BUG_ON_PTR(replog_c);
+	r = !!ReplBlocked(replog_c);
+	atomic_inc(&replog_c->io.stats.congested_fn[r]);
+	return r;
+}
+
+/* Set backing device congested function of a local replicated device. */
+static void
+dc_set_bdi(struct device_c *dc)
+{
+	struct mapped_device *md = dm_table_get_md(dc->ti->table);
+	struct backing_dev_info *bdi = &dm_disk(md)->queue->backing_dev_info;
+
+	/* Set congested function and data. */
+	bdi->congested_fn = repl_congested;
+	bdi->congested_data = dc;
+	dm_put(md);
+}
+
+/* Get device on slink and unlink it from the list of devices. */
+static struct device_c *
+dev_get_del(struct device_c *dc, int slink_nr, struct list_head *dc_list)
+{
+	int dev_nr;
+	struct slink_c *slink_c;
+	struct dm_repl_slink *slink;
+	struct dm_repl_log *replog;
+	struct replog_c *replog_c;
+
+	_BUG_ON_PTR(dc);
+	dev_nr = dc->number;
+	BUG_ON(dev_nr < 0);
+	slink_c = dc->slink_c;
+	_BUG_ON_PTR(slink_c);
+	slink = slink_c->slink;
+	_BUG_ON_PTR(slink);
+	replog_c = slink_c->replog_c;
+	_BUG_ON_PTR(replog_c);
+	replog = replog_c->replog;
+	_BUG_ON_PTR(replog);
+
+	/* Get the slink by number. */
+	slink = slink->ops->slink(replog, slink_nr);
+	if (IS_ERR(slink))
+		return (struct device_c *) slink;
+
+	slink_c = slink_c_get_by_number(replog_c, slink_nr);
+	if (IS_ERR(slink_c))
+		return (struct device_c *) slink_c;
+
+	dc = device_c_find(slink_c, dev_nr);
+	if (IS_ERR(dc))
+		DMERR("No device %d on slink %d", dev_nr, slink_nr);
+	else
+		list_move(&dc->list, dc_list);
+
+	BUG_ON(slink_c_put(slink_c));
+	return dc;
+}
+
+/* Free device and put references. */
+static int
+dev_free_put(struct device_c *dc, int slink_nr)
+{
+	int r;
+	struct slink_c *slink_c;
+	struct dm_repl_slink *slink;
+
+	_BUG_ON_PTR(dc);
+	BUG_ON(dc->number < 0);
+	BUG_ON(slink_nr < 0);
+	slink_c = dc->slink_c;
+	_BUG_ON_PTR(slink_c);
+	slink = slink_c->slink;
+	_BUG_ON_PTR(slink);
+
+	/* Delete device from slink. */
+	r = slink->ops->dev_del(slink, dc->number);
+	if (r < 0) {
+		DMERR("Error %d deleting device %d from "
+		      "site link %d", r, dc->number, slink_nr);
+	} else
+		/* Drop reference on replicator control device. */
+		dm_put_device(dc->ti, dc->replicator_dev);
+
+	kfree(dc);
+
+	if (!r)
+		/* Drop reference on slink_c, freeing it on last one. */
+		BUG_ON(slink_c_put(slink_c));
+
+	return r;
+}
+
+/*
+ * Replication device "replicator-dev" destructor method.
+ *
+ * Either on slink0 in case slink_nr == 0 for mapped devices;
+ * the whole chain of LD + its RDs will be deleted
+ * -or-
+ * on slink > 0 in case of message interface calls (just one RD)
+ */
+static int
+_replicator_dev_dtr(struct dm_target *ti, int slink_nr)
+{
+	int r;
+	struct device_c *dc = ti->private, *dc_tmp, *dc_n;
+	struct slink_c *slink_c, *slink_c_n;
+	struct replog_c *replog_c;
+	struct dm_repl_slink *slink;
+	struct list_head dc_list;
+
+	BUG_ON(slink_nr < 0);
+	_BUG_ON_PTR(dc);
+	INIT_LIST_HEAD(&dc_list);
+	slink_c = dc->slink_c;
+	_BUG_ON_PTR(slink_c);
+	replog_c = slink_c->replog_c;
+	_BUG_ON_PTR(replog_c);
+
+	/* First pull device out on all slinks holding lock. */
+	mutex_lock(&replog_c_list_mutex);
+	/* Call from message interface with slink_nr > 0. */
+	if (slink_nr)
+		dev_get_del(dc, slink_nr, &dc_list);
+	else {
+		/* slink number 0 -> delete LD and any RDs. */
+		list_for_each_entry_safe(slink_c, slink_c_n,
+					 &replog_c->lists.slink_c,
+					 lists.slink_c) {
+			slink = slink_c->slink;
+			_BUG_ON_PTR(slink);
+			slink_nr = slink->ops->slink_number(slink);
+			BUG_ON(slink_nr < 0);
+			dev_get_del(dc, slink_nr, &dc_list);
+		}
+	}
+
+	mutex_unlock(&replog_c_list_mutex);
+
+	r = !list_empty(&dc_list);
+
+	/* Now delete devices on pulled out list. */
+	list_for_each_entry_safe(dc_tmp, dc_n, &dc_list, list) {
+		slink = dc_tmp->slink_c->slink;
+		dev_free_put(dc_tmp, slink->ops->slink_number(slink));
+	}
+
+	ti->private = NULL;
+	return r;
+}
+
+/* Replicator device destructor. Autodestructs devices on slink > 0. */
+static void
+replicator_dev_dtr(struct dm_target *ti)
+{
+	_replicator_dev_dtr(ti, 0); /* Slink 0 device destruction. */
+}
+
+/* Construct a local/remote device. */
+/*
+ * slink_nr dev_nr dev_path dirty_log_params
+ *
+ * [0 1 /dev/mapper/local_device \	# local device being replicated
+ * nolog 0]{1..N}			# no dirty log with local devices
+ */
+#define	MIN_DEV_ARGS	5
+static int
+device_ctr(enum ctr_call_type call_type, struct dm_target *ti,
+	   struct replog_c *replog_c,
+	   const char *replicator_path, unsigned dev_nr,
+	   unsigned argc, char **argv, int *args_used)
+{
+	int dev_params, dirtylog_params, params, r, slink_nr;
+	struct dm_repl_slink *slink;	/* Site link handle. */
+	struct slink_c *slink_c;	/* Site link context. */
+	struct device_c *dc;		/* Replication device context. */
+
+	SHOW_ARGV;
+
+	if (argc < MIN_DEV_ARGS) {
+		ti_or_dmerr(call_type, ti, "Not enough device arguments");
+		return -EINVAL;
+	}
+
+	/* Get slink number. */
+	params = 0;
+	if (unlikely(sscanf(argv[params], "%d", &slink_nr) != 1 ||
+		     slink_nr < 0)) {
+		ti_or_dmerr(call_type, ti,
+			    "Invalid site link number argument");
+		return -EINVAL;
+	}
+
+	/* Get #dev_params. */
+	params++;
+	if (unlikely(sscanf(argv[params], "%d", &dev_params) != 1 ||
+		     dev_params < 0 ||
+		     dev_params  + 4 > argc)) {
+		ti_or_dmerr(call_type, ti,
+			    "Invalid device parameter number argument");
+		return -EINVAL;
+	}
+
+	/* Get #dirtylog_params. */
+	params += dev_params + 2;
+	if (unlikely(sscanf(argv[params], "%d", &dirtylog_params) != 1 ||
+		     dirtylog_params < 0 ||
+		     params + dirtylog_params + 1 > argc)) {
+		ti_or_dmerr(call_type, ti,
+			    "Invalid dirtylog parameter number argument");
+		return -EINVAL;
+	}
+
+	/* Check that all parameters are sane. */
+	params = dev_params + dirtylog_params + 3;
+	if (params > argc) {
+		ti_or_dmerr(call_type, ti,
+			    "Invalid device/dirtylog argument count");
+		return -EINVAL;
+	}
+
+	/* Get SLINK handle. */
+	mutex_lock(&replog_c_list_mutex);
+	slink_c = slink_c_get_by_number(replog_c, slink_nr);
+	mutex_unlock(&replog_c_list_mutex);
+
+	if (unlikely(IS_ERR(slink_c))) {
+		ti_or_dmerr(call_type, ti, "Cannot find site link context");
+		return -ENOENT;
+	}
+
+	slink = slink_c->slink;
+	_BUG_ON_PTR(slink);
+
+	/* Allocate replication context for new device. */
+	dc = kzalloc(sizeof(*dc), GFP_KERNEL);
+	if (unlikely(!dc)) {
+		ti_or_dmerr(call_type, ti, "Cannot allocate device context");
+		BUG_ON(slink_c_put(slink_c));
+		return -ENOMEM;
+	}
+
+	INIT_LIST_HEAD(&dc->list);
+	dc->slink_c = slink_c;
+	dc->ti = ti;
+
+	/*
+	 * Get reference on replicator control device.
+	 *
+	 * Dummy start/size sufficient here.
+	 */
+	r = dm_get_device(ti, replicator_path, 0, 1,
+			  FMODE_WRITE, &dc->replicator_dev);
+	if (unlikely(r < 0)) {
+		ti_or_dmerr(call_type, ti,
+			    "Can't access replicator control device");
+		goto err_slink_put;
+	}
+
+	/* Add device to slink. */
+	/*
+	 * ti->split_io for all local devices must be set
+	 * to the unique region_size of the remote devices.
+	 */
+	r = slink->ops->dev_add(slink, dev_nr, ti, params, argv + 1);
+	if (unlikely(r < 0)) {
+		ti_or_dmerr(call_type, ti, r == -EEXIST ?
+			"device already in use on site link" :
+			"Failed to add device to site link");
+		goto err_device_put;
+	}
+
+	dc->number = r;
+
+	/* Only set bdi properties on local devices. */
+	if (!slink_nr) {
+		/* Preset, will be set to region size in the slink code. */
+		ti->split_io = DM_REPL_MIN_SPLIT_IO;
+
+		/*
+		 * Init ti reference on slink0 devices only,
+		 * because they only have a local mapping!
+		 */
+		ti->private = dc;
+		dc_set_bdi(dc);
+	}
+
+	/* Add dc to slink_c list. */
+	mutex_lock(&replog_c_list_mutex);
+	list_add_tail(&dc->list, &slink_c->lists.dc);
+	mutex_unlock(&replog_c_list_mutex);
+
+	*args_used = dev_params + dirtylog_params + 4;
+	DMDEBUG("%s added device=%d to site link=%u", __func__,
+		r, slink->ops->slink_number(slink));
+	return 0;
+
+err_device_put:
+	dm_put_device(ti, dc->replicator_dev);
+err_slink_put:
+	BUG_ON(slink_c_put(slink_c));
+	kfree(dc);
+	return r;
+}
+
+/*
+ * Replication device "replicator-dev" constructor method.
+ *
+ * <start> <length> replicator-dev
+ *         <replicator_device> <dev_nr>		\
+ *         [<slink_nr> <#dev_params> <dev_params>
+ *          <dlog_type> <#dlog_params> <dlog_params>]{1..N}
+ *
+ * <replicator_device> = device previously constructed via "replication" target
+ * <dev_nr>	    = An integer that is used to 'tag' write requests as
+ *		      belonging to a particular set of devices - specifically,
+ *		      the devices that follow this argument (i.e. the site
+ *		      link devices).
+ * <slink_nr>	    = This number identifies the site/location where the next
+ *		      device to be specified comes from.  It is exactly the
+ *		      same number used to identify the site/location (and its
+ *		      policies) in the "replicator" target.  Interestingly,
+ *		      while one might normally expect a "dev_type" argument
+ *		      here, it can be deduced from the site link number and
+ *		      the 'slink_type' given in the "replication" target.
+ * <#dev_params>    = '1'  (The number of allowed parameters actually depends
+ *		      on the 'slink_type' given in the "replication" target.
+ *		      Since our only option there is "blockdev", the only
+ *		      allowable number here is currently '1'.)
+ * <dev_params>	    = 'dev_path'  (Again, since "blockdev" is the only
+ *		      'slink_type' available, the only allowable argument here
+ *		      is the path to the device.)
+ * <dlog_type>	    = Not to be confused with the "replicator log", this is
+ *		      the type of dirty log associated with this particular
+ *		      device.  Dirty logs are used for synchronization, during
+ *		      initialization or fall behind conditions, to bring devices
+ *		      into a coherent state with its peers - analogous to
+ *		      rebuilding a RAID1 (mirror) device.  Available dirty
+ *		      log types include: 'nolog', 'core', and 'disk'
+ * <#dlog_params>   = The number of arguments required for a particular log
+ *		      type - 'nolog' = 0, 'core' = 1/2, 'disk' = 2/3.
+ * <dlog_params>    = 'nolog' => ~no arguments~
+ *		      'core'  => <region_size> [sync | nosync]
+ *		      'disk'  => <dlog_dev_path> <region_size> [sync | nosync]
+ *	<region_size>   = This sets the granularity at which the dirty log
+ *			  tracks which areas of the device are in-sync.
+ *	[sync | nosync] = Optionally specify whether the sync should be forced
+ *			  or avoided initially.
+ */
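+/*
+ * Example "replicator-dev" table line (start/length and paths illustrative):
+ *
+ * 0 2097152 replicator-dev /dev/mapper/repl_ctrl 0 \
+ *	0 1 /dev/mapper/local_dev nolog 0 \
+ *	1 1 /dev/mapper/remote_dev core 2 2048 nosync
+ *
+ * i.e. device number 0 of the replicator constructed on
+ * /dev/mapper/repl_ctrl, mapped to /dev/mapper/local_dev on site link 0
+ * (no dirty log) and replicated to /dev/mapper/remote_dev on site link 1
+ * with a core dirty log using a region size of 2048 sectors and no
+ * forced initial resynchronization.
+ */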
+#define LOG_ARGS 2
+#define DEV_MIN_ARGS 5
+static int
+_replicator_dev_ctr(enum ctr_call_type call_type, struct dm_target *ti,
+		    unsigned argc, char **argv)
+{
+	int args_used, r, tmp;
+	unsigned dev_nr;
+	char *replicator_path = argv[0];
+	struct dm_dev *ctrl_dev;
+	struct replog_c *replog_c;
+
+	SHOW_ARGV;
+
+	if (argc < LOG_ARGS + DEV_MIN_ARGS)
+		goto err_args;
+
+	/*
+	 * Get reference on replicator control device.
+	 *
+	 * Dummy start/size sufficient here.
+	 */
+	r = dm_get_device(ti, replicator_path, 0, 1, FMODE_WRITE, &ctrl_dev);
+	if (unlikely(r < 0)) {
+		ti_or_dmerr(CTR_CALL, ti,
+			    "Can't access replicator control device");
+		return r;
+	}
+
+	if (sscanf(argv[1], "%d", &tmp) != 1 ||
+	    tmp < 0) {
+		dm_put_device(ti, ctrl_dev);
+		ti_or_dmerr(call_type, ti, "Invalid device number argument");
+		return -EINVAL;
+	}
+
+	dev_nr = tmp;
+
+	/* Find precreated replog context by device, taking out a reference. */
+	mutex_lock(&replog_c_list_mutex);
+	replog_c = replog_c_get_by_dev(ctrl_dev->bdev->bd_dev);
+	mutex_unlock(&replog_c_list_mutex);
+
+	if (unlikely(IS_ERR(replog_c))) {
+		dm_put_device(ti, ctrl_dev);
+		ti_or_dmerr(call_type, ti, "Failed to find replication log");
+		return PTR_ERR(replog_c);
+	}
+
+	_BUG_ON_PTR(replog_c->replog);
+	argc -= LOG_ARGS;
+	argv += LOG_ARGS;
+
+	/*
+	 * Iterate all slinks/rds if multiple device/dirty
+	 * log tuples present on mapping table line.
+	 */
+	while (argc >= DEV_MIN_ARGS) {
+		/* Create slink+device context. */
+		r = device_ctr(call_type, ti, replog_c, replicator_path,
+			       dev_nr, argc, argv, &args_used);
+		if (unlikely(r))
+			goto device_ctr_err;
+
+		BUG_ON(args_used > argc);
+		argc -= args_used;
+		argv += args_used;
+	}
+
+	/* All arguments consumed? */
+	if (argc) {
+		r = -EINVAL;
+		goto invalid_args;
+	}
+
+	/* Drop initially taken replog reference. */
+	BUG_ON(replog_c_put(replog_c));
+	dm_put_device(ti, ctrl_dev);
+	return 0;
+
+invalid_args:
+	ti_or_dmerr(call_type, ti, "Invalid device arguments");
+device_ctr_err:
+	/* Drop the initially taken replog reference. */
+	BUG_ON(replog_c_put(replog_c));
+	dm_put_device(ti, ctrl_dev);
+
+	/* If we get an error in ctr -> tear down. */
+	if (call_type == CTR_CALL)
+		replicator_dev_dtr(ti);
+
+	return r;
+
+err_args:
+	ti_or_dmerr(call_type, ti, "Not enough device arguments");
+	return -EINVAL;
+}
+
+/* Constructor method. */
+static int
+replicator_dev_ctr(struct dm_target *ti, unsigned argc, char **argv)
+{
+	return _replicator_dev_ctr(CTR_CALL, ti, argc, argv);
+}
+
+/* Device flush method. */
+static void
+replicator_dev_flush(struct dm_target *ti)
+{
+	struct device_c *dc = ti->private;
+	struct dm_repl_log *replog;
+
+	_BUG_ON_PTR(dc);
+	_BUG_ON_PTR(dc->slink_c);
+	_BUG_ON_PTR(dc->slink_c->replog_c);
+	replog = dc->slink_c->replog_c->replog;
+	_BUG_ON_PTR(replog);
+	BUG_ON(!replog->ops->flush);
+	replog->ops->flush(replog);
+}
+
+/* Queues bios to the input list and wakes up the worker thread. */
+static inline void
+queue_bio(struct device_c *dc, struct bio *bio)
+{
+	struct replog_c *replog_c = dc->slink_c->replog_c;
+
+	atomic_inc(replog_c->io.stats.io + !!(bio_data_dir(bio) == WRITE));
+
+	spin_lock(&replog_c->io.in_lock);
+	bio_list_add(&replog_c->io.in, bio);
+	replog_c_io_get(replog_c);
+	spin_unlock(&replog_c->io.in_lock);
+
+	/* Wakeup worker to deal with bio input list. */
+	wake_do_repl(replog_c);
+}
+
+/*
+ * Map a replicated device io by handling it in the worker
+ * thread in order to avoid delays in the fast path.
+ */
+static int
+replicator_dev_map(struct dm_target *ti, struct bio *bio,
+		   union map_info *map_context)
+{
+	map_context->ptr = bio->bi_private;
+	bio->bi_sector -= ti->begin;	/* Remap sector to target begin. */
+	queue_bio(ti->private, bio);	/* Queue bio to the worker. */
+	return DM_MAPIO_SUBMITTED;	/* Handle later. */
+}
+
+
+/* Replication device suspend/resume helper. */
+enum suspend_resume_type { POSTSUSPEND, RESUME };
+static void
+_replicator_dev_suspend_resume(struct dm_target *ti,
+			       enum suspend_resume_type type)
+{
+	struct device_c *dc = ti->private;
+	struct replog_c *replog_c;
+	struct slink_c *slink_c, *n;
+	int dev_nr = dc->number, slinks = 0;
+
+	DMDEBUG("%s %s", __func__, type == RESUME ? "resume" : "postsuspend");
+	_BUG_ON_PTR(dc);
+	_BUG_ON_PTR(dc->slink_c);
+	replog_c = dc->slink_c->replog_c;
+	_BUG_ON_PTR(replog_c);
+	BUG_ON(dev_nr < 0);
+
+	/* Suspend/resume device on all slinks. */
+	list_for_each_entry_safe(slink_c, n, &replog_c->lists.slink_c,
+				 lists.slink_c) {
+		int r;
+		struct dm_repl_slink *slink = slink_c->slink;
+
+		_BUG_ON_PTR(slink);
+
+		r = type == RESUME ?
+			slink->ops->resume(slink, dev_nr) :
+			slink->ops->postsuspend(slink, dev_nr);
+		if (r < 0)
+			DMERR("Error %d %s device=%d on site link %u",
+			      r, type == RESUME ?
+			      "resuming" : "postsuspending",
+			      dev_nr, slink->ops->slink_number(slink));
+		else
+			slinks++;
+	}
+
+	if (type == RESUME && slinks)
+		wake_do_repl(replog_c);
+}
+
+/* Replication device post suspend method. */
+static void
+replicator_dev_postsuspend(struct dm_target *ti)
+{
+	_replicator_dev_suspend_resume(ti, POSTSUSPEND);
+}
+
+/* Replication device resume method. */
+static void
+replicator_dev_resume(struct dm_target *ti)
+{
+	_replicator_dev_suspend_resume(ti, RESUME);
+}
+
+/* Pass endio calls down to the replicator log if requested. */
+static int
+replicator_dev_endio(struct dm_target *ti, struct bio *bio,
+		     int error, union map_info *map_context)
+{
+	int rr, rs;
+	struct device_c *dc = ti->private;
+	struct replog_c *replog_c;
+	struct dm_repl_log *replog;
+	struct dm_repl_slink *slink;
+
+	_BUG_ON_PTR(dc);
+	_BUG_ON_PTR(dc->slink_c);
+	slink = dc->slink_c->slink;
+	replog_c = dc->slink_c->replog_c;
+	_BUG_ON_PTR(replog_c);
+	replog = dc->slink_c->replog_c->replog;
+	_BUG_ON_PTR(replog);
+
+	rr = replog->ops->endio ?
+	     replog->ops->endio(replog, bio, error, map_context) : 0;
+	rs = slink->ops->endio ?
+	     slink->ops->endio(slink, bio, error, map_context) : 0;
+	replog_c_io_put(replog_c);
+	return rs < 0 ? rs : rr;
+}
+
+/*
+ * Replication device message method.
+ *
+ * Arguments:
+ * device add/del \
+ * 63:4 0 \		# replication log on 63:4 and device number '0'
+ * [0 1 /dev/mapper/local_device \	# local device being replicated
+ * nolog 0]{1..N}			# no dirty log with local devices
+ *
+ * start/resume all/device		# Resume whole replicator/
+ * 					# a single device
+ */
+static int
+replicator_dev_message(struct dm_target *ti, unsigned argc, char **argv)
+{
+	int slink_nr;
+	struct device_c *dc = ti->private;
+	struct replog_c *replog_c;
+	struct dm_repl_log *replog;
+
+	SHOW_ARGV;
+
+	_BUG_ON_PTR(dc);
+	_BUG_ON_PTR(dc->slink_c);
+	replog_c = dc->slink_c->replog_c;
+	_BUG_ON_PTR(replog_c);
+	replog = dc->slink_c->replog_c->replog;
+	_BUG_ON_PTR(replog);
+
+	/* Check minimum arguments. */
+	if (unlikely(argc < 1))
+		goto err_args;
+
+	/* Add/delete a device to/from a site link. */
+	if (str_listed(argv[0], "device", NULL)) {
+		if (argc < 2)
+			goto err_args;
+
+		/* We've got the target index of an SLINK0 device here. */
+		if (str_listed(argv[1], "add", NULL))
+			return _replicator_dev_ctr(MESSAGE_CALL, ti,
+						   argc - 2, argv + 2);
+		else if (str_listed(argv[1], "del", NULL)) {
+			if (argc < 3)
+				goto err_args;
+
+			if (sscanf(argv[2], "%d", &slink_nr) != 1 ||
+			    slink_nr < 1)
+				DM_EINVAL("invalid site link number "
+					  "argument; must be > 0");
+
+			return _replicator_dev_dtr(ti, slink_nr);
+		} else
+			DM_EINVAL("invalid device command argument");
+
+	/* Start replication on single device on all slinks. */
+	} else if (str_listed(argv[0], "start", "resume", NULL))
+		replicator_dev_resume(ti);
+
+	/* Stop replication for single device on all slinks. */
+	else if (str_listed(argv[0], "stop", "suspend", "postsuspend", NULL))
+		replicator_dev_postsuspend(ti);
+	else
+		DM_EINVAL("invalid message command");
+
+	return 0;
+
+err_args:
+	DM_EINVAL("too few message arguments");
+}
+
+/* Replication device status output method. */
+static int
+replicator_dev_status(struct dm_target *ti, status_type_t type,
+		      char *result, unsigned maxlen)
+{
+	ssize_t sz = 0;
+	static char buffer[2048];
+	struct device_c *dc = ti->private;
+	struct replog_c *replog_c;
+	struct dm_repl_slink *slink;
+
+	mutex_lock(&replog_c_list_mutex);
+	_BUG_ON_PTR(dc);
+	_BUG_ON_PTR(dc->slink_c);
+	slink = dc->slink_c->slink;
+	_BUG_ON_PTR(slink);
+	replog_c = dc->slink_c->replog_c;
+	_BUG_ON_PTR(replog_c);
+
+	DMEMIT("%s %d ", format_dev_t(buffer, replog_c->dev), dc->number);
+	mutex_unlock(&replog_c_list_mutex);
+	slink->ops->status(slink, dc->number, type, buffer, sizeof(buffer));
+	DMEMIT("%s", buffer);
+	return 0;
+}
+
+/* Replicator device interface. */
+static struct target_type replicator_dev_target = {
+	.name = "replicator-dev",
+	.version = {1, 0, 0},
+	.module = THIS_MODULE,
+	.ctr = replicator_dev_ctr,
+	.dtr = replicator_dev_dtr,
+	.flush = replicator_dev_flush,
+	.map = replicator_dev_map,
+	.postsuspend = replicator_dev_postsuspend,
+	.resume = replicator_dev_resume,
+	.end_io = replicator_dev_endio,
+	.message = replicator_dev_message,
+	.status = replicator_dev_status,
+};
+
+
+/*
+ * Replication log destructor.
+ */
+static void
+replicator_dtr(struct dm_target *ti)
+{
+	int r, slink_nr;
+	struct replog_c *replog_c = ti->private;
+	struct dm_repl_log *replog;
+	struct slink_c *slink_c, *n;
+	struct dm_repl_slink *slink;
+
+	_BUG_ON_PTR(replog_c);
+	replog = replog_c->replog;
+	_BUG_ON_PTR(replog);
+
+	/* Pull out replog_c to process destruction cleanly. */
+	mutex_lock(&replog_c_list_mutex);
+	list_del_init(&replog_c->lists.replog_c);
+	mutex_unlock(&replog_c_list_mutex);
+
+	/* Put all replog's slink contexts. */
+	list_for_each_entry_safe(slink_c, n, &replog_c->lists.slink_c,
+				 lists.slink_c) {
+		list_del_init(&slink_c->lists.slink_c);
+		slink = slink_c->slink;
+		_BUG_ON_PTR(slink);
+		slink_nr = slink->ops->slink_number(slink);
+		r = replog->ops->slink_del(replog, slink);
+		BUG_ON(r < 0);
+		slink_destroy(slink);
+		BUG_ON(replog_c_put(replog_c));
+		BUG_ON(!slink_c_put(slink_c));
+	}
+
+	/* Drop work queue. */
+	destroy_workqueue(replog_c->io.wq);
+
+	/* Drop reference on replog. */
+	repl_log_dtr(replog_c->replog, replog_c->ti);
+
+	BUG_ON(!replog_c_put(replog_c));
+}
+
+/*
+ * Replication constructor helpers.
+ */
+/* Create a site link tying it to the replication log. */
+/*
+ * E.g.: "local 4 1 async ios 10000"
+ */
+#define	MIN_SLINK_ARGS	3
+static int
+_replicator_slink_ctr(enum ctr_call_type call_type, struct dm_target *ti,
+		      struct replog_c *replog_c,
+		      unsigned argc, char **argv, unsigned *args_used)
+{
+	int first_slink, slink_nr, slink_params;
+	struct dm_repl_slink *slink;	/* Site link handle. */
+	struct slink_c *slink_c;	/* Site link context. */
+
+	SHOW_ARGV;
+
+	if (argc < MIN_SLINK_ARGS)
+		return -EINVAL;
+
+	/* Get #slink_params. */
+	if (unlikely(sscanf(argv[1], "%d", &slink_params) != 1 ||
+		     slink_params < 0 ||
+		     slink_params + 2 > argc)) {
+		ti_or_dmerr(call_type, ti,
+			   "Invalid site link parameter number argument");
+		return -EINVAL;
+	}
+
+	/* Get slink #. */
+	if (unlikely(sscanf(argv[2], "%d", &slink_nr) != 1 ||
+		     slink_nr < 0)) {
+		ti_or_dmerr(call_type, ti,
+			   "Invalid site link number argument");
+		return -EINVAL;
+	}
+
+	/* Check first slink is slink 0. */
+	mutex_lock(&replog_c_list_mutex);
+	first_slink = list_empty(&replog_c->lists.slink_c);
+	if (first_slink && slink_nr) {
+		mutex_unlock(&replog_c_list_mutex);
+		ti_or_dmerr(call_type, ti, "First site link must be 0");
+		return -EINVAL;
+	}
+
+	slink_c = slink_c_get_by_number(replog_c, slink_nr);
+	mutex_unlock(&replog_c_list_mutex);
+
+	if (!IS_ERR(slink_c)) {
+		ti_or_dmerr(call_type, ti, "slink already existing");
+		BUG_ON(slink_c_put(slink_c));
+		return -EPERM;
+	}
+
+	/* Get SLINK handle. */
+	slink = repl_slink_ctr(argv[0], replog_c->replog,
+			       slink_params + 1, argv + 1);
+	if (unlikely(IS_ERR(slink))) {
+		ti_or_dmerr(call_type, ti, "Cannot create site link context");
+		return PTR_ERR(slink);
+	}
+
+	slink_c = slink_c_create(replog_c, slink);
+	if (unlikely(IS_ERR(slink_c))) {
+		ti_or_dmerr(call_type, ti, "Cannot allocate site link context");
+		slink_destroy(slink);
+		return PTR_ERR(slink_c);
+	}
+
+	*args_used = slink_params + 2;
+	DMDEBUG("%s added site link=%d", __func__, slink_nr);
+	return 0;
+}
+
+/*
+ * Construct a replicator mapping to log writes of one or more local mapped
+ * devices in a local replicator log (REPLOG) in order to replicate them to
+ * one or multiple site links (SLINKs) while ensuring write order fidelity.
+ *
+ *******************************
+ *
+ * "replicator" constructor table:
+ *
+ * <start> <length> replicator \
+ *	<replog_type> <#replog_params> <replog_params> \
+ *	[<slink_type_0> <#slink_params_0> <slink_params_0>]{1..N}
+ *
+ * <replog_type>    = "ringbuffer" is currently the only available type
+ * <#replog_params> = # of args intended for the replog (2 or 4)
+ * <replog_params>  = <dev_path> <dev_start> [auto/create/open <size>]
+ *	<dev_path>  = device path of replication log (REPLOG) backing store
+ *	<dev_start> = offset to REPLOG header
+ *	create	    = The replication log will be initialized if not active
+ *		      and sized to "size".  (If already active, the create
+ *		      will fail.)  Size is always in sectors.
+ *	open	    = The replication log must be initialized and valid or
+ *		      the constructor will fail.
+ *	auto        = If a valid replication log header is found on the
+ *		      replication device, this will behave like 'open'.
+ *		      Otherwise, this option behaves like 'create'.
+ *
+ * <slink_type>    = "blockdev" is currently the only available type
+ * <#slink_params> = 1/2/4
+ * <slink_params>  = <slink_nr> [<slink_policy> [<fall_behind> <N>]]
+ *	<slink_nr>     = This is a unique number that is used to identify a
+ *			 particular site/location.  '0' is always used to
+ *			 identify the local site, while increasing integers
+ *			 are used to identify remote sites.
+ *	<slink_policy> = The policy can be either 'sync' or 'async'.
+ *			 'sync' means write requests will not return until
+ *			 the data is on the storage device.  'async' allows
+ *			 a device to "fall behind"; that is, outstanding
+ *			 write requests are waiting in the replication log
+ *			 to be processed for this site, but they do not delay
+ *			 the writes to other sites.
+ *	<fall_behind>  = This field is used to specify how far the user is
+ *			 willing to allow write requests to this specific site
+ *			 to "fall behind" in processing before switching to
+ *			 a 'sync' policy.  This "fall behind" threshold can
+ *			 be specified in three ways: ios, size, or timeout.
+ *			 'ios' is the number of pending I/Os allowed (e.g.
+ *			 "ios 10000").  'size' is the amount of pending data
+ *			 allowed (e.g. "size 200m").  Size labels include:
+ *			 s (sectors), k, m, g, t, p, and e.  'timeout' is
+ *			 the amount of time allowed for writes to be
+ *			 outstanding.  Time labels include: s, m, h, and d.
+ */
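+/*
+ * Example "replicator" control table (a sketch only; the device path,
+ * offset and size are made up):
+ *
+ *	0 1 replicator ringbuffer 4 /dev/vg00/replog 0 auto 204800 \
+ *	  blockdev 1 0 \
+ *	  blockdev 4 1 async ios 10000
+ *
+ * This opens or creates a ring buffer replication log on /dev/vg00/replog
+ * (header at offset 0, auto-created with 204800 sectors), defines local
+ * site link 0 and remote site link 1 with an asynchronous policy allowing
+ * up to 10000 outstanding ios before switching to synchronous behaviour.
+ */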
+#define	MIN_CONTROL_ARGS	3
+static int
+replicator_ctr(struct dm_target *ti, unsigned argc, char **argv)
+{
+	int args_used = 0, params, r;
+	struct dm_dev *backing_dev;
+	struct dm_repl_log *replog;	/* Replicator log handle. */
+	struct replog_c *replog_c;	/* Replication log context. */
+
+	SHOW_ARGV;
+
+	if (unlikely(argc < MIN_CONTROL_ARGS)) {
+		ti->error = "Invalid argument count";
+		return -EINVAL;
+	}
+
+	/* Get # of replog params. */
+	if (unlikely(sscanf(argv[1], "%d", &params) != 1 ||
+		     params < 2 ||
+		     params + 3 > argc)) {
+		ti->error = "Invalid replicator log parameter number";
+		return -EINVAL;
+	}
+
+	/* Check for site link 0 parameter count. */
+	if (params + 4 > argc) {
+		ti->error = "Invalid replicator site link parameter number";
+		return -EINVAL;
+	}
+
+	/*
+	 * Get reference on replicator control device.
+	 *
+	 * Dummy start/size sufficient here.
+	 */
+	r = dm_get_device(ti, argv[2], 0, 1, FMODE_WRITE, &backing_dev);
+	if (unlikely(r < 0)) {
+		ti->error = "Can't access replicator control device";
+		return r;
+	}
+
+	/* Lookup replog_c by dev_t. */
+	mutex_lock(&replog_c_list_mutex);
+	replog_c = replog_c_get_by_dev(backing_dev->bdev->bd_dev);
+	mutex_unlock(&replog_c_list_mutex);
+
+	if (unlikely(!IS_ERR(replog_c))) {
+		BUG_ON(replog_c_put(replog_c));
+		dm_put_device(ti, backing_dev);
+		ti->error = "Recreating replication log prohibited";
+		return -EPERM;
+	}
+
+	/* Get a reference on the replication log. */
+	replog = repl_log_ctr(argv[0], ti, params, argv + 1);
+	dm_put_device(ti, backing_dev);
+	if (unlikely(IS_ERR(replog))) {
+		ti->error = "Cannot create replication log context";
+		return PTR_ERR(replog);
+	}
+
+	_BUG_ON_PTR(replog->ops->postsuspend);
+	_BUG_ON_PTR(replog->ops->resume);
+
+	/* Create global replication control context. */
+	replog_c = replog_c_create(ti, replog);
+	if (unlikely(IS_ERR(replog_c))) {
+		ti->error = "Cannot allocate replication device context";
+		return PTR_ERR(replog_c);
+	} else
+		ti->private = replog_c;
+
+	/* Process any site link parameter tuples. */
+	params += 2;
+	BUG_ON(argc < params);
+	argc -= params;
+	argv += params;
+	r = 0;
+
+	while (argc > 0) {
+		r = _replicator_slink_ctr(CTR_CALL, ti, replog_c,
+					  argc, argv, &args_used);
+		if (r)
+			break;
+
+		/* Take per site link reference out. */
+		replog_c_get(replog_c);
+
+		BUG_ON(argc < args_used);
+		argc -= args_used;
+		argv += args_used;
+	}
+
+	return r;
+}
+
+/*
+ * Replication log map function.
+ *
+ * No io to replication log device allowed: ignore it
+ * by returning zeroes on read and ignoring writes silently.
+ */
+static int
+replicator_map(struct dm_target *ti, struct bio *bio,
+	       union map_info *map_context)
+{
+	/* Readahead of null bytes only wastes buffer cache. */
+	if (bio_rw(bio) == READA)
+		return -EIO;
+	else if (bio_rw(bio) == READ)
+		zero_fill_bio(bio);
+
+	bio_endio(bio, 0);
+	return DM_MAPIO_SUBMITTED; /* Accepted bio, don't make new request. */
+}
+
+/* Replication log suspend/resume helper. */
+static void
+_replicator_suspend_resume(struct replog_c *replog_c,
+			   enum suspend_resume_type type)
+{
+	struct dm_repl_log *replog;
+
+	DMDEBUG("%s %s", __func__, type == RESUME ? "resume" : "postsusend");
+	_BUG_ON_PTR(replog_c);
+	replog = replog_c->replog;
+	_BUG_ON_PTR(replog);
+
+	/* FIXME: device number not utilized yet. */
+	switch (type) {
+	case POSTSUSPEND:
+		ClearReplBlocked(replog_c);
+		flush_workqueue(replog_c->io.wq);
+		wait_event(replog_c->io.waiters, !ReplIoInflight(replog_c));
+		replog->ops->postsuspend(replog, -1);
+		break;
+	case RESUME:
+		replog->ops->resume(replog, -1);
+		ClearReplBlocked(replog_c);
+		wake_do_repl(replog_c);
+		break;
+	default:
+		BUG();
+	}
+}
+
+/* Suspend/Resume all. */
+static void
+_replicator_suspend_resume_all(struct replog_c *replog_c,
+			       enum suspend_resume_type type)
+{
+	struct device_c *dc;
+	struct slink_c *slink_c0;
+
+	_BUG_ON_PTR(replog_c);
+
+	/* First entry on replog_c slink_c list is slink0. */
+	slink_c0 = list_first_entry(&replog_c->lists.slink_c,
+				    struct slink_c, lists.slink_c);
+	_BUG_ON_PTR(slink_c0);
+
+	/* On resume, walk all device_c on slink 0 and resume the devices first. */
+	if (type == RESUME)
+		list_for_each_entry(dc, &slink_c0->lists.dc, list)
+			_replicator_dev_suspend_resume(dc->ti, type);
+
+	_replicator_suspend_resume(replog_c, type);
+
+	/* On suspend, walk all device_c on slink 0 and suspend the devices last. */
+	if (type == POSTSUSPEND)
+		list_for_each_entry(dc, &slink_c0->lists.dc, list)
+			_replicator_dev_suspend_resume(dc->ti, type);
+}
+
+/* Replication control post suspend method. */
+static void
+replicator_postsuspend(struct dm_target *ti)
+{
+	_replicator_suspend_resume(ti->private, POSTSUSPEND);
+}
+
+/* Replication control resume method. */
+static void
+replicator_resume(struct dm_target *ti)
+{
+	_replicator_suspend_resume(ti->private, RESUME);
+}
+
+/*
+ * Replicator control message method and its site link helper.
+ *
+ * Arguments: start/resume/stop/suspend/slink/statistics/replog
+ */
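+/*
+ * Message usage sketch ("replicator_ctl" is a placeholder for the name
+ * the control mapping was created under):
+ *
+ *	dmsetup message replicator_ctl 0 statistics on
+ *	dmsetup message replicator_ctl 0 suspend all
+ *	dmsetup message replicator_ctl 0 resume
+ *
+ * The first enables the development statistics emitted by the status
+ * method, the second suspends the replication log and all site link
+ * devices, the third resumes the replication log alone.
+ */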
+static int
+_replicator_slink_message(struct dm_target *ti, int argc, char **argv)
+{
+	int args_used, r, tmp;
+	unsigned slink_nr;
+	struct replog_c *replog_c = ti->private;
+	struct dm_repl_slink *slink;
+	struct slink_c *slink_c;
+
+	if (sscanf(argv[2], "%d", &tmp) != 1 || tmp < 0)
+		DM_EINVAL("site link number invalid");
+
+	slink_nr = tmp;
+
+	if (str_listed(argv[1], "add", "del", NULL) &&
+	    !slink_nr)
+		DM_EPERM("Can't add/delete site link 0 via message");
+
+	mutex_lock(&replog_c_list_mutex);
+	slink_c = slink_c_get_by_number(replog_c, slink_nr);
+	mutex_unlock(&replog_c_list_mutex);
+
+	if (str_listed(argv[1], "add", NULL)) {
+		if (IS_ERR(slink_c)) {
+			r = _replicator_slink_ctr(MESSAGE_CALL, ti,
+						 replog_c,
+						  argc - 2, argv + 2,
+						  &args_used);
+			if (r)
+				DMERR("Error creating site link");
+
+			return r;
+		} else {
+			BUG_ON(slink_c_put(slink_c));
+			DM_EPERM("site link already exists");
+		}
+	} else if (str_listed(argv[1], "del", NULL)) {
+		if (IS_ERR(slink_c))
+			DM_EPERM("site link doesn't exist");
+		else {
+			if (!list_empty(&slink_c->lists.dc)) {
+				slink_c_put(slink_c);
+				DM_EPERM("site link still has devices");
+			}
+
+			slink_c_put(slink_c);
+			r = slink_c_put(slink_c);
+			if (!r)
+				DMERR("site link still exists (race)!");
+
+			return r;
+		}
+	} else if (str_listed(argv[1], "message", NULL)) {
+		slink = slink_c->slink;
+		_BUG_ON_PTR(slink);
+
+		if (slink->ops->message)
+			return slink->ops->message(slink,
+						   argc - 2, argv + 2);
+		else
+			DM_EPERM("no site link message interface");
+	}
+
+	return r;
+}
+
+static int
+replicator_message(struct dm_target *ti, unsigned argc, char **argv)
+{
+	int r, resume, suspend;
+	struct replog_c *replog_c = ti->private;
+	struct dm_repl_log *replog;
+
+	SHOW_ARGV;
+	_BUG_ON_PTR(replog_c);
+	replog = replog_c->replog;
+	_BUG_ON_PTR(replog);
+
+	/* Check minimum arguments. */
+	if (unlikely(argc < 1))
+		goto err_args;
+
+	resume  = str_listed(argv[0], "resume", "start", NULL);
+	/* Hrm, bogus: need a NULL end arg to make it work!? */
+	suspend = !resume &&
+		  str_listed(argv[0], "suspend", "postsuspend", "stop", NULL);
+
+	/*
+	 * Suspend/resume the replication log alone or, with "all",
+	 * the log plus all site links and their devices.
+	 */
+	if (suspend || resume) {
+		int all;
+
+		if (!range_ok(argc, 1, 2)) {
+			DMERR("Invalid suspend/resume argument count");
+			return -EINVAL;
+		}
+
+		all = (argc == 2 && str_listed(argv[1], "all", NULL));
+
+		if (resume) {
+			if (all)
+				_replicator_suspend_resume_all(replog_c,
+							       RESUME);
+			else
+				_replicator_suspend_resume(replog_c,
+							   RESUME);
+
+		/* Stop replication log. */
+		} else {
+			if (all)
+				_replicator_suspend_resume_all(replog_c,
+							       POSTSUSPEND);
+			else
+				_replicator_suspend_resume(replog_c,
+							   POSTSUSPEND);
+		}
+
+	/* Site link message. */
+	} else if (str_listed(argv[0], "slink", NULL)) {
+		/* E.g.: "local 4 1 async ios 10000" */
+		/* Check minimum arguments. */
+		if (unlikely(argc < 3))
+			goto err_args;
+
+		r = _replicator_slink_message(ti, argc, argv);
+		if (r)
+			return r;
+	/* Statistics. */
+	} else if (str_listed(argv[0], "statistics", NULL)) {
+		if (argc != 2)
+			DM_EINVAL("too many message arguments");
+
+		_BUG_ON_PTR(replog_c);
+		if (str_listed(argv[1], "on", NULL))
+			SetReplDevelStats(replog_c);
+		else if (str_listed(argv[1], "off", NULL))
+			ClearReplDevelStats(replog_c);
+		else if (str_listed(argv[1], "reset", NULL))
+			stats_reset(&replog_c->io.stats);
+
+	/* Replication log message. */
+	} else if (str_listed(argv[0], "replog", NULL)) {
+		if (argc < 2)
+			goto err_args;
+
+		if (replog->ops->message)
+			return replog->ops->message(replog, argc - 1, argv + 1);
+		else
+			DM_EPERM("no replication log message interface");
+	} else
+		DM_EINVAL("invalid message received");
+
+	return 0;
+
+err_args:
+	DM_EINVAL("too few message arguments");
+}
+
+/* Replication log status output method. */
+static int
+replicator_status(struct dm_target *ti, status_type_t type,
+		    char *result, unsigned maxlen)
+{
+	unsigned dev_nr = 0;
+	ssize_t sz = 0;
+	static char buffer[2048];
+	struct replog_c *replog_c = ti->private;
+	struct dm_repl_log *replog;
+	struct slink_c *slink_c0;
+	struct dm_repl_slink *slink;
+
+	mutex_lock(&replog_c_list_mutex);
+	_BUG_ON_PTR(replog_c);
+	replog = replog_c->replog;
+	_BUG_ON_PTR(replog);
+
+	if (type == STATUSTYPE_INFO) {
+		if (ReplDevelStats(replog_c)) {
+			struct stats *s = &replog_c->io.stats;
+
+			DMEMIT("v=%s r=%u w=%u rs=%u "
+			       "ws=%u nc=%u c=%u ",
+			       version,
+			       atomic_read(s->io), atomic_read(s->io + 1),
+			       atomic_read(s->submitted_io),
+			       atomic_read(s->submitted_io + 1),
+			       atomic_read(s->congested_fn),
+			       atomic_read(s->congested_fn + 1));
+		}
+	}
+
+	mutex_unlock(&replog_c_list_mutex);
+
+	/* Get status from replog. */
+	/* FIXME: dev_nr superfluous? */
+	replog->ops->status(replog, dev_nr, type, buffer, sizeof(buffer));
+	DMEMIT("%s", buffer);
+
+	slink_c0 = list_first_entry(&replog_c->lists.slink_c,
+				    struct slink_c, lists.slink_c);
+	slink = slink_c0->slink;
+	_BUG_ON_PTR(slink);
+	/* Get status from slink. */
+	*buffer = 0;
+	slink->ops->status(slink, -1, type, buffer, sizeof(buffer));
+	DMEMIT(" %s", buffer);
+	return 0;
+}
+
+/* Replicator control interface. */
+static struct target_type replicator_target = {
+	.name = "replicator",
+	.version = {1, 0, 0},
+	.module = THIS_MODULE,
+	.ctr = replicator_ctr,
+	.dtr = replicator_dtr,
+	.map = replicator_map,
+	.postsuspend = replicator_postsuspend,
+	.resume = replicator_resume,
+	.message = replicator_message,
+	.status = replicator_status,
+};
+
+static int __init dm_repl_init(void)
+{
+	int r;
+
+	INIT_LIST_HEAD(&replog_c_list);
+	mutex_init(&replog_c_list_mutex);
+
+	r = dm_register_target(&replicator_target);
+	if (r < 0)
+		DMERR("failed to register %s %s [%d]",
+		      replicator_target.name, version, r);
+	else {
+		DMINFO("registered %s target %s",
+		       replicator_target.name, version);
+		r = dm_register_target(&replicator_dev_target);
+		if (r < 0) {
+			DMERR("Failed to register %s %s [%d]",
+			      replicator_dev_target.name, version, r);
+			dm_unregister_target(&replicator_target);
+		} else
+			DMINFO("registered %s target %s",
+			       replicator_dev_target.name, version);
+	}
+
+	return r;
+}
+
+static void __exit
+dm_repl_exit(void)
+{
+	dm_unregister_target(&replicator_dev_target);
+	DMINFO("unregistered target %s %s",
+	       replicator_dev_target.name, version);
+	dm_unregister_target(&replicator_target);
+	DMINFO("unregistered target %s %s", replicator_target.name, version);
+}
+
+/* Module hooks */
+module_init(dm_repl_init);
+module_exit(dm_repl_exit);
+
+MODULE_DESCRIPTION(DM_NAME " remote replication target");
+MODULE_AUTHOR("Heinz Mauelshagen <heinzm@redhat.com>");
+MODULE_LICENSE("GPL");
diff --git 2.6.33-rc1.orig/drivers/md/dm-repl.h 2.6.33-rc1/drivers/md/dm-repl.h
new file mode 100644
index 0000000..20d0c99
--- /dev/null
+++ 2.6.33-rc1/drivers/md/dm-repl.h
@@ -0,0 +1,127 @@
+/*
+ * Copyright (C) 2009 Red Hat, Inc. All rights reserved.
+ *
+ * Module Author: Heinz Mauelshagen (heinzm@redhat.com)
+ *
+ * This file is released under the GPL.
+ */
+
+/*
+ * API calling convention to create a replication mapping:
+ *
+ * 1. get a replicator log handle, hence creating a new persistent
+ *    log or accessing an existing one
+ * 2. get an slink handle, hence creating a new transient
+ *    slink or accessing an existing one
+ * 2(cont). repeat the previous step for multiple slinks (eg. one for
+ *    local and one for remote device access)
+ * 3. bind a (remote) device to a particular slink created in a previous step
+ * 3(cont). repeat the device binding for any additional devices on that slink
+ * 4. bind the created slink which has device(s) bound to it to the replog
+ * 4(cont). repeat the slink binding to the replog for all created slinks
+ * 5. call the replog io function for each IO.
+ *
+ * Reverse steps 1-4 to tear a replication mapping down, hence freeing all
+ * transient resources allocated to it.
+ */
+
+#ifndef _DM_REPL_H
+#define _DM_REPL_H
+
+#include <linux/device-mapper.h>
+
+/* FIXME: factor these macros out to dm.h */
+#define	STR_LEN(ptr, str)	ptr, str, strlen(ptr)
+#define ARRAY_END(a)    ((a) + ARRAY_SIZE(a))
+#define	range_ok(i, min, max)   ((i) >= (min) && (i) <= (max))
+
+#define	TI_ERR_RET(str, ret) \
+do { \
+	ti->error = DM_MSG_PREFIX ": " str; \
+	return ret; } \
+while (0)
+#define	TI_ERR(str)	TI_ERR_RET(str, -EINVAL)
+
+#define	DM_ERR_RET(ret, x...)	do { DMERR(x); return ret; } while (0)
+#define	DM_EINVAL(x...)	DM_ERR_RET(-EINVAL, x)
+#define	DM_EPERM(x...)	DM_ERR_RET(-EPERM, x)
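+
+/*
+ * Usage sketch for the error macros above (the messages are made up):
+ *
+ *	if (argc < 2)
+ *		DM_EINVAL("too few arguments");	(logs via DMERR, returns -EINVAL)
+ *
+ *	TI_ERR("no backing device");	(sets ti->error, returns -EINVAL)
+ *
+ * Both expand to a return statement and are hence only usable in
+ * functions returning an error code.
+ */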
+
+/*
+ * Minimum split_io of target to preset for local devices in repl_ctr().
+ * Will be adjusted while constructing (a) remote device(s).
+ */
+#define	DM_REPL_MIN_SPLIT_IO	BIO_MAX_SECTORS
+
+/* REMOVEME: devel testing. */
+#if	0
+#define	SHOW_ARGV \
+	do { \
+		int i; \
+\
+		DMINFO("%s: called with the following args:", __func__); \
+		for (i = 0; i < argc; i++) \
+			DMINFO("%d: %s", i, argv[i]); \
+	} while (0)
+#else
+#define	SHOW_ARGV
+#endif
+
+
+/* Factor out to dm-bio-list.h */
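+/*
+ * Pushes at the head of the list (LIFO) as opposed to bio_list_add(),
+ * which appends at the tail.
+ */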
+static inline void
+bio_list_push(struct bio_list *bl, struct bio *bio)
+{
+	bio->bi_next = bl->head;
+	bl->head = bio;
+
+	if (!bl->tail)
+		bl->tail = bio;
+}
+
+/* REMOVEME: development */
+#define	_BUG_ON_PTR(ptr) \
+	do { \
+		BUG_ON(!ptr); \
+		BUG_ON(IS_ERR(ptr)); \
+	} while (0)
+
+/* Callback function. */
+typedef void
+(*dm_repl_notify_fn)(int read_err, int write_err, void *context);
+
+/*
+ * Macros to access bit flags in a structure's io.flags member.
+ * The mixed-case naming follows similar macros in the page cache code.
+ */
+#define	DM_BITOPS(name, var, flag) \
+static inline int \
+TestClear ## name(struct var *v) \
+{ return test_and_clear_bit(flag, &v->io.flags); } \
+static inline int \
+TestSet ## name(struct var *v) \
+{ return test_and_set_bit(flag, &v->io.flags); } \
+static inline void \
+Clear ## name(struct var *v) \
+{ clear_bit(flag, &v->io.flags); smp_mb(); } \
+static inline void \
+Set ## name(struct var *v) \
+{ set_bit(flag, &v->io.flags); smp_mb(); } \
+static inline int \
+name(struct var *v) \
+{ return test_bit(flag, &v->io.flags); }
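+
+/*
+ * Expansion sketch (the flag constant name is hypothetical):
+ *
+ *	DM_BITOPS(ReplBlocked, replog_c, REPL_BLOCKED)
+ *
+ * defines ReplBlocked(), SetReplBlocked(), ClearReplBlocked(),
+ * TestSetReplBlocked() and TestClearReplBlocked(), all operating on bit
+ * REPL_BLOCKED of the io.flags member of struct replog_c.
+ */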
+
+/* FIXME: move to dm core. */
+/* Search routines for descriptor arrays. */
+struct dm_str_descr {
+	const int type;
+	const char *name;
+};
+
+/* Return type for name. */
+extern int
+dm_descr_type(const struct dm_str_descr *descr, unsigned len, const char *name);
+/* Return name for type. */
+extern const char *
+dm_descr_name(const struct dm_str_descr *descr, unsigned len, const int type);
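+
+/*
+ * Usage sketch (the descriptor table and type constants are made up):
+ *
+ *	static const struct dm_str_descr policies[] = {
+ *		{ POLICY_SYNC,	"sync" },
+ *		{ POLICY_ASYNC,	"async" },
+ *	};
+ *
+ *	int type = dm_descr_type(policies, ARRAY_SIZE(policies), "async");
+ *	const char *name = dm_descr_name(policies, ARRAY_SIZE(policies), type);
+ */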
+
+#endif
diff --git 2.6.33-rc1.orig/drivers/md/dm.c 2.6.33-rc1/drivers/md/dm.c
index 3167480..0048958 100644
--- 2.6.33-rc1.orig/drivers/md/dm.c
+++ 2.6.33-rc1/drivers/md/dm.c
@@ -2653,6 +2653,7 @@ struct gendisk *dm_disk(struct mapped_device *md)
 {
 	return md->disk;
 }
+EXPORT_SYMBOL_GPL(dm_disk);
 
 struct kobject *dm_kobject(struct mapped_device *md)
 {
-- 
1.6.2.5

Thread overview: 9+ messages
2009-12-18 15:44 [PATCH v6 0/4] dm-replicator: introduce new remote replication target heinzm
2009-12-18 15:44 ` [PATCH v6 1/4] dm-replicator: documentation and module registry heinzm
2009-12-18 15:44   ` heinzm [this message]
2009-12-18 15:44     ` [PATCH v6 3/4] dm-replicator: ringbuffer replication log handler heinzm
2009-12-18 15:44       ` [PATCH v6 4/4] dm-replicator: blockdev site link handler heinzm
2011-07-18  9:44       ` [PATCH v6 3/4] dm-replicator: ringbuffer replication log handler Busby
2010-01-07 10:18   ` [PATCH v6 1/4] dm-replicator: documentation and module registry 张宇
2010-01-08 19:44     ` Heinz Mauelshagen
2010-02-09  1:48       ` Busby
