All of lore.kernel.org
 help / color / mirror / Atom feed
* [Qemu-devel] [RFC PATCH v1: 00/12] fault tolerance through micro-checkpointing
@ 2013-10-21  1:14 mrhines
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 01/12] mc: add documentation for micro-checkpointing mrhines
                   ` (11 more replies)
  0 siblings, 12 replies; 16+ messages in thread
From: mrhines @ 2013-10-21  1:14 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, owasserm, onom, abali, mrhines, gokul, pbonzini

From: "Michael R. Hines" <mrhines@us.ibm.com>

This patch implements RDMA-aware fault tolerance for the VM
using Micro-Checkpointing (to be presented at the KVM Forum). 

The breakout of the patches is not ideal and is really meant to
kick things off for review, which will likely extend well past 1.7
and into 1.8 version of QEMU, assuming about 5-6 months of reviews. 

Please begin with patch #01 as it provides a good narrative of
what is different about this and previous attempts at fault tolerance,
including a breakdown of the current empirical performance challenges.

Michael R. Hines (12):
  mc: add documentation for micro-checkpointing
  rdma: remove reference to github.com
  migration: introduce parallelization of migration_bitmap
  mc: introduce a "checkpointing" status check into the VCPU states
  migration: support custom page loading
  rdma: accelerated memcpy() support
  mc: introduce state machine error handling and migration_bitmap prep
  mc: modified QMP statistics and migration_thread handoff
  mc: core logic
  mc: configure and makefile support
  mc: register MC qemu-file functions and expose MC tunable capability
  mc: activate and use MC core logic if requested

 Makefile.objs                 |    1 +
 arch_init.c                   |  276 +++++-
 configure                     |   45 +
 cpus.c                        |    9 +-
 docs/mc.txt                   |  261 ++++++
 docs/rdma.txt                 |    1 -
 hmp-commands.hx               |   14 +
 hmp.c                         |   23 +
 hmp.h                         |    1 +
 include/migration/migration.h |   69 +-
 include/migration/qemu-file.h |   55 +-
 include/qemu-common.h         |   12 +
 migration-checkpoint.c        | 1589 ++++++++++++++++++++++++++++++++
 migration-rdma.c              | 2008 ++++++++++++++++++++++++++++++-----------
 migration.c                   |  148 ++-
 qapi-schema.json              |   92 +-
 qmp-commands.hx               |   23 +
 savevm.c                      |   84 +-
 vl.c                          |   42 +
 19 files changed, 4123 insertions(+), 630 deletions(-)
 create mode 100644 docs/mc.txt
 create mode 100644 migration-checkpoint.c

-- 
1.8.1.2

^ permalink raw reply	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH v1: 01/12] mc: add documentation for micro-checkpointing
  2013-10-21  1:14 [Qemu-devel] [RFC PATCH v1: 00/12] fault tolerance through micro-checkpointing mrhines
@ 2013-10-21  1:14 ` mrhines
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 02/12] rdma: remove reference to github.com mrhines
                   ` (10 subsequent siblings)
  11 siblings, 0 replies; 16+ messages in thread
From: mrhines @ 2013-10-21  1:14 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, owasserm, onom, abali, mrhines, gokul, pbonzini

From: "Michael R. Hines" <mrhines@us.ibm.com>


Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 docs/mc.txt | 261 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 261 insertions(+)
 create mode 100644 docs/mc.txt

diff --git a/docs/mc.txt b/docs/mc.txt
new file mode 100644
index 0000000..90888f7
--- /dev/null
+++ b/docs/mc.txt
@@ -0,0 +1,261 @@
+Micro Checkpointing Specification
+==============================================
+Wiki: http://wiki.qemu.org/Features/MicroCheckpointing
+Github: git@github.com:hinesmr/qemu.git, 'mc' branch
+
+Copyright (C) 2014 Michael R. Hines <mrhines@us.ibm.com>
+
+Contents:
+=========
+* Introduction
+* The Micro-Checkpointing Process 
+* RDMA Integration
+* Failure Recovery
+* Before running
+* Running
+* Performance
+* TODO
+
+INTRODUCTION:
+=============
+
+Micro-Checkpointing (MC) is one method for providing Fault Tolerance to a
+running virtual machine (VM) with neither runtime assistance from the guest
+kernel nor from the guest application software. Furthermore, Fault Tolerance
+is one method of providing high availability to a VM such that, from the
+perspective of the outside world (clients, devices, and neighboring VMs that
+may be paired with it), the VM and its applications have not lost any runtime
+state in the event of either a failure of the hypervisor/hardware to allow the 
+VM to make forward progress or a complete loss of power. This mechanism for
+providing fault tolerance does *not* provide any protection whatsoever against 
+software-level faults in the guest kernel or applications. In fact, due to
+the potentially extended lifetime of the VM because of this type of high
+availability, such software-level bugs may in fact manifest themselves 
+*more often* than they ordinarily would, in which case you would need to
+employ other forms of availability to guard against such software-level faults.
+
+This implementation is also fully compatible with RDMA. (See docs/rdma.txt
+for more details).
+
+THE MICRO-CHECKPOINTING PROCESS:
+================================
+
+Micro-Checkpointing works against the existing live migration path in QEMU,
+and can effectively be understood as a "live migration that never ends".
+As such, iterations rounds happen at the granularity of 10s of milliseconds
+and perform the following steps:
+
+1. After N milliseconds, stop the VM.
+2. Generate a MC by invoking the live migration software path
+   to identify and copy dirty memory into a local staging area inside QEMU.
+3. Resume the VM immediately so that it can make forward progress.
+4. Transmit the checkpoint to the destination.
+5. Repeat 
+
+Upon failure, load the contents of the last MC at the destination back
+into memory and run the VM normally.
+
+Additionally, a MC must include a consistent view of device I/O,
+particularly the network, a problem commonly referred to as "output commit". 
+This means that the outside world can not be allowed to experience duplicate
+state that was committed by the virtual machine after failure. This is
+possible because a checkpoint may diverge by N milliseconds of time and
+commit state while the current checkpoint is being transmitted to the
+destination. 
+
+To guard against this problem, first, we must "buffer" the TX output of the
+network (not the input) between MCs until the current MC is safely received
+by the destination. For example, all outbound network packets must be held
+at the source until the MC is transmitted. After transmission is complete, 
+those packets can be released. Similarly, in the case of disk I/O, we must
+ensure that either the contents of the local disk is safely mirrored to a 
+remote disk before completing a MC or that the output to a shared disk, 
+such as iSCSI, is also buffered between checkpoints and then later released
+in the same way.
+
+This implementation *currently* only supports buffering for the network.
+This requires that the VM's root disk or any non-ephemeral disks also be 
+made network-accessible directly from within the VM. Until the aforementioned
+buffering or mirroring support is available (ideally through drive-mirror),
+the only "consistent" way to provide full fault tolerance of the VM's
+non-ephemeral disks is to construct a VM whose root disk is made to boot
+directly from iSCSI or NFS or similar such that all disk I/O is translated
+into network I/O. 
+
+RDMA INTEGRATION:
+=================
+
+RDMA is instrumental in enabling better MC performance, which is the reason
+why it was introduced into QEMU first.
+
+1. Checkpoint generation (RDMA-based memcpy):
+2. Checkpoint transmission (for performance and less CPU impact)
+
+Checkpoint generation (step 2 in the previous section) must be done while
+the VM is paused. In the worst case, the size of the checkpoint can be 
+equal in size to the amount of memory in total use by the VM. In order
+to resume VM execution as fast as possible, the checkpoint is copied
+consistently locally into a staging area before transmission. A standard
+memcpy() of potentially such a large amount of memory not only gets
+no use out of the CPU cache but also potentially clogs up the CPU pipeline
+which would otherwise be useful by other neighbor VMs on the same
+physical node that could be scheduled for execution by Linux. To minimize
+the effect on neighbor VMs, we use RDMA to perform a "local" memcpy(),
+bypassing the host processor.
+
+Checkpoint transmission can potentially consume very large amounts of
+both bandwidth as well as CPU utilization that could otherwise by used by
+the VM itself or its neighbors. Once the aforementioned local copy of the
+checkpoint is saved, this implementation makes use of the same RDMA
+hardware to perform the transmission, similar to the way a live migration
+happens over RDMA (see docs/rdma.txt). 
+
+FAILURE RECOVERY:
+=================
+
+Due to the high-frequency nature of micro-checkpointing, we expect
+a new checkpoint to be generated many times per second. Even missing just
+a few checkpoints easily constitutes a failure. Because of the consistent
+buffering of device I/O, this is safe because device I/O is not committed
+to the outside world until the checkpoint has been received at the
+destination.
+
+Failure is thus assumed under two conditions:
+
+1. MC over TCP/IP: Once the socket connection breaks, we assume failure.
+                   This happens very early in the loss of the latest
+                   checkpoint not only because a very large amount of bytes is
+                   typically being sequenced in a TCP stream but perhaps
+                   also because of the timeout in acknowledgement of
+                   the receipt of a commit message by the destination.
+
+2. MC over RDMA:   Since Infiniband does not provide any user-level timeout
+                   mechanisms, this implementation enhances QEMU's 
+                   RDMA migration protocol to include a simple keep-alive.
+                   Upon the loss of multiple keep-alive messages, the
+                   sender is deemed to be failed.
+
+In both cases, either due to a failed TCP socket connection or lost RDMA
+keep-alive group, both the sender or the receiver can be deemed to be failed.
+
+If the sender is deemed to be failed, the destination takes over immediately
+using the contents of the last checkpoint.
+
+If the destination is deemed to be lost, we perform the same action as
+a live migration: resume the sender normally and wait for management software
+to make a policy decision about whether or not to re-protect the VM,
+which may involve a third-party to identify a new destination host again to
+use as a backup for the VM.
+
+BEFORE RUNNING:
+===============
+
+First, compile QEMU with '--enable-mc' and ensure that the corresponding
+libraries for netlink are available. The netlink 'plug' support from the
+Qdisc functionality is required in particular, because it allows QEMU to
+direct the kernel to buffer outbound network packages between checkpoints
+as described previously.
+
+Next, start the VM that you want to protect using your standard procedures.
+
+Enable MC like this:
+
+QEMU Monitor Command:
+$ migrate_set_capability x-mc on # disabled by default
+
+Currently, only one network interface is supported, *and* currently you
+must ensure that the root disk of your VM is booted either directly from
+iSCSI or NFS, as described previously. This will be rectified with future
+improvements. 
+
+For testing only, you can ignore the aforementioned requirements
+if you simply want to get an understanding of the performance
+penalties associated with this feature activated. 
+
+Next, you can optionally disable network-buffering for additional test-only
+execution. This is useful if you want to get a breakdown only what the cost
+of the checkpointing the memory state is without the cost of
+checkpointing device state.
+
+QEMU Monitor Command:
+$ migrate_set_capability mc-net-disable on # buffering activated by default 
+
+Next, you can optionally enable RDMA 'memcpy' support.
+This is only valid if you have RDMA support compiled into QEMU and you intend
+to use the 'rdma' migration URI upon initiating MC as described later.
+
+QEMU Monitor Command:
+$ migrate_set_capability mc-rdma-copy on # disabled by default
+
+Next, you can optionally enable the 'bitworkers' feature of QEMU.
+This is allows QEMU to use all available host CPU cores to parallelize
+the process of processing the migration dirty bitmap as described previously.
+For normal live migrations, we disable this by default as migration is
+typically a short-lived operation.
+
+QEMU Monitor Command:
+$ migrate_set_capability bitworkers on # disabled by default
+
+Finally, if you are using QEMU's support for RDMA migration, you will want
+to enable RDMA keep-alive support to allow quick detection of failure. If
+you are using TCP/IP, this is not required:
+
+QEMU Monitor Command:
+$ migrate_set_capability rdma-keepalive on # disabled by default
+
+RUNNING:
+========
+
+MC can be initiated with exactly the same command as standard live migration:
+
+QEMU Monitor Command:
+$ migrate -d (tcp|rdma):host:port
+
+Upon failure, the destination VM will detect a loss in network connectivity
+and automatically revert to the last checkpoint taken and resume execution
+immediately. There is no need for additional QEMU monitor commands to initiate
+the recovery process.
+
+PERFORMANCE:
+============
+
+By far, the biggest cost is network throughput. Virtual machines are capable
+of dirtying memory well in excess of the bandwidth provided a commodity 1 Gbps 
+network link. If so, the MC process will always lag behind the virtual machine 
+and forward progress will be poor. It is highly recommended to use at least 
+a 10 Gbps link when using MC.
+
+Numbers are still coming in, but without output buffering of network I/O,
+the performance penalty on a typical 4GB RAM Java-based application server workload 
+using a 10 Gbps link (a good worst case for testing due Java's constant 
+garbage collection) is on the order of 25%. With network buffering activated, 
+this can be as high as 50%.
+
+The majority of the 25% penalty is due to the preparation of the QEMU migration
+dirty bitmap, which can incur tens of milliseconds of downtime against the guest. 
+
+The remaining 25% penalty comes from network buffering is typically due to checkpoints
+not occurring fast enough since a typical "round trip" time between the request of
+an application-level transaction and the corresponding response should ideally be 
+larger than the time it takes to complete a checkpoint, otherwise, the response
+to the application within the VM will appear to be congested since the VM's network
+endpoint may not have even received the TX request from the application in the
+first place.
+
+We believe that this effect is "amplified" due to the poor performance in
+processing the migration bitmap and thus since an application-level RTT cannot
+be serviced with more frequent checkpoints, network I/O tends to get held in
+the buffer too long. This has the effect of causing the guest TCP/IP stack
+to experience congestion, propagating this artificially created delay all the
+way up to the application.
+
+TODO:
+=====
+
+1. Eliminate as much of the cost of migration dirty bitmap preparation as possible.
+   Parallelization is really only a stop-gap measure.
+
+2. Implement local disk mirroring by integrating with QEMU's 'drive-mirror'
+   feature in order to full support virtual machines with local storage.
+
+3. Implement output commit buffering for shared storage.
-- 
1.8.1.2

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH v1: 02/12] rdma: remove reference to github.com
  2013-10-21  1:14 [Qemu-devel] [RFC PATCH v1: 00/12] fault tolerance through micro-checkpointing mrhines
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 01/12] mc: add documentation for micro-checkpointing mrhines
@ 2013-10-21  1:14 ` mrhines
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 03/12] migration: introduce parallelization of migration_bitmap mrhines
                   ` (9 subsequent siblings)
  11 siblings, 0 replies; 16+ messages in thread
From: mrhines @ 2013-10-21  1:14 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, owasserm, onom, abali, mrhines, gokul, pbonzini

From: "Michael R. Hines" <mrhines@us.ibm.com>


Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 docs/rdma.txt | 1 -
 1 file changed, 1 deletion(-)

diff --git a/docs/rdma.txt b/docs/rdma.txt
index 2aca63b..6d116e2 100644
--- a/docs/rdma.txt
+++ b/docs/rdma.txt
@@ -2,7 +2,6 @@
 RDMA Live Migration Specification, Version # 1
 ==============================================
 Wiki: http://wiki.qemu-project.org/Features/RDMALiveMigration
-Github: git@github.com:hinesmr/qemu.git, 'rdma' branch
 
 Copyright (C) 2013 Michael R. Hines <mrhines@us.ibm.com>
 
-- 
1.8.1.2

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH v1: 03/12] migration: introduce parallelization of migration_bitmap
  2013-10-21  1:14 [Qemu-devel] [RFC PATCH v1: 00/12] fault tolerance through micro-checkpointing mrhines
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 01/12] mc: add documentation for micro-checkpointing mrhines
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 02/12] rdma: remove reference to github.com mrhines
@ 2013-10-21  1:14 ` mrhines
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 04/12] mc: introduce a "checkpointing" status check into the VCPU states mrhines
                   ` (8 subsequent siblings)
  11 siblings, 0 replies; 16+ messages in thread
From: mrhines @ 2013-10-21  1:14 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, owasserm, onom, abali, mrhines, gokul, pbonzini

From: "Michael R. Hines" <mrhines@us.ibm.com>

This patch allows the preparation of the migration_bitmap
to be parallelized. For very large VMs, this can take on
the order of 10s of milliseconds, which translates as downtime.

We count the number of cores first, and then handout chunks of
the logdirty bitmap to a thread per core. Each thread scans for
dirty bits in parallel.

Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 arch_init.c                   | 228 +++++++++++++++++++++++++++++++++++++++---
 include/migration/migration.h |  10 ++
 include/qemu-common.h         |  12 +++
 qapi-schema.json              |  73 +++++++++++++-
 vl.c                          |  33 ++++++
 5 files changed, 340 insertions(+), 16 deletions(-)

diff --git a/arch_init.c b/arch_init.c
index 7545d96..4a71311 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -189,6 +189,8 @@ typedef struct AccountingInfo {
     uint64_t skipped_pages;
     uint64_t norm_pages;
     uint64_t iterations;
+    uint64_t log_dirty_time;
+    uint64_t migration_bitmap_time;
     uint64_t xbzrle_bytes;
     uint64_t xbzrle_pages;
     uint64_t xbzrle_cache_miss;
@@ -232,6 +234,16 @@ uint64_t norm_mig_pages_transferred(void)
     return acct_info.norm_pages;
 }
 
+uint64_t norm_mig_log_dirty_time(void)
+{
+    return acct_info.log_dirty_time;
+}
+
+uint64_t norm_mig_bitmap_time(void)
+{
+    return acct_info.migration_bitmap_time;
+}
+
 uint64_t xbzrle_mig_bytes_transferred(void)
 {
     return acct_info.xbzrle_bytes;
@@ -362,15 +374,189 @@ ram_addr_t migration_bitmap_find_and_reset_dirty(MemoryRegion *mr,
 static inline bool migration_bitmap_set_dirty(MemoryRegion *mr,
                                               ram_addr_t offset)
 {
-    bool ret;
-    int nr = (mr->ram_addr + offset) >> TARGET_PAGE_BITS;
+    return test_and_set_bit((mr->ram_addr + offset) >> TARGET_PAGE_BITS, 
+                                migration_bitmap);
+}
+
+typedef struct BitmapWalkerParams {
+    QemuMutex ready_mutex;
+    QemuMutex done_mutex;
+    QemuCond cond;
+    QemuThread walker;
+    MigrationState *s;
+    int core_id;
+    int keep_running;
+    ram_addr_t start;
+    ram_addr_t stop;
+    void *block;
+    uint64_t dirty_pages;
+} BitmapWalkerParams;
 
-    ret = test_and_set_bit(nr, migration_bitmap);
+static int nb_bitmap_workers = 0;
 
-    if (!ret) {
-        migration_dirty_pages++;
+BitmapWalkerParams *bitmap_walkers = NULL;
+
+/*
+ * Bitmap workers: This is a temporary performance-driven
+ * workaround for the slowness (10s of milliseconds) incurred
+ * during calls to migration_bitmap_sync().
+ *
+ * Ideally, migration_bitmap_sync() should be able to use the
+ * GET_LOG_DIRTY bitmap from KVM directly, but it does not right
+ * now because the bitmap is not retrieved as a single memory
+ * allocation which requires a couple of transformations into
+ * a 'unified' bitmap before the migration code can make good use
+ * of it.
+ *
+ * Bitmap workers perform this transformation in parallel
+ * in a multi-threaded fashion until a patch is ready to process
+ * the bitmaps from GET_LOG_DIRTY directly.
+ */
+static uint64_t migration_bitmap_sync_range(RAMBlock *block, 
+                            ram_addr_t start, ram_addr_t stop)
+{
+    ram_addr_t addr;
+    uint64_t dirty_pages = 0;
+    
+
+    for (addr = start; addr < stop; addr += TARGET_PAGE_SIZE) {
+        if (memory_region_test_and_clear_dirty(block->mr,
+                                               addr, TARGET_PAGE_SIZE,
+                                               DIRTY_MEMORY_MIGRATION)) {
+            if (!migration_bitmap_set_dirty(block->mr, addr)) {
+                dirty_pages++;
+            }
+        }
+    }
+
+    return dirty_pages;
+}
+
+/*
+ * The worker sleeps until it gets some work to transform a 
+ * chunk of bitmap from KVM to the migration_bitmap.
+ */
+void *migration_bitmap_worker(void *opaque)
+{
+    BitmapWalkerParams * bwp = opaque;
+
+    do {
+        qemu_mutex_lock(&bwp->ready_mutex);
+        qemu_mutex_lock(&bwp->done_mutex);
+        qemu_mutex_unlock(&bwp->ready_mutex);
+        qemu_cond_signal(&bwp->cond);
+
+        if(!bwp->keep_running) {
+                break;
+        }
+
+        bwp->dirty_pages = migration_bitmap_sync_range(bwp->block, bwp->start, bwp->stop);
+
+        qemu_cond_wait(&bwp->cond, &bwp->done_mutex);
+        qemu_mutex_unlock(&bwp->done_mutex);
+    } while(bwp->keep_running);
+
+    return NULL;
+}
+
+void migration_bitmap_worker_start(MigrationState *s)
+{
+    int core;
+
+    /* 
+     * CPUs N - 1 are reserved for N - 1 worker threads 
+     * processing the pc.ram bytemap => migration_bitmap.
+     * The migration thread goes on the last CPU,
+     * which process the remaining, smaller RAMblocks.
+     */
+    nb_bitmap_workers = getNumCores() - 1;
+
+    bitmap_walkers = g_malloc0(sizeof(struct BitmapWalkerParams) * 
+                                                nb_bitmap_workers);
+
+    memset(bitmap_walkers, 0, sizeof(BitmapWalkerParams) * nb_bitmap_workers);
+
+    for (core = 0; core < nb_bitmap_workers; core++) {
+        BitmapWalkerParams * bwp = &bitmap_walkers[core];
+        bwp->core_id = core;
+        bwp->keep_running = 1;
+        bwp->s = s;
+        qemu_cond_init(&bwp->cond);
+        qemu_mutex_init(&bwp->ready_mutex);
+        qemu_mutex_init(&bwp->done_mutex);
+        qemu_mutex_lock(&bwp->ready_mutex);
+    }
+
+    for (core = 0; core < nb_bitmap_workers; core++) {
+        BitmapWalkerParams * bwp = &bitmap_walkers[core];
+        qemu_thread_create(&bwp->walker, 
+            migration_bitmap_worker, bwp, QEMU_THREAD_DETACHED);
+    }
+}
+
+void migration_bitmap_worker_stop(MigrationState *s)
+{
+    int core;
+
+    for (core = 0; core < nb_bitmap_workers; core++) {
+        BitmapWalkerParams * bwp = &bitmap_walkers[core];
+        bwp->keep_running = 0;
+        qemu_mutex_unlock(&bwp->ready_mutex);
+    }
+
+    DPRINTF("Bitmap workers stopped.\n");
+
+	g_free(bitmap_walkers);
+	bitmap_walkers = NULL;
+    nb_bitmap_workers = 0;
+}
+
+
+static void migration_bitmap_distribute_specific_worker(MigrationState *s, RAMBlock * block, int core, ram_addr_t start, ram_addr_t stop)
+{
+    BitmapWalkerParams * bwp = &bitmap_walkers[core];
+
+    bwp->start = start;
+    bwp->stop = stop;
+    bwp->block = block;
+
+    qemu_cond_wait(&bwp->cond, &bwp->ready_mutex);
+} 
+
+static void migration_bitmap_join_worker(MigrationState *s, int core)
+{
+    BitmapWalkerParams * bwp = &bitmap_walkers[core];
+    qemu_mutex_lock(&bwp->done_mutex);
+    qemu_cond_signal(&bwp->cond);
+    qemu_mutex_unlock(&bwp->done_mutex);
+    migration_dirty_pages += bwp->dirty_pages;
+}
+
+/*
+ * Chop up the QEMU 'bytemap' built around GET_LOG_DIRTY and handout
+ * the migration_bitmap population work to all the workers.
+ * 
+ * If there are N cpus in the hypervisor, there will be N workers
+ * which each process equal chunks of the RAM block bytemap.
+ */
+static void migration_bitmap_distribute_work(MigrationState *s, RAMBlock * block)
+{
+    uint64_t pages = block->length / TARGET_PAGE_SIZE;
+    uint64_t inc = pages / nb_bitmap_workers;
+    uint64_t remainder = pages % inc;
+    int core;
+
+    for (core = 0; core < nb_bitmap_workers; core++) {
+        ram_addr_t start = core * inc, stop = core * inc + inc;
+
+        if(core == (nb_bitmap_workers - 1))
+                stop += remainder;
+
+        start *= TARGET_PAGE_SIZE;
+        stop *= TARGET_PAGE_SIZE;
+        
+        migration_bitmap_distribute_specific_worker(s, block, core, start, stop);
     }
-    return ret;
 }
 
 /* Needs iothread lock! */
@@ -378,7 +564,6 @@ static inline bool migration_bitmap_set_dirty(MemoryRegion *mr,
 static void migration_bitmap_sync(void)
 {
     RAMBlock *block;
-    ram_addr_t addr;
     uint64_t num_dirty_pages_init = migration_dirty_pages;
     MigrationState *s = migrate_get_current();
     static int64_t start_time;
@@ -386,33 +571,46 @@ static void migration_bitmap_sync(void)
     static int64_t num_dirty_pages_period;
     int64_t end_time;
     int64_t bytes_xfer_now;
+    int64_t begin_time;
+    int64_t dirty_time;
 
     if (!bytes_xfer_prev) {
         bytes_xfer_prev = ram_bytes_transferred();
     }
 
+    begin_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
     if (!start_time) {
         start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
     }
-
     trace_migration_bitmap_sync_start();
     address_space_sync_dirty_bitmap(&address_space_memory);
 
+    dirty_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+
     QTAILQ_FOREACH(block, &ram_list.blocks, next) {
-        for (addr = 0; addr < block->length; addr += TARGET_PAGE_SIZE) {
-            if (memory_region_test_and_clear_dirty(block->mr,
-                                                   addr, TARGET_PAGE_SIZE,
-                                                   DIRTY_MEMORY_MIGRATION)) {
-                migration_bitmap_set_dirty(block->mr, addr);
-            }
+        if (!strcmp(block->idstr, "pc.ram") && nb_bitmap_workers) {
+            migration_bitmap_distribute_work(s, block);
+            continue;
         }
+        migration_dirty_pages += migration_bitmap_sync_range(block, 0, block->length);
     }
+
+    if (nb_bitmap_workers) {
+        int core;
+        for (core = 0; core < nb_bitmap_workers; core++) {
+            migration_bitmap_join_worker(s, core);
+        }
+    }
+
     trace_migration_bitmap_sync_end(migration_dirty_pages
                                     - num_dirty_pages_init);
     num_dirty_pages_period += migration_dirty_pages - num_dirty_pages_init;
     end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
 
-    /* more than 1 second = 1000 millisecons */
+    acct_info.log_dirty_time += dirty_time - begin_time;
+    acct_info.migration_bitmap_time += end_time - dirty_time;
+
+    /* more than 1 second = 1000 milliseconds */
     if (end_time > start_time + 1000) {
         if (migrate_auto_converge()) {
             /* The following detection logic can be refined later. For now:
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 140e6b4..3ffc433 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -45,6 +45,10 @@ struct MigrationState
     int64_t total_time;
     int64_t downtime;
     int64_t expected_downtime;
+    int64_t xmit_time;
+    int64_t ram_copy_time;
+    int64_t log_dirty_time;
+    int64_t bitmap_time;
     int64_t dirty_pages_rate;
     int64_t dirty_bytes_rate;
     bool enabled_capabilities[MIGRATION_CAPABILITY_MAX];
@@ -109,10 +113,16 @@ uint64_t skipped_mig_bytes_transferred(void);
 uint64_t skipped_mig_pages_transferred(void);
 uint64_t norm_mig_bytes_transferred(void);
 uint64_t norm_mig_pages_transferred(void);
+uint64_t norm_mig_log_dirty_time(void);
+uint64_t norm_mig_bitmap_time(void);
 uint64_t xbzrle_mig_bytes_transferred(void);
 uint64_t xbzrle_mig_pages_transferred(void);
 uint64_t xbzrle_mig_pages_overflow(void);
 uint64_t xbzrle_mig_pages_cache_miss(void);
+void *migration_bitmap_worker(void *opaque);
+void migration_bitmap_worker_start(MigrationState *s);
+void migration_bitmap_worker_stop(MigrationState *s);
+void migrate_set_state(MigrationState *s, int old_state, int new_state);
 
 void ram_handle_compressed(void *host, uint8_t ch, uint64_t size);
 
diff --git a/include/qemu-common.h b/include/qemu-common.h
index 5054836..936dc02 100644
--- a/include/qemu-common.h
+++ b/include/qemu-common.h
@@ -478,4 +478,16 @@ size_t buffer_find_nonzero_offset(const void *buf, size_t len);
  */
 int parse_debug_env(const char *name, int max, int initial);
 
+/*
+ * Headers to get number of host processors.
+ */
+int getNumCores(void);
+#if defined(WIN32)
+#include <windows.h>
+#elif defined(CONFIG_BSD)
+#include <sys/param.h>
+#include <sys/sysctl.h>
+#elif defined(CONFIG_LINUX)
+#include <unistd.h>
+#endif
 #endif
diff --git a/qapi-schema.json b/qapi-schema.json
index 60f3fd1..aac0894 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -577,6 +577,36 @@
            'cache-miss': 'int', 'overflow': 'int' } }
 
 ##
+# @MCStats
+#
+# Detailed Micro Checkpointing (MC) statistics
+#
+# @mbps: throughput of transmitting last MC 
+#
+# @xmit-time: milliseconds to transmit last MC 
+#
+# @log-dirty-time: milliseconds to GET_LOG_DIRTY for last MC 
+#
+# @migration-bitmap-time: milliseconds to prepare dirty bitmap for last MC
+#
+# @ram-copy-time: milliseconds to ram_save_live() last MC to staging memory
+#
+# @copy-mbps: throughput of ram_save_live() to staging memory for last MC 
+#
+# @checkpoints: cummulative total number of MCs generated 
+#
+# Since: 1.8
+##
+{ 'type': 'MCStats',
+  'data': {'mbps': 'number',
+           'xmit-time': 'uint64',
+           'log-dirty-time': 'uint64',
+           'migration-bitmap-time': 'uint64', 
+           'ram-copy-time': 'uint64',
+           'checkpoints' : 'uint64',
+           'copy-mbps': 'number' }}
+
+##
 # @MigrationInfo
 #
 # Information about current migration process.
@@ -622,6 +652,7 @@
   'data': {'*status': 'str', '*ram': 'MigrationStats',
            '*disk': 'MigrationStats',
            '*xbzrle-cache': 'XBZRLECacheStats',
+           '*mc': 'MCStats',
            '*total-time': 'int',
            '*expected-downtime': 'int',
            '*downtime': 'int',
@@ -661,10 +692,50 @@
 # @auto-converge: If enabled, QEMU will automatically throttle down the guest
 #          to speed up convergence of RAM migration. (since 1.6)
 #
+# @x-mc: The migration will never end, and the VM will instead be continuously
+#          micro-checkpointed (MC). Use the command migrate-set-mc-delay to 
+#          control the frequency at which the checkpoints occur. 
+#          Disabled by default. (Since 1.8)
+#
+# @mc-net-disable: Deactivate network buffering against outbound network 
+#          traffic while Micro-Checkpointing (@mc) is active.
+#          Enabled by default. Disabling will make the MC protocol inconsistent
+#          and potentially break network connections upon an actual failure.
+#          Only for performance testing. (Since 1.8)
+#
+# @mc-rdma-copy: MC requires creating a local-memory checkpoint before
+#          transmission to the destination. This requires heavy use of 
+#          memcpy() which dominates the processor pipeline. This option 
+#          makes use of *local* RDMA to perform the copy instead of the CPU.
+#          Enabled by default only if the migration transport is RDMA.
+#          Disabled by default otherwise. (Since 1.8)
+#
+# @bitworkers: Allow the QEMU migration bitmap to be scanned in parallel
+#          by using multiple processors on the host machine.
+#          This capability has no effect without also enabling @mc.
+#          Enabled by default. (Since 1.8)
+#
+# @rdma-keepalive: RDMA connections do not timeout by themselves if a peer
+#         has disconnected prematurely or failed. User-level keepalives
+#         allow the migration to abort cleanly if there is a problem with the
+#         destination host. For debugging, this can be problematic as
+#         the keepalive may cause the peer to abort prematurely if we are
+#         at a GDB breakpoint, for example.
+#         Enabled by default. (Since 1.8)
+#
 # Since: 1.2
 ##
 { 'enum': 'MigrationCapability',
-  'data': ['xbzrle', 'x-rdma-pin-all', 'auto-converge', 'zero-blocks'] }
+  'data': ['xbzrle', 
+           'x-rdma-pin-all', 
+           'auto-converge', 
+           'zero-blocks',
+           'x-mc', 
+           'mc-net-disable',
+           'mc-rdma-copy',
+           'bitworkers',
+           'rdma-keepalive'
+          ] }
 
 ##
 # @MigrationCapabilityStatus
diff --git a/vl.c b/vl.c
index b42ac67..e2ba2e8 100644
--- a/vl.c
+++ b/vl.c
@@ -2818,6 +2818,39 @@ static int object_create(QemuOpts *opts, void *opaque)
     return 0;
 }
 
+/*
+ * Currently, only used for migration_bitmap_sync(),
+ * but can be queried by anyone in the future.
+ */
+int getNumCores(void) 
+{
+    uint32_t count;
+#if defined(WIN32)
+    SYSTEM_INFO sysinfo;
+    GetSystemInfo(&sysinfo);
+    count = sysinfo.dwNumberOfProcessors;
+#elif defined(CONFIG_BSD)
+    int nm[2];
+    size_t len = 4;
+    nm[0] = CTL_HW; 
+    nm[1] = HW_AVAILCPU;
+    sysctl(nm, 2, &count, &len, NULL, 0);
+
+    if (count < 1) {
+        nm[1] = HW_NCPU;
+        sysctl(nm, 2, &count, &len, NULL, 0);
+        if(count < 1) { 
+           count = 1; 
+        }
+    }
+#elif defined(CONFIG_LINUX)
+    count = sysconf(_SC_NPROCESSORS_ONLN);
+#else
+    count = 1;
+#endif
+    return count;
+}
+
 int main(int argc, char **argv, char **envp)
 {
     int i;
-- 
1.8.1.2

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH v1: 04/12] mc: introduce a "checkpointing" status check into the VCPU states
  2013-10-21  1:14 [Qemu-devel] [RFC PATCH v1: 00/12] fault tolerance through micro-checkpointing mrhines
                   ` (2 preceding siblings ...)
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 03/12] migration: introduce parallelization of migration_bitmap mrhines
@ 2013-10-21  1:14 ` mrhines
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 05/12] migration: support custom page loading mrhines
                   ` (7 subsequent siblings)
  11 siblings, 0 replies; 16+ messages in thread
From: mrhines @ 2013-10-21  1:14 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, owasserm, onom, abali, mrhines, gokul, pbonzini

From: "Michael R. Hines" <mrhines@us.ibm.com>

During micro-checkpointing, the VCPUs get repeatedly paused and
resumed. We need to not freak out when the VM begins micro-checkpointing.

Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 arch_init.c                   | 2 +-
 cpus.c                        | 9 ++++++++-
 include/migration/migration.h | 2 ++
 qapi-schema.json              | 4 +++-
 vl.c                          | 6 ++++++
 5 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/arch_init.c b/arch_init.c
index 4a71311..b139512 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -199,7 +199,7 @@ typedef struct AccountingInfo {
 
 static AccountingInfo acct_info;
 
-static void acct_clear(void)
+void acct_clear(void)
 {
     memset(&acct_info, 0, sizeof(acct_info));
 }
diff --git a/cpus.c b/cpus.c
index 398229e..d090c2c 100644
--- a/cpus.c
+++ b/cpus.c
@@ -530,7 +530,14 @@ static int do_vm_stop(RunState state)
         pause_all_vcpus();
         runstate_set(state);
         vm_state_notify(0, state);
-        monitor_protocol_event(QEVENT_STOP, NULL);
+        /*
+         * If MC is enabled, libvirt gets confused 
+         * because it thinks the VM is stopped when 
+         * its just being micro-checkpointed.
+         */
+        if(state != RUN_STATE_CHECKPOINT_VM) {
+            monitor_protocol_event(QEVENT_STOP, NULL);
+        }
     }
 
     bdrv_drain_all();
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 3ffc433..3ad06c5 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -119,6 +119,8 @@ uint64_t xbzrle_mig_bytes_transferred(void);
 uint64_t xbzrle_mig_pages_transferred(void);
 uint64_t xbzrle_mig_pages_overflow(void);
 uint64_t xbzrle_mig_pages_cache_miss(void);
+void acct_clear(void);
+
 void *migration_bitmap_worker(void *opaque);
 void migration_bitmap_worker_start(MigrationState *s);
 void migration_bitmap_worker_stop(MigrationState *s);
diff --git a/qapi-schema.json b/qapi-schema.json
index aac0894..8e72bcf 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -169,6 +169,8 @@
 #
 # @save-vm: guest is paused to save the VM state
 #
+# @checkpoint-vm: guest is paused to checkpoint the VM state
+#
 # @shutdown: guest is shut down (and -no-shutdown is in use)
 #
 # @suspended: guest is suspended (ACPI S3)
@@ -181,7 +183,7 @@
   'data': [ 'debug', 'inmigrate', 'internal-error', 'io-error', 'paused',
             'postmigrate', 'prelaunch', 'finish-migrate', 'restore-vm',
             'running', 'save-vm', 'shutdown', 'suspended', 'watchdog',
-            'guest-panicked' ] }
+            'guest-panicked', 'checkpoint-vm' ] }
 
 ##
 # @SnapshotInfo
diff --git a/vl.c b/vl.c
index e2ba2e8..74d52ab 100644
--- a/vl.c
+++ b/vl.c
@@ -611,14 +611,18 @@ static const RunStateTransition runstate_transitions_def[] = {
 
     { RUN_STATE_FINISH_MIGRATE, RUN_STATE_RUNNING },
     { RUN_STATE_FINISH_MIGRATE, RUN_STATE_POSTMIGRATE },
+    { RUN_STATE_FINISH_MIGRATE, RUN_STATE_CHECKPOINT_VM },
 
     { RUN_STATE_RESTORE_VM, RUN_STATE_RUNNING },
 
+    { RUN_STATE_CHECKPOINT_VM, RUN_STATE_RUNNING },
+
     { RUN_STATE_RUNNING, RUN_STATE_DEBUG },
     { RUN_STATE_RUNNING, RUN_STATE_INTERNAL_ERROR },
     { RUN_STATE_RUNNING, RUN_STATE_IO_ERROR },
     { RUN_STATE_RUNNING, RUN_STATE_PAUSED },
     { RUN_STATE_RUNNING, RUN_STATE_FINISH_MIGRATE },
+    { RUN_STATE_RUNNING, RUN_STATE_CHECKPOINT_VM },
     { RUN_STATE_RUNNING, RUN_STATE_RESTORE_VM },
     { RUN_STATE_RUNNING, RUN_STATE_SAVE_VM },
     { RUN_STATE_RUNNING, RUN_STATE_SHUTDOWN },
@@ -634,9 +638,11 @@ static const RunStateTransition runstate_transitions_def[] = {
     { RUN_STATE_RUNNING, RUN_STATE_SUSPENDED },
     { RUN_STATE_SUSPENDED, RUN_STATE_RUNNING },
     { RUN_STATE_SUSPENDED, RUN_STATE_FINISH_MIGRATE },
+    { RUN_STATE_SUSPENDED, RUN_STATE_CHECKPOINT_VM },
 
     { RUN_STATE_WATCHDOG, RUN_STATE_RUNNING },
     { RUN_STATE_WATCHDOG, RUN_STATE_FINISH_MIGRATE },
+    { RUN_STATE_WATCHDOG, RUN_STATE_CHECKPOINT_VM },
 
     { RUN_STATE_GUEST_PANICKED, RUN_STATE_PAUSED },
     { RUN_STATE_GUEST_PANICKED, RUN_STATE_FINISH_MIGRATE },
-- 
1.8.1.2

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH v1: 05/12] migration: support custom page loading
  2013-10-21  1:14 [Qemu-devel] [RFC PATCH v1: 00/12] fault tolerance through micro-checkpointing mrhines
                   ` (3 preceding siblings ...)
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 04/12] mc: introduce a "checkpointing" status check into the VCPU states mrhines
@ 2013-10-21  1:14 ` mrhines
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 06/12] rdma: accelerated memcpy() support mrhines
                   ` (6 subsequent siblings)
  11 siblings, 0 replies; 16+ messages in thread
From: mrhines @ 2013-10-21  1:14 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, owasserm, onom, abali, mrhines, gokul, pbonzini

From: "Michael R. Hines" <mrhines@us.ibm.com>

Just as RDMA has custom routines for saving memory,
this provides us with custom routines for loading memory.

Micro-checkpointing needs this support in order to be able
to handle loading of the latest checkpoint into memory
as they are received from the network.

Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 arch_init.c                   | 17 ++++++++++++-----
 include/migration/migration.h | 12 ++++++++++--
 include/migration/qemu-file.h | 16 ++++++++++++++--
 savevm.c                      | 27 ++++++++++++++++++++++++---
 4 files changed, 60 insertions(+), 12 deletions(-)

diff --git a/arch_init.c b/arch_init.c
index b139512..9cf7d18 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -684,7 +684,7 @@ static int ram_save_block(QEMUFile *f, bool last_stage)
             /* In doubt sent page as normal */
             bytes_sent = -1;
             ret = ram_control_save_page(f, block->offset,
-                               offset, TARGET_PAGE_SIZE, &bytes_sent);
+                       block->host, offset, TARGET_PAGE_SIZE, &bytes_sent);
 
             if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
                 if (ret != RAM_SAVE_CONTROL_DELAYED) {
@@ -712,9 +712,11 @@ static int ram_save_block(QEMUFile *f, bool last_stage)
             /* XBZRLE overflow or normal page */
             if (bytes_sent == -1) {
                 bytes_sent = save_block_hdr(f, block, offset, cont, RAM_SAVE_FLAG_PAGE);
-                qemu_put_buffer_async(f, p, TARGET_PAGE_SIZE);
-                bytes_sent += TARGET_PAGE_SIZE;
-                acct_info.norm_pages++;
+                if (ret != RAM_SAVE_CONTROL_DELAYED) {
+                    qemu_put_buffer_async(f, p, TARGET_PAGE_SIZE);
+                    bytes_sent += TARGET_PAGE_SIZE;
+                    acct_info.norm_pages++;
+                }
             }
 
             /* if page is unmodified, continue to the next */
@@ -1133,13 +1135,18 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
         } else if (flags & RAM_SAVE_FLAG_PAGE) {
             void *host;
+            int r;
 
             host = host_from_stream_offset(f, addr, flags);
             if (!host) {
                 return -EINVAL;
             }
 
-            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
+            r = ram_control_load_page(f, host, TARGET_PAGE_SIZE);
+
+            if (r == RAM_LOAD_CONTROL_NOT_SUPP) {
+                qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
+            }
         } else if (flags & RAM_SAVE_FLAG_XBZRLE) {
             void *host = host_from_stream_offset(f, addr, flags);
             if (!host) {
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 3ad06c5..ac1b438 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -169,9 +169,17 @@ void ram_control_load_hook(QEMUFile *f, uint64_t flags);
 
 #define RAM_SAVE_CONTROL_NOT_SUPP -1000
 #define RAM_SAVE_CONTROL_DELAYED  -2000
+#define RAM_LOAD_CONTROL_NOT_SUPP -3000
+#define RAM_LOAD_CONTROL_DELAYED  -4000
 
-size_t ram_control_save_page(QEMUFile *f, ram_addr_t block_offset,
-                             ram_addr_t offset, size_t size,
+#define RDMA_CONTROL_VERSION_CURRENT 1
+
+int ram_control_save_page(QEMUFile *f, ram_addr_t block_offset,
+                             uint8_t *host_addr,
+                             ram_addr_t offset, long size,
                              int *bytes_sent);
 
+int ram_control_load_page(QEMUFile *f,
+                             void *host_addr,
+                             long size);
 #endif
diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
index 0f757fb..d396b40 100644
--- a/include/migration/qemu-file.h
+++ b/include/migration/qemu-file.h
@@ -76,12 +76,22 @@ typedef int (QEMURamHookFunc)(QEMUFile *f, void *opaque, uint64_t flags);
  * This function allows override of where the RAM page
  * is saved (such as RDMA, for example.)
  */
-typedef size_t (QEMURamSaveFunc)(QEMUFile *f, void *opaque,
+typedef int (QEMURamSaveFunc)(QEMUFile *f, void *opaque,
                                ram_addr_t block_offset,
+                               uint8_t *host_addr,
                                ram_addr_t offset,
-                               size_t size,
+                               long size,
                                int *bytes_sent);
 
+/*
+ * This function allows override of where the RAM page
+ * is saved (such as RDMA, for example.)
+ */
+typedef int (QEMURamLoadFunc)(QEMUFile *f,
+                               void *opaque,
+                               void *host_addr,
+                               long size);
+
 typedef struct QEMUFileOps {
     QEMUFilePutBufferFunc *put_buffer;
     QEMUFileGetBufferFunc *get_buffer;
@@ -92,12 +102,14 @@ typedef struct QEMUFileOps {
     QEMURamHookFunc *after_ram_iterate;
     QEMURamHookFunc *hook_ram_load;
     QEMURamSaveFunc *save_page;
+    QEMURamLoadFunc *load_page;
 } QEMUFileOps;
 
 QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops);
 QEMUFile *qemu_fopen(const char *filename, const char *mode);
 QEMUFile *qemu_fdopen(int fd, const char *mode);
 QEMUFile *qemu_fopen_socket(int fd, const char *mode);
+QEMUFile *qemu_fopen_mc(void *opaque, const char *mode);
 QEMUFile *qemu_popen_cmd(const char *command, const char *mode);
 int qemu_get_fd(QEMUFile *f);
 int qemu_fclose(QEMUFile *f);
diff --git a/savevm.c b/savevm.c
index 2f631d4..05f8a05 100644
--- a/savevm.c
+++ b/savevm.c
@@ -661,14 +661,17 @@ void ram_control_load_hook(QEMUFile *f, uint64_t flags)
     }
 }
 
-size_t ram_control_save_page(QEMUFile *f, ram_addr_t block_offset,
-                         ram_addr_t offset, size_t size, int *bytes_sent)
+int ram_control_save_page(QEMUFile *f, ram_addr_t block_offset,
+                         uint8_t *host_addr,
+                         ram_addr_t offset, long size, int *bytes_sent)
 {
     if (f->ops->save_page) {
         int ret = f->ops->save_page(f, f->opaque, block_offset,
+                                    host_addr,
                                     offset, size, bytes_sent);
 
-        if (ret != RAM_SAVE_CONTROL_DELAYED) {
+        if (ret != RAM_SAVE_CONTROL_DELAYED 
+                && ret != RAM_SAVE_CONTROL_NOT_SUPP) {
             if (bytes_sent && *bytes_sent > 0) {
                 qemu_update_position(f, *bytes_sent);
             } else if (ret < 0) {
@@ -682,6 +685,24 @@ size_t ram_control_save_page(QEMUFile *f, ram_addr_t block_offset,
     return RAM_SAVE_CONTROL_NOT_SUPP;
 }
 
+int ram_control_load_page(QEMUFile *f, void *host_addr, long size)
+{
+    if (f->ops->load_page) {
+        int ret = f->ops->load_page(f, f->opaque, host_addr, size);
+
+        if (ret != RAM_LOAD_CONTROL_DELAYED 
+                && ret != RAM_LOAD_CONTROL_NOT_SUPP) {
+            if (ret < 0) {
+                qemu_file_set_error(f, ret);
+            }
+        }
+
+        return ret;
+    }
+
+    return RAM_LOAD_CONTROL_NOT_SUPP;
+}
+
 static void qemu_fill_buffer(QEMUFile *f)
 {
     int len;
-- 
1.8.1.2

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH v1: 06/12] rdma: accelerated memcpy() support
  2013-10-21  1:14 [Qemu-devel] [RFC PATCH v1: 00/12] fault tolerance through micro-checkpointing mrhines
                   ` (4 preceding siblings ...)
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 05/12] migration: support custom page loading mrhines
@ 2013-10-21  1:14 ` mrhines
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 07/12] mc: introduce state machine error handling and migration_bitmap prep mrhines
                   ` (5 subsequent siblings)
  11 siblings, 0 replies; 16+ messages in thread
From: mrhines @ 2013-10-21  1:14 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, owasserm, onom, abali, mrhines, gokul, pbonzini

From: "Michael R. Hines" <mrhines@us.ibm.com>

This patch implements the ability to provide accelerated
memory copies as an alternative to memcpy() using RDMA.

It requires the user to "register" the src and dest memory
with the RDMA subsystem using new interfaces and then call
a special copy function that is introduced to submit the
copy to the hardware.

Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 include/migration/migration.h |   12 +
 include/migration/qemu-file.h |   38 +
 migration-rdma.c              | 2008 ++++++++++++++++++++++++++++++-----------
 savevm.c                      |   52 ++
 4 files changed, 1571 insertions(+), 539 deletions(-)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index ac1b438..0e7f121 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -159,6 +159,9 @@ int64_t xbzrle_cache_resize(int64_t new_size);
 void ram_control_before_iterate(QEMUFile *f, uint64_t flags);
 void ram_control_after_iterate(QEMUFile *f, uint64_t flags);
 void ram_control_load_hook(QEMUFile *f, uint64_t flags);
+void ram_control_add(QEMUFile *f, void *host_addr,
+                         ram_addr_t block_offset, uint64_t length);
+void ram_control_remove(QEMUFile *f, ram_addr_t block_offset);
 
 /* Whenever this is found in the data stream, the flags
  * will be passed to ram_control_load_hook in the incoming-migration
@@ -171,6 +174,8 @@ void ram_control_load_hook(QEMUFile *f, uint64_t flags);
 #define RAM_SAVE_CONTROL_DELAYED  -2000
 #define RAM_LOAD_CONTROL_NOT_SUPP -3000
 #define RAM_LOAD_CONTROL_DELAYED  -4000
+#define RAM_COPY_CONTROL_NOT_SUPP -5000
+#define RAM_COPY_CONTROL_DELAYED  -6000
 
 #define RDMA_CONTROL_VERSION_CURRENT 1
 
@@ -182,4 +187,11 @@ int ram_control_save_page(QEMUFile *f, ram_addr_t block_offset,
 int ram_control_load_page(QEMUFile *f,
                              void *host_addr,
                              long size);
+
+int ram_control_copy_page(QEMUFile *f, 
+                             ram_addr_t block_offset_dest,
+                             ram_addr_t offset_dest,
+                             ram_addr_t block_offset_source,
+                             ram_addr_t offset_source,
+                             long size);
 #endif
diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
index d396b40..d67e97a 100644
--- a/include/migration/qemu-file.h
+++ b/include/migration/qemu-file.h
@@ -92,6 +92,41 @@ typedef int (QEMURamLoadFunc)(QEMUFile *f,
                                void *host_addr,
                                long size);
 
+/*
+ * This function allows *local* RDMA copying memory between two registered
+ * RAMBlocks, both real ones as well as private memory areas independently
+ * registered by external callers (such as MC). If RDMA is not available,
+ * then this function does nothing and the caller should just use memcpy().
+ */
+typedef int (QEMURamCopyFunc)(QEMUFile *f, void *opaque,
+                               ram_addr_t block_offset_dest,
+                               ram_addr_t offset_dest,
+                               ram_addr_t block_offset_source,
+                               ram_addr_t offset_source,
+                               long size);
+
+/* 
+ * Inform the underlying transport of a new virtual memory area.
+ * If this area is an actual RAMBlock, then pass the corresponding
+ * parameters of that block.
+ * If this area is an arbitrary virtual memory address, then
+ * pass the same value for both @host_addr and @block_offset.
+ */
+typedef int (QEMURamAddFunc)(QEMUFile *f, void *opaque,
+                               void *host_addr,
+                               ram_addr_t block_offset,
+                               uint64_t length);
+
+/* 
+ * Remove an underlying new virtual memory area.
+ * If this area is an actual RAMBlock, then pass the corresponding
+ * parameters of that block.
+ * If this area is an arbitrary virtual memory address, then
+ * pass the same value for both @host_addr and @block_offset.
+ */
+typedef int (QEMURamRemoveFunc)(QEMUFile *f, void *opaque,
+                               ram_addr_t block_offset);
+
 typedef struct QEMUFileOps {
     QEMUFilePutBufferFunc *put_buffer;
     QEMUFileGetBufferFunc *get_buffer;
@@ -103,6 +138,9 @@ typedef struct QEMUFileOps {
     QEMURamHookFunc *hook_ram_load;
     QEMURamSaveFunc *save_page;
     QEMURamLoadFunc *load_page;
+    QEMURamCopyFunc *copy_page;
+    QEMURamAddFunc *add;
+    QEMURamRemoveFunc *remove;
 } QEMUFileOps;
 
 QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops);
diff --git a/migration-rdma.c b/migration-rdma.c
index f94f3b4..10365af 100644
--- a/migration-rdma.c
+++ b/migration-rdma.c
@@ -27,32 +27,40 @@
 #include <string.h>
 #include <rdma/rdma_cma.h>
 
+/*
+ * Ability to runtime-enable debug statements while inside GDB.
+ * Choices are 1, 2, or 3 (so far).
+ */
+static int rdma_debug = 0;
+
 //#define DEBUG_RDMA
 //#define DEBUG_RDMA_VERBOSE
 //#define DEBUG_RDMA_REALLY_VERBOSE
 
+#define RPRINTF(fmt, ...) printf("rdma: " fmt, ## __VA_ARGS__)
+
 #ifdef DEBUG_RDMA
 #define DPRINTF(fmt, ...) \
-    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
+    do { RPRINTF(fmt, ## __VA_ARGS__); } while (0)
 #else
 #define DPRINTF(fmt, ...) \
-    do { } while (0)
+    do { if (rdma_debug >= 1) RPRINTF(fmt, ## __VA_ARGS__); } while (0)
 #endif
 
 #ifdef DEBUG_RDMA_VERBOSE
 #define DDPRINTF(fmt, ...) \
-    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
+    do { RPRINTF(fmt, ## __VA_ARGS__); } while (0)
 #else
 #define DDPRINTF(fmt, ...) \
-    do { } while (0)
+    do { if (rdma_debug >= 2) RPRINTF(fmt, ## __VA_ARGS__); } while (0)
 #endif
 
 #ifdef DEBUG_RDMA_REALLY_VERBOSE
 #define DDDPRINTF(fmt, ...) \
-    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
+    do { RPRINTF(fmt, ## __VA_ARGS__); } while (0)
 #else
 #define DDDPRINTF(fmt, ...) \
-    do { } while (0)
+    do { if (rdma_debug >= 3) RPRINTF(fmt, ## __VA_ARGS__); } while (0)
 #endif
 
 /*
@@ -60,17 +68,20 @@
  */
 #define ERROR(errp, fmt, ...) \
     do { \
+        Error **e = errp; \
         fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
-        if (errp && (*(errp) == NULL)) { \
-            error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
+        if (e && ((*e) == NULL)) { \
+            error_setg(e, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
         } \
     } while (0)
 
+#define SET_ERROR(rdma, err) if (!rdma->error_state) rdma->error_state = err
+
 #define RDMA_RESOLVE_TIMEOUT_MS 10000
 
 /* Do not merge data if larger than this. */
 #define RDMA_MERGE_MAX (2 * 1024 * 1024)
-#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
+#define RDMA_SEND_MAX (RDMA_MERGE_MAX / 4096)
 
 #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
 
@@ -87,18 +98,30 @@
  */
 #define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
 #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
-
-#define RDMA_CONTROL_VERSION_CURRENT 1
 /*
  * Capabilities for negotiation.
  */
 #define RDMA_CAPABILITY_PIN_ALL 0x01
+#define RDMA_CAPABILITY_KEEPALIVE 0x02
+
+/*
+ * Max # missed keepalive before we assume remote side is unavailable.
+ */
+#define RDMA_CONNECTION_INTERVAL_MS 300
+#define RDMA_KEEPALIVE_INTERVAL_MS 300
+#define RDMA_KEEPALIVE_FIRST_MISSED_OFFSET 1000
+#define RDMA_MAX_LOST_KEEPALIVE 10 
+#define RDMA_MAX_STARTUP_MISSED_KEEPALIVE 100
 
 /*
  * Add the other flags above to this list of known capabilities
  * as they are introduced.
  */
-static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
+static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL
+                                   | RDMA_CAPABILITY_KEEPALIVE
+                                   ;
+static QEMUTimer *connection_timer = NULL;
+static QEMUTimer *keepalive_timer = NULL;
 
 #define CHECK_ERROR_STATE() \
     do { \
@@ -143,14 +166,18 @@ static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
  */
 enum {
     RDMA_WRID_NONE = 0,
-    RDMA_WRID_RDMA_WRITE = 1,
+    RDMA_WRID_RDMA_WRITE_REMOTE = 1,
+    RDMA_WRID_RDMA_WRITE_LOCAL = 2,
+    RDMA_WRID_RDMA_KEEPALIVE = 3,
     RDMA_WRID_SEND_CONTROL = 2000,
     RDMA_WRID_RECV_CONTROL = 4000,
 };
 
 const char *wrid_desc[] = {
     [RDMA_WRID_NONE] = "NONE",
-    [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
+    [RDMA_WRID_RDMA_WRITE_REMOTE] = "WRITE RDMA REMOTE",
+    [RDMA_WRID_RDMA_WRITE_LOCAL] = "WRITE RDMA LOCAL",
+    [RDMA_WRID_RDMA_KEEPALIVE] = "KEEPALIVE",
     [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
     [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
 };
@@ -216,21 +243,41 @@ typedef struct {
 /*
  * Negotiate RDMA capabilities during connection-setup time.
  */
-typedef struct {
+typedef struct QEMU_PACKED RDMACapabilities {
     uint32_t version;
     uint32_t flags;
+    uint32_t keepalive_rkey;
+    uint64_t keepalive_addr;
 } RDMACapabilities;
 
+static uint64_t htonll(uint64_t v)
+{
+    union { uint32_t lv[2]; uint64_t llv; } u;
+    u.lv[0] = htonl(v >> 32);
+    u.lv[1] = htonl(v & 0xFFFFFFFFULL);
+    return u.llv;
+}
+
+static uint64_t ntohll(uint64_t v) {
+    union { uint32_t lv[2]; uint64_t llv; } u;
+    u.llv = v;
+    return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
+}
+
 static void caps_to_network(RDMACapabilities *cap)
 {
     cap->version = htonl(cap->version);
     cap->flags = htonl(cap->flags);
+    cap->keepalive_rkey = htonl(cap->keepalive_rkey);
+    cap->keepalive_addr = htonll(cap->keepalive_addr);
 }
 
 static void network_to_caps(RDMACapabilities *cap)
 {
     cap->version = ntohl(cap->version);
     cap->flags = ntohl(cap->flags);
+    cap->keepalive_rkey = ntohl(cap->keepalive_rkey);
+    cap->keepalive_addr = ntohll(cap->keepalive_addr);
 }
 
 /*
@@ -245,11 +292,15 @@ typedef struct RDMALocalBlock {
     uint64_t remote_host_addr; /* remote virtual address */
     uint64_t offset;
     uint64_t length;
-    struct   ibv_mr **pmr;     /* MRs for chunk-level registration */
-    struct   ibv_mr *mr;       /* MR for non-chunk-level registration */
-    uint32_t *remote_keys;     /* rkeys for chunk-level registration */
-    uint32_t remote_rkey;      /* rkeys for non-chunk-level registration */
-    int      index;            /* which block are we */
+    struct ibv_mr **pmr;      /* MRs for remote chunk-level registration */
+    struct ibv_mr *mr;        /* MR for non-chunk-level registration */
+    struct ibv_mr **pmr_src;  /* MRs for copy chunk-level registration */
+    struct ibv_mr *mr_src;    /* MR for copy non-chunk-level registration */
+    struct ibv_mr **pmr_dest; /* MRs for copy chunk-level registration */
+    struct ibv_mr *mr_dest;   /* MR for copy non-chunk-level registration */
+    uint32_t *remote_keys;    /* rkeys for chunk-level registration */
+    uint32_t remote_rkey;     /* rkeys for non-chunk-level registration */
+    int      index;           /* which block are we */
     bool     is_ram_block;
     int      nb_chunks;
     unsigned long *transit_bitmap;
@@ -271,20 +322,6 @@ typedef struct QEMU_PACKED RDMARemoteBlock {
     uint32_t padding;
 } RDMARemoteBlock;
 
-static uint64_t htonll(uint64_t v)
-{
-    union { uint32_t lv[2]; uint64_t llv; } u;
-    u.lv[0] = htonl(v >> 32);
-    u.lv[1] = htonl(v & 0xFFFFFFFFULL);
-    return u.llv;
-}
-
-static uint64_t ntohll(uint64_t v) {
-    union { uint32_t lv[2]; uint64_t llv; } u;
-    u.llv = v;
-    return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
-}
-
 static void remote_block_to_network(RDMARemoteBlock *rb)
 {
     rb->remote_host_addr = htonll(rb->remote_host_addr);
@@ -313,10 +350,69 @@ typedef struct RDMALocalBlocks {
 } RDMALocalBlocks;
 
 /*
+ * We provide RDMA to QEMU by way of 2 mechanisms:
+ *
+ * 1. Local copy to remote copy
+ * 2. Local copy to local copy - like memcpy().
+ *
+ * Three instances of this structure are maintained inside of RDMAContext
+ * to manage both mechanisms.
+ */
+typedef struct RDMACurrentChunk {
+    /* store info about current buffer so that we can
+       merge it with future sends */
+    uint64_t current_addr;
+    uint64_t current_length;
+    /* index of ram block the current buffer belongs to */
+    int current_block_idx;
+    /* index of the chunk in the current ram block */
+    int current_chunk;
+
+    uint64_t block_offset;
+    uint64_t offset;
+
+    /* parameters for qemu_rdma_write() */
+    uint64_t chunk_idx;
+    uint8_t *chunk_start;
+    uint8_t *chunk_end;
+    RDMALocalBlock *block;
+    uint8_t *addr;
+    uint64_t chunks;
+} RDMACurrentChunk;
+
+/*
+ * Three copies of the following strucuture are used to hold the infiniband
+ * connection variables for each of the aformentioned mechanisms, one for
+ * remote copy and two local copy.
+ */
+typedef struct RDMALocalContext {
+    struct ibv_context *verbs;
+    struct ibv_pd *pd;
+    struct ibv_comp_channel *comp_chan;
+    struct ibv_cq *cq;
+    struct ibv_qp_init_attr qp_attr;
+    struct ibv_qp *qp;
+    union ibv_gid gid;
+    struct ibv_port_attr port;
+    uint64_t psn;
+    int port_num;
+    int nb_sent;
+    int64_t start_time;
+    int max_nb_sent;
+    const char * id_str;
+} RDMALocalContext;
+
+/*
  * Main data structure for RDMA state.
  * While there is only one copy of this structure being allocated right now,
  * this is the place where one would start if you wanted to consider
  * having more than one RDMA connection open at the same time.
+ *
+ * It is used for performing both local and remote RDMA operations
+ * with a single RDMA connection.
+ *
+ * Local operations are done by allocating separate queue pairs after
+ * the initial RDMA remote connection is initalized.
  */
 typedef struct RDMAContext {
     char *host;
@@ -333,19 +429,15 @@ typedef struct RDMAContext {
      */
     int control_ready_expected;
 
-    /* number of outstanding writes */
+    /* number of posts */
     int nb_sent;
 
-    /* store info about current buffer so that we can
-       merge it with future sends */
-    uint64_t current_addr;
-    uint64_t current_length;
-    /* index of ram block the current buffer belongs to */
-    int current_index;
-    /* index of the chunk in the current ram block */
-    int current_chunk;
+    RDMACurrentChunk chunk_remote;
+    RDMACurrentChunk chunk_local_src;
+    RDMACurrentChunk chunk_local_dest;
 
     bool pin_all;
+    bool do_keepalive;
 
     /*
      * infiniband-specific variables for opening the device
@@ -384,17 +476,200 @@ typedef struct RDMAContext {
      * Then use coroutine yield function.
      * Source runs in a thread, so we don't care.
      */
-    int migration_started_on_destination;
+    bool migration_started;
 
     int total_registrations;
     int total_writes;
 
     int unregister_current, unregister_next;
-    uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
+    uint64_t unregistrations[RDMA_SEND_MAX];
 
     GHashTable *blockmap;
+
+    uint64_t keepalive;
+    uint64_t last_keepalive;
+    uint64_t nb_missed_keepalive;
+    uint64_t next_keepalive;
+    struct ibv_mr *keepalive_mr;
+    struct ibv_mr *next_keepalive_mr;
+    uint32_t keepalive_rkey;
+    uint64_t keepalive_addr; 
+    bool keepalive_startup; 
+
+    RDMALocalContext lc_src;
+    RDMALocalContext lc_dest;
+    RDMALocalContext lc_remote;
+
+    /* who are we? */
+    bool source;
+    bool dest;
 } RDMAContext;
 
+static void close_ibv(RDMAContext *rdma, RDMALocalContext *lc)
+{
+
+    if (lc->qp) {
+        struct ibv_qp_attr attr = {.qp_state = IBV_QPS_ERR };
+        ibv_modify_qp(lc->qp, &attr, IBV_QP_STATE);
+        ibv_destroy_qp(lc->qp);
+        lc->qp = NULL;
+    }
+
+    if (lc->cq) {
+        ibv_destroy_cq(lc->cq);
+        lc->cq = NULL;
+    }
+
+    if (lc->comp_chan) {
+        ibv_destroy_comp_channel(lc->comp_chan);
+        lc->comp_chan = NULL;
+    }
+
+    if (lc->pd) {
+        ibv_dealloc_pd(lc->pd);
+        lc->pd = NULL;
+    }
+
+    if (lc->verbs) {
+        ibv_close_device(lc->verbs);
+        lc->verbs = NULL;
+    }
+}
+
+/*
+ * Create protection domain and completion queues
+ */
+static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma, RDMALocalContext *lc)
+{
+    struct rlimit r = { .rlim_cur = RLIM_INFINITY, .rlim_max = RLIM_INFINITY };
+     
+    if (getrlimit(RLIMIT_MEMLOCK, &r) < 0) {
+        perror("getrlimit");
+        ERROR(NULL, "getrlimit(RLIMIT_MEMLOCK)");
+        goto err_alloc;
+    }
+
+    DPRINTF("MemLock Limits cur: %" PRId64 " max: %" PRId64 "\n",
+            r.rlim_cur, r.rlim_max);
+
+    lc->pd = ibv_alloc_pd(lc->verbs);
+    if (!lc->pd) {
+        ERROR(NULL, "allocate protection domain");
+        goto err_alloc;
+    }
+
+    /* create completion channel */
+    lc->comp_chan = ibv_create_comp_channel(lc->verbs);
+    if (!lc->comp_chan) {
+        ERROR(NULL, "allocate completion channel");
+        goto err_alloc;
+    }
+
+    /*
+     * Completion queue can be filled by both read and write work requests,
+     * so must reflect the sum of both possible queue sizes.
+     */
+    lc->cq = ibv_create_cq(lc->verbs, (RDMA_SEND_MAX * 3), NULL, 
+                           lc->comp_chan, 0);
+    if (!lc->cq) {
+        ERROR(NULL, "allocate completion queue");
+        goto err_alloc;
+    }
+
+    return 0;
+
+err_alloc:
+    close_ibv(rdma, lc);
+    return -EINVAL;
+}
+
+static int open_local(RDMAContext *rdma, RDMALocalContext *lc)
+{
+	struct ibv_qp_attr set_attr = {
+		.qp_state = IBV_QPS_INIT,
+		.pkey_index = 0,
+		.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE,
+	};
+    struct ibv_qp_attr query_attr;
+    struct ibv_qp_init_attr query_init_attr;
+    int ret;
+
+    lc->psn = lrand48() & 0xffffff;
+
+    ret = ibv_query_qp(rdma->lc_remote.qp, &query_attr, IBV_QP_PORT, &query_init_attr);
+
+    if (ret) {
+        ret = EINVAL;
+        ERROR(NULL, "query original QP state");
+        goto err;
+    }
+
+    lc->port_num = query_attr.port_num;
+    set_attr.port_num = lc->port_num;
+
+    lc->verbs = ibv_open_device(rdma->lc_remote.verbs->device);
+
+	if(lc->verbs == NULL) {
+        ret = EINVAL;
+        ERROR(NULL, "open device!");
+        goto err;
+	}
+
+    ret = qemu_rdma_alloc_pd_cq(rdma, lc);
+
+    if (ret) {
+        ret = -ret;
+        ERROR(NULL, "Local ibv structure allocations");
+        goto err;
+    }
+
+    if (rdma->dest) {
+        qemu_set_nonblock(lc->comp_chan->fd);
+    }
+
+	lc->qp_attr.cap.max_send_wr	 = RDMA_SEND_MAX;
+	lc->qp_attr.cap.max_recv_wr	 = 3;
+	lc->qp_attr.cap.max_send_sge = 1;
+	lc->qp_attr.cap.max_recv_sge = 1;
+    lc->qp_attr.send_cq          = lc->cq;
+	lc->qp_attr.recv_cq		     = lc->cq;
+	lc->qp_attr.qp_type		     = IBV_QPT_RC;
+
+	lc->qp = ibv_create_qp(lc->pd, &lc->qp_attr);
+	if (!lc->qp) {
+        ret = EINVAL;
+        ERROR(NULL, "create queue pair!");
+        goto err;
+    }
+
+	ret = ibv_modify_qp(lc->qp, &set_attr,
+		IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS);
+
+    if (ret) {
+		ERROR(NULL, "verbs to init!");
+        goto err;
+	}	
+	
+	ret = ibv_query_port(lc->verbs, lc->port_num, &lc->port);
+
+    if (ret) {
+		ERROR(NULL, "query port attributes!");
+        goto err;
+    }
+	
+	ret = ibv_query_gid(lc->verbs, 1, 0, &lc->gid);
+
+    if (ret) {
+		ERROR(NULL, "Failed to query gid!");
+        goto err;
+    }
+
+    return 0;
+err:
+    SET_ERROR(rdma, -ret);
+    return rdma->error_state;
+}
+
 /*
  * Interface to the rest of the migration call stack.
  */
@@ -440,7 +715,7 @@ typedef struct QEMU_PACKED {
         uint64_t current_addr;  /* offset into the ramblock of the chunk */
         uint64_t chunk;         /* chunk to lookup if unregistering */
     } key;
-    uint32_t current_index; /* which ramblock the chunk belongs to */
+    uint32_t current_block_idx;     /* which ramblock the chunk belongs to */
     uint32_t padding;
     uint64_t chunks;            /* how many sequential chunks to register */
 } RDMARegister;
@@ -448,14 +723,14 @@ typedef struct QEMU_PACKED {
 static void register_to_network(RDMARegister *reg)
 {
     reg->key.current_addr = htonll(reg->key.current_addr);
-    reg->current_index = htonl(reg->current_index);
+    reg->current_block_idx = htonl(reg->current_block_idx);
     reg->chunks = htonll(reg->chunks);
 }
 
 static void network_to_register(RDMARegister *reg)
 {
     reg->key.current_addr = ntohll(reg->key.current_addr);
-    reg->current_index = ntohl(reg->current_index);
+    reg->current_block_idx = ntohl(reg->current_block_idx);
     reg->chunks = ntohll(reg->chunks);
 }
 
@@ -578,10 +853,10 @@ static int __qemu_rdma_add_block(RDMAContext *rdma, void *host_addr,
 
     g_hash_table_insert(rdma->blockmap, (void *) block_offset, block);
 
-    DDPRINTF("Added Block: %d, addr: %" PRIu64 ", offset: %" PRIu64
-           " length: %" PRIu64 " end: %" PRIu64 " bits %" PRIu64 " chunks %d\n",
-            local->nb_blocks, (uint64_t) block->local_host_addr, block->offset,
-            block->length, (uint64_t) (block->local_host_addr + block->length),
+    DDPRINTF("Added Block: %d, addr: %p, offset: %" PRIu64
+           " length: %" PRIu64 " end: %p bits %" PRIu64 " chunks %d\n",
+            local->nb_blocks, block->local_host_addr, block->offset,
+            block->length, block->local_host_addr + block->length,
                 BITS_TO_LONGS(block->nb_chunks) *
                     sizeof(unsigned long) * 8, block->nb_chunks);
 
@@ -621,35 +896,51 @@ static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
     return 0;
 }
 
-static int __qemu_rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
+static void qemu_rdma_free_pmrs(RDMAContext *rdma, RDMALocalBlock *block,
+                               struct ibv_mr ***mrs)
 {
-    RDMALocalBlocks *local = &rdma->local_ram_blocks;
-    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
-        (void *) block_offset);
-    RDMALocalBlock *old = local->block;
-    int x;
-
-    assert(block);
-
-    if (block->pmr) {
+    if (*mrs) {
         int j;
 
         for (j = 0; j < block->nb_chunks; j++) {
-            if (!block->pmr[j]) {
+            if (!(*mrs)[j]) {
                 continue;
             }
-            ibv_dereg_mr(block->pmr[j]);
+            ibv_dereg_mr((*mrs)[j]);
             rdma->total_registrations--;
         }
-        g_free(block->pmr);
-        block->pmr = NULL;
+        g_free(*mrs);
+
+        *mrs = NULL;
     }
+}
 
-    if (block->mr) {
-        ibv_dereg_mr(block->mr);
+static void qemu_rdma_free_mr(RDMAContext *rdma, struct ibv_mr **mr)
+{
+    if (*mr) {
+        ibv_dereg_mr(*mr);
         rdma->total_registrations--;
-        block->mr = NULL;
+        *mr = NULL;
     }
+}
+
+static int __qemu_rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
+{
+    RDMALocalBlocks *local = &rdma->local_ram_blocks;
+    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
+        (void *) block_offset);
+    RDMALocalBlock *old = local->block;
+    int x;
+
+    assert(block);
+
+    qemu_rdma_free_pmrs(rdma, block, &block->pmr);
+    qemu_rdma_free_pmrs(rdma, block, &block->pmr_src);
+    qemu_rdma_free_pmrs(rdma, block, &block->pmr_dest);
+
+    qemu_rdma_free_mr(rdma, &block->mr);
+    qemu_rdma_free_mr(rdma, &block->mr_src);
+    qemu_rdma_free_mr(rdma, &block->mr_dest);
 
     g_free(block->transit_bitmap);
     block->transit_bitmap = NULL;
@@ -674,7 +965,12 @@ static int __qemu_rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
         }
 
         if (block->index < (local->nb_blocks - 1)) {
-            memcpy(local->block + block->index, old + (block->index + 1),
+            RDMALocalBlock * end = old + (block->index + 1);
+            for (x = 0; x < (local->nb_blocks - (block->index + 1)); x++) {
+                end[x].index--;
+            }
+
+            memcpy(local->block + block->index, end,
                 sizeof(RDMALocalBlock) *
                     (local->nb_blocks - (block->index + 1)));
         }
@@ -683,6 +979,10 @@ static int __qemu_rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
         local->block = NULL;
     }
 
+    g_free(old);
+
+    local->nb_blocks--;
+
     DDPRINTF("Deleted Block: %d, addr: %" PRIu64 ", offset: %" PRIu64
            " length: %" PRIu64 " end: %" PRIu64 " bits %" PRIu64 " chunks %d\n",
             local->nb_blocks, (uint64_t) block->local_host_addr, block->offset,
@@ -690,10 +990,6 @@ static int __qemu_rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
                 BITS_TO_LONGS(block->nb_chunks) *
                     sizeof(unsigned long) * 8, block->nb_chunks);
 
-    g_free(old);
-
-    local->nb_blocks--;
-
     if (local->nb_blocks) {
         for (x = 0; x < local->nb_blocks; x++) {
             g_hash_table_insert(rdma->blockmap, (void *)local->block[x].offset,
@@ -974,7 +1270,7 @@ route:
         goto err_resolve_get_addr;
     }
     rdma_ack_cm_event(cm_event);
-    rdma->verbs = rdma->cm_id->verbs;
+    rdma->lc_remote.verbs = rdma->cm_id->verbs;
     qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
     qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
     return 0;
@@ -988,49 +1284,43 @@ err_resolve_create_id:
     return ret;
 }
 
-/*
- * Create protection domain and completion queues
- */
-static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
+static int qemu_rdma_alloc_keepalive(RDMAContext *rdma)
 {
-    /* allocate pd */
-    rdma->pd = ibv_alloc_pd(rdma->verbs);
-    if (!rdma->pd) {
-        fprintf(stderr, "failed to allocate protection domain\n");
-        return -1;
-    }
+    rdma->keepalive_mr = ibv_reg_mr(rdma->lc_remote.pd,
+            &rdma->keepalive, sizeof(rdma->keepalive),
+            IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
 
-    /* create completion channel */
-    rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
-    if (!rdma->comp_channel) {
-        fprintf(stderr, "failed to allocate completion channel\n");
-        goto err_alloc_pd_cq;
+    if (!rdma->keepalive_mr) {
+        perror("Failed to register keepalive location!");
+        SET_ERROR(rdma, -ENOMEM);
+        goto err_alloc;
     }
 
-    /*
-     * Completion queue can be filled by both read and write work requests,
-     * so must reflect the sum of both possible queue sizes.
-     */
-    rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
-            NULL, rdma->comp_channel, 0);
-    if (!rdma->cq) {
-        fprintf(stderr, "failed to allocate completion queue\n");
-        goto err_alloc_pd_cq;
+    rdma->next_keepalive_mr = ibv_reg_mr(rdma->lc_remote.pd,
+            &rdma->next_keepalive, sizeof(rdma->next_keepalive),
+            IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
+
+    if (!rdma->next_keepalive_mr) {
+        perror("Failed to register next keepalive location!");
+        SET_ERROR(rdma, -ENOMEM);
+        goto err_alloc;
     }
 
     return 0;
 
-err_alloc_pd_cq:
-    if (rdma->pd) {
-        ibv_dealloc_pd(rdma->pd);
+err_alloc:
+
+    if (rdma->keepalive_mr) {
+        ibv_dereg_mr(rdma->keepalive_mr);
+        rdma->keepalive_mr = NULL;
     }
-    if (rdma->comp_channel) {
-        ibv_destroy_comp_channel(rdma->comp_channel);
+
+    if (rdma->next_keepalive_mr) {
+        ibv_dereg_mr(rdma->next_keepalive_mr);
+        rdma->next_keepalive_mr = NULL;
     }
-    rdma->pd = NULL;
-    rdma->comp_channel = NULL;
-    return -1;
 
+    return -1;
 }
 
 /*
@@ -1041,41 +1331,68 @@ static int qemu_rdma_alloc_qp(RDMAContext *rdma)
     struct ibv_qp_init_attr attr = { 0 };
     int ret;
 
-    attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
+    attr.cap.max_send_wr = RDMA_SEND_MAX;
     attr.cap.max_recv_wr = 3;
     attr.cap.max_send_sge = 1;
     attr.cap.max_recv_sge = 1;
-    attr.send_cq = rdma->cq;
-    attr.recv_cq = rdma->cq;
+    attr.send_cq = rdma->lc_remote.cq;
+    attr.recv_cq = rdma->lc_remote.cq;
     attr.qp_type = IBV_QPT_RC;
 
-    ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
+    ret = rdma_create_qp(rdma->cm_id, rdma->lc_remote.pd, &attr);
     if (ret) {
         return -1;
     }
 
-    rdma->qp = rdma->cm_id->qp;
+    rdma->lc_remote.qp = rdma->cm_id->qp;
     return 0;
 }
 
+static int qemu_rdma_reg_whole_mr(RDMAContext *rdma, 
+                                  struct ibv_pd *pd,
+                                  struct ibv_mr **mr,
+                                  int index)
+{
+    RDMALocalBlocks *local = &rdma->local_ram_blocks;
+
+    *mr = ibv_reg_mr(pd,
+                local->block[index].local_host_addr,
+                local->block[index].length,
+                IBV_ACCESS_LOCAL_WRITE |
+                IBV_ACCESS_REMOTE_WRITE
+                );
+    if (!(*mr)) {
+        perror("Failed to register local dest ram block!\n");
+        return -1;
+    }
+    rdma->total_registrations++;
+
+    return 0;
+};
+
 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
 {
     int i;
     RDMALocalBlocks *local = &rdma->local_ram_blocks;
 
     for (i = 0; i < local->nb_blocks; i++) {
-        local->block[i].mr =
-            ibv_reg_mr(rdma->pd,
-                    local->block[i].local_host_addr,
-                    local->block[i].length,
-                    IBV_ACCESS_LOCAL_WRITE |
-                    IBV_ACCESS_REMOTE_WRITE
-                    );
-        if (!local->block[i].mr) {
-            perror("Failed to register local dest ram block!\n");
+        if (qemu_rdma_reg_whole_mr(rdma, rdma->lc_remote.pd, &local->block[i].mr, i)) {
             break;
         }
-        rdma->total_registrations++;
+
+        /* TODO: make this optional if MC is disabled */
+        if (rdma->source) {
+            if (qemu_rdma_reg_whole_mr(rdma, rdma->lc_src.pd, 
+                    &local->block[i].mr_src, i)) {
+                break;
+            }
+        } else {
+            if (qemu_rdma_reg_whole_mr(rdma, rdma->lc_dest.pd, 
+                    &local->block[i].mr_dest, i)) {
+                break;
+            }
+        }
+
     }
 
     if (i >= local->nb_blocks) {
@@ -1083,8 +1400,10 @@ static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
     }
 
     for (i--; i >= 0; i--) {
-        ibv_dereg_mr(local->block[i].mr);
-        rdma->total_registrations--;
+        qemu_rdma_free_mr(rdma, &local->block[i].mr);
+        qemu_rdma_free_mr(rdma, rdma->source ?
+                                &local->block[i].mr_src :
+                                &local->block[i].mr_dest);
     }
 
     return -1;
@@ -1129,24 +1448,34 @@ static int qemu_rdma_search_ram_block(RDMAContext *rdma,
  * to perform the actual RDMA operation.
  */
 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
-        RDMALocalBlock *block, uint8_t *host_addr,
-        uint32_t *lkey, uint32_t *rkey, int chunk,
-        uint8_t *chunk_start, uint8_t *chunk_end)
+                                           RDMACurrentChunk *cc,
+                                           RDMALocalContext *lc,
+                                           bool copy,
+                                           uint32_t *lkey, 
+                                           uint32_t *rkey)
 {
-    if (block->mr) {
+    struct ibv_mr ***pmr = copy ? (rdma->source ? &cc->block->pmr_src : 
+                           &cc->block->pmr_dest) : &cc->block->pmr;
+    struct ibv_mr **mr = copy ? (rdma->source ? &cc->block->mr_src :
+                         &cc->block->mr_dest) : &cc->block->mr;
+
+    /*
+     * Use pre-registered keys for the entire VM, if available.
+     */
+    if (*mr) {
         if (lkey) {
-            *lkey = block->mr->lkey;
+            *lkey = (*mr)->lkey;
         }
         if (rkey) {
-            *rkey = block->mr->rkey;
+            *rkey = (*mr)->rkey;
         }
         return 0;
     }
 
     /* allocate memory to store chunk MRs */
-    if (!block->pmr) {
-        block->pmr = g_malloc0(block->nb_chunks * sizeof(struct ibv_mr *));
-        if (!block->pmr) {
+    if (!(*pmr)) {
+        *pmr = g_malloc0(cc->block->nb_chunks * sizeof(struct ibv_mr *));
+        if (!(*pmr)) {
             return -1;
         }
     }
@@ -1154,38 +1483,38 @@ static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
     /*
      * If 'rkey', then we're the destination, so grant access to the source.
      *
-     * If 'lkey', then we're the source VM, so grant access only to ourselves.
+     * If 'lkey', then we're the source, so grant access only to ourselves.
      */
-    if (!block->pmr[chunk]) {
-        uint64_t len = chunk_end - chunk_start;
+    if (!(*pmr)[cc->chunk_idx]) {
+        uint64_t len = cc->chunk_end - cc->chunk_start;
 
         DDPRINTF("Registering %" PRIu64 " bytes @ %p\n",
-                 len, chunk_start);
+                 len, cc->chunk_start);
 
-        block->pmr[chunk] = ibv_reg_mr(rdma->pd,
-                chunk_start, len,
-                (rkey ? (IBV_ACCESS_LOCAL_WRITE |
-                        IBV_ACCESS_REMOTE_WRITE) : 0));
+        (*pmr)[cc->chunk_idx] = ibv_reg_mr(lc->pd, cc->chunk_start, len,
+                    (rkey ? (IBV_ACCESS_LOCAL_WRITE |
+                            IBV_ACCESS_REMOTE_WRITE) : 0));
 
-        if (!block->pmr[chunk]) {
+        if (!(*pmr)[cc->chunk_idx]) {
             perror("Failed to register chunk!");
-            fprintf(stderr, "Chunk details: block: %d chunk index %d"
+            fprintf(stderr, "Chunk details: block: %d chunk index %" PRIu64
                             " start %" PRIu64 " end %" PRIu64 " host %" PRIu64
                             " local %" PRIu64 " registrations: %d\n",
-                            block->index, chunk, (uint64_t) chunk_start,
-                            (uint64_t) chunk_end, (uint64_t) host_addr,
-                            (uint64_t) block->local_host_addr,
+                            cc->block->index, cc->chunk_idx, (uint64_t) cc->chunk_start,
+                            (uint64_t) cc->chunk_end, (uint64_t) cc->addr,
+                            (uint64_t) cc->block->local_host_addr,
                             rdma->total_registrations);
             return -1;
         }
+
         rdma->total_registrations++;
     }
 
     if (lkey) {
-        *lkey = block->pmr[chunk]->lkey;
+        *lkey = (*pmr)[cc->chunk_idx]->lkey;
     }
     if (rkey) {
-        *rkey = block->pmr[chunk]->rkey;
+        *rkey = (*pmr)[cc->chunk_idx]->rkey;
     }
     return 0;
 }
@@ -1196,7 +1525,7 @@ static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
  */
 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
 {
-    rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
+    rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->lc_remote.pd,
             rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
             IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
     if (rdma->wr_data[idx].control_mr) {
@@ -1257,11 +1586,11 @@ static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
         uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
         uint64_t chunk =
             (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
-        uint64_t index =
+        uint64_t block_index =
             (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
         RDMALocalBlock *block =
-            &(rdma->local_ram_blocks.block[index]);
-        RDMARegister reg = { .current_index = index };
+            &(rdma->local_ram_blocks.block[block_index]);
+        RDMARegister reg = { .current_block_idx = block_index };
         RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
                                  };
         RDMAControlHeader head = { .len = sizeof(RDMARegister),
@@ -1275,7 +1604,7 @@ static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
         rdma->unregistrations[rdma->unregister_current] = 0;
         rdma->unregister_current++;
 
-        if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
+        if (rdma->unregister_current == RDMA_SEND_MAX) {
             rdma->unregister_current = 0;
         }
 
@@ -1339,7 +1668,7 @@ static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
                                         uint64_t chunk, uint64_t wr_id)
 {
     if (rdma->unregistrations[rdma->unregister_next] != 0) {
-        fprintf(stderr, "rdma migration: queue is full!\n");
+        ERROR(NULL, "queue is full!");
     } else {
         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
 
@@ -1350,7 +1679,7 @@ static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
             rdma->unregistrations[rdma->unregister_next++] =
                     qemu_rdma_make_wrid(wr_id, index, chunk);
 
-            if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
+            if (rdma->unregister_next == RDMA_SEND_MAX) {
                 rdma->unregister_next = 0;
             }
         } else {
@@ -1365,14 +1694,21 @@ static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
  * (of any kind) has completed.
  * Return the work request ID that completed.
  */
-static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
+static uint64_t qemu_rdma_poll(RDMAContext *rdma,
+                               RDMALocalContext *lc, 
+                               uint64_t *wr_id_out,
                                uint32_t *byte_len)
 {
+    int64_t current_time;
     int ret;
     struct ibv_wc wc;
     uint64_t wr_id;
 
-    ret = ibv_poll_cq(rdma->cq, 1, &wc);
+    if (!lc->start_time) {
+        lc->start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+    }
+
+    ret = ibv_poll_cq(lc->cq, 1, &wc);
 
     if (!ret) {
         *wr_id_out = RDMA_WRID_NONE;
@@ -1397,29 +1733,48 @@ static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
     if (rdma->control_ready_expected &&
         (wr_id >= RDMA_WRID_RECV_CONTROL)) {
         DDDPRINTF("completion %s #%" PRId64 " received (%" PRId64 ")"
-                  " left %d\n", wrid_desc[RDMA_WRID_RECV_CONTROL],
-                  wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
+                  " left %d (per qp %d)\n", 
+                  wrid_desc[RDMA_WRID_RECV_CONTROL],
+                  wr_id - RDMA_WRID_RECV_CONTROL, wr_id, 
+                  rdma->nb_sent, lc->nb_sent);
         rdma->control_ready_expected = 0;
     }
 
-    if (wr_id == RDMA_WRID_RDMA_WRITE) {
+    if (wr_id == RDMA_WRID_RDMA_WRITE_REMOTE) {
         uint64_t chunk =
             (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
-        uint64_t index =
+        uint64_t block_idx =
             (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
-        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
-
-        DDDPRINTF("completions %s (%" PRId64 ") left %d, "
-                 "block %" PRIu64 ", chunk: %" PRIu64 " %p %p\n",
-                 print_wrid(wr_id), wr_id, rdma->nb_sent, index, chunk,
-                 block->local_host_addr, (void *)block->remote_host_addr);
+        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[block_idx]);
 
         clear_bit(chunk, block->transit_bitmap);
 
+        if (lc->nb_sent > lc->max_nb_sent) {
+            lc->max_nb_sent = lc->nb_sent;
+        }
+
+        current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+        
+        if ((current_time - lc->start_time) > 1000) {
+            lc->start_time = current_time;
+            DDPRINTF("outstanding %s total: %d context: %d max %d\n",
+                lc->id_str, rdma->nb_sent, lc->nb_sent, lc->max_nb_sent);
+        }
+
         if (rdma->nb_sent > 0) {
             rdma->nb_sent--;
         }
 
+        if (lc->nb_sent > 0) {
+            lc->nb_sent--;
+        }
+
+        DDDPRINTF("completions %s (%" PRId64 ") left %d (per qp %d), "
+                 "block %" PRIu64 ", chunk: %" PRIu64 " %p %p\n",
+                 print_wrid(wr_id), wr_id, rdma->nb_sent, lc->nb_sent,
+                 block_idx, chunk, block->local_host_addr, 
+                 (void *)block->remote_host_addr);
+
         if (!rdma->pin_all) {
             /*
              * FYI: If one wanted to signal a specific chunk to be unregistered
@@ -1428,12 +1783,15 @@ static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
              * unregistered later.
              */
 #ifdef RDMA_UNREGISTRATION_EXAMPLE
-            qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
+             if (block->pmr[chunk]) { 
+                 qemu_rdma_signal_unregister(rdma, block_idx, chunk, wc.wr_id);
+             }
 #endif
         }
     } else {
-        DDDPRINTF("other completion %s (%" PRId64 ") received left %d\n",
-            print_wrid(wr_id), wr_id, rdma->nb_sent);
+        DDDPRINTF("other completion %s (%" 
+                  PRId64 ") received left %d (per qp %d)\n",
+            print_wrid(wr_id), wr_id, rdma->nb_sent, lc->nb_sent);
     }
 
     *wr_id_out = wc.wr_id;
@@ -1457,7 +1815,9 @@ static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
  * completions only need to be recorded, but do not actually
  * need further processing.
  */
-static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
+static int qemu_rdma_block_for_wrid(RDMAContext *rdma, 
+                                    RDMALocalContext *lc,
+                                    int wrid_requested,
                                     uint32_t *byte_len)
 {
     int num_cq_events = 0, ret = 0;
@@ -1465,12 +1825,15 @@ static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
     void *cq_ctx;
     uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
 
-    if (ibv_req_notify_cq(rdma->cq, 0)) {
-        return -1;
+    ret = ibv_req_notify_cq(lc->cq, 0);
+    if (ret) {
+        perror("ibv_req_notify_cq");
+        return -ret;
     }
+
     /* poll cq first */
     while (wr_id != wrid_requested) {
-        ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
+        ret = qemu_rdma_poll(rdma, lc, &wr_id_in, byte_len);
         if (ret < 0) {
             return ret;
         }
@@ -1496,23 +1859,27 @@ static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
          * Coroutine doesn't start until process_incoming_migration()
          * so don't yield unless we know we're running inside of a coroutine.
          */
-        if (rdma->migration_started_on_destination) {
-            yield_until_fd_readable(rdma->comp_channel->fd);
+        if (qemu_in_coroutine()) {
+            yield_until_fd_readable(lc->comp_chan->fd);
         }
 
-        if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
+        ret = ibv_get_cq_event(lc->comp_chan, &cq, &cq_ctx);
+        if (ret < 0) {
             perror("ibv_get_cq_event");
             goto err_block_for_wrid;
         }
 
         num_cq_events++;
 
-        if (ibv_req_notify_cq(cq, 0)) {
+        ret = ibv_req_notify_cq(cq, 0);
+        if (ret) {
+            ret = -ret;
+            perror("ibv_req_notify_cq");
             goto err_block_for_wrid;
         }
 
         while (wr_id != wrid_requested) {
-            ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
+            ret = qemu_rdma_poll(rdma, lc, &wr_id_in, byte_len);
             if (ret < 0) {
                 goto err_block_for_wrid;
             }
@@ -1589,18 +1956,19 @@ static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
     }
 
 
-    if (ibv_post_send(rdma->qp, &send_wr, &bad_wr)) {
+    if (ibv_post_send(rdma->lc_remote.qp, &send_wr, &bad_wr)) {
         return -1;
     }
 
     if (ret < 0) {
-        fprintf(stderr, "Failed to use post IB SEND for control!\n");
+        ERROR(NULL, "using post IB SEND for control!");
         return ret;
     }
 
-    ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
+    ret = qemu_rdma_block_for_wrid(rdma, &rdma->lc_remote,
+                                   RDMA_WRID_SEND_CONTROL, NULL);
     if (ret < 0) {
-        fprintf(stderr, "rdma migration: send polling control error!\n");
+        ERROR(NULL, "send polling control!");
     }
 
     return ret;
@@ -1626,7 +1994,7 @@ static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
                                  };
 
 
-    if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
+    if (ibv_post_recv(rdma->lc_remote.qp, &recv_wr, &bad_wr)) {
         return -1;
     }
 
@@ -1640,11 +2008,12 @@ static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
                 RDMAControlHeader *head, int expecting, int idx)
 {
     uint32_t byte_len;
-    int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
+    int ret = qemu_rdma_block_for_wrid(rdma, &rdma->lc_remote,
+                                       RDMA_WRID_RECV_CONTROL + idx,
                                        &byte_len);
 
     if (ret < 0) {
-        fprintf(stderr, "rdma migration: recv polling control error!\n");
+        ERROR(NULL, "recv polling control!");
         return ret;
     }
 
@@ -1731,8 +2100,7 @@ static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
     if (resp) {
         ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
         if (ret) {
-            fprintf(stderr, "rdma migration: error posting"
-                    " extra control recv for anticipated result!");
+            ERROR(NULL, "posting extra control recv for anticipated result!");
             return ret;
         }
     }
@@ -1742,7 +2110,7 @@ static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
      */
     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
     if (ret) {
-        fprintf(stderr, "rdma migration: error posting first control recv!");
+        ERROR(NULL, "posting first control recv!");
         return ret;
     }
 
@@ -1752,7 +2120,7 @@ static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
     ret = qemu_rdma_post_send_control(rdma, data, head);
 
     if (ret < 0) {
-        fprintf(stderr, "Failed to send control buffer!\n");
+        ERROR(NULL, "sending control buffer!");
         return ret;
     }
 
@@ -1829,64 +2197,80 @@ static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
      */
     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
     if (ret) {
-        fprintf(stderr, "rdma migration: error posting second control recv!");
+        ERROR(NULL, "posting second control recv!");
         return ret;
     }
 
     return 0;
 }
 
-/*
- * Write an actual chunk of memory using RDMA.
- *
- * If we're using dynamic registration on the dest-side, we have to
- * send a registration command first.
- */
-static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
-                               int current_index, uint64_t current_addr,
-                               uint64_t length)
+static inline void install_boundaries(RDMAContext *rdma, RDMACurrentChunk *cc)
 {
-    struct ibv_sge sge;
-    struct ibv_send_wr send_wr = { 0 };
-    struct ibv_send_wr *bad_wr;
-    int reg_result_idx, ret, count = 0;
-    uint64_t chunk, chunks;
-    uint8_t *chunk_start, *chunk_end;
-    RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
-    RDMARegister reg;
-    RDMARegisterResult *reg_result;
-    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
-    RDMAControlHeader head = { .len = sizeof(RDMARegister),
+    uint64_t len = cc->block->is_ram_block ? 
+                   cc->current_length : cc->block->length;
+
+    cc->chunks = len / (1UL << RDMA_REG_CHUNK_SHIFT);
+
+    if (cc->chunks && ((len % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
+        cc->chunks--;
+    }
+
+    cc->addr = (uint8_t *) (uint64_t)(cc->block->local_host_addr +
+                                 (cc->current_addr - cc->block->offset));
+
+    cc->chunk_idx = ram_chunk_index(cc->block->local_host_addr, cc->addr);
+    cc->chunk_start = ram_chunk_start(cc->block, cc->chunk_idx);
+    cc->chunk_end = ram_chunk_end(cc->block, cc->chunk_idx + cc->chunks);
+
+    DDPRINTF("Block %d chunk %" PRIu64 " has %" PRIu64
+             " chunks, (%" PRIu64 " MB)\n", cc->block->index, cc->chunk_idx,
+                cc->chunks + 1, (cc->chunks + 1) * 
+                    (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
+
+}
+
+/*
+ * Push out any unwritten RDMA operations.
+ */
+static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
+                                 RDMACurrentChunk *src,
+                                 RDMACurrentChunk *dest)
+{
+    struct ibv_sge sge;
+    struct ibv_send_wr send_wr = { 0 };
+    struct ibv_send_wr *bad_wr;
+    int reg_result_idx, ret, count = 0;
+    bool copy;
+    RDMALocalContext *lc;
+    RDMARegister reg;
+    RDMARegisterResult *reg_result;
+    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
+    RDMAControlHeader head = { .len = sizeof(RDMARegister),
                                .type = RDMA_CONTROL_REGISTER_REQUEST,
                                .repeat = 1,
                              };
 
-retry:
-    sge.addr = (uint64_t)(block->local_host_addr +
-                            (current_addr - block->offset));
-    sge.length = length;
-
-    chunk = ram_chunk_index(block->local_host_addr, (uint8_t *) sge.addr);
-    chunk_start = ram_chunk_start(block, chunk);
+    if (!src->current_length) {
+        return 0;
+    }
 
-    if (block->is_ram_block) {
-        chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
+    if (dest == src) {
+        dest = NULL;
+    }
 
-        if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
-            chunks--;
-        }
-    } else {
-        chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
+    copy = dest ? true : false;
 
-        if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
-            chunks--;
-        }
-    }
+    lc = copy ? 
+        (rdma->source ? &rdma->lc_src : &rdma->lc_dest) : &rdma->lc_remote;
 
-    DDPRINTF("Writing %" PRIu64 " chunks, (%" PRIu64 " MB)\n",
-        chunks + 1, (chunks + 1) * (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
+retry:
+    src->block = &(rdma->local_ram_blocks.block[src->current_block_idx]);
+    install_boundaries(rdma, src);
 
-    chunk_end = ram_chunk_end(block, chunk + chunks);
+    if (dest) {
+        dest->block = &(rdma->local_ram_blocks.block[dest->current_block_idx]);
+        install_boundaries(rdma, dest);
+    }
 
     if (!rdma->pin_all) {
 #ifdef RDMA_UNREGISTRATION_EXAMPLE
@@ -1894,49 +2278,54 @@ retry:
 #endif
     }
 
-    while (test_bit(chunk, block->transit_bitmap)) {
+    while (test_bit(src->chunk_idx, src->block->transit_bitmap)) {
         (void)count;
         DDPRINTF("(%d) Not clobbering: block: %d chunk %" PRIu64
-                " current %" PRIu64 " len %" PRIu64 " %d %d\n",
-                count++, current_index, chunk,
-                sge.addr, length, rdma->nb_sent, block->nb_chunks);
+                " current %" PRIu64 " len %" PRIu64 " left %d (per qp %d) %d\n",
+                count++, src->current_block_idx, src->chunk_idx,
+                (uint64_t) src->addr, src->current_length, 
+                rdma->nb_sent, lc->nb_sent, src->block->nb_chunks);
 
-        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
+        ret = qemu_rdma_block_for_wrid(rdma, lc, 
+                                       RDMA_WRID_RDMA_WRITE_REMOTE, NULL);
 
         if (ret < 0) {
             fprintf(stderr, "Failed to Wait for previous write to complete "
                     "block %d chunk %" PRIu64
-                    " current %" PRIu64 " len %" PRIu64 " %d\n",
-                    current_index, chunk, sge.addr, length, rdma->nb_sent);
+                    " current %" PRIu64 " len %" PRIu64 " %d (per qp %d)\n",
+                    src->current_block_idx, src->chunk_idx, (uint64_t) src->addr, 
+                    src->current_length, rdma->nb_sent, lc->nb_sent);
             return ret;
         }
     }
 
-    if (!rdma->pin_all || !block->is_ram_block) {
-        if (!block->remote_keys[chunk]) {
+    if (!rdma->pin_all || !src->block->is_ram_block) {
+        if (!src->block->remote_keys[src->chunk_idx]) {
             /*
              * This chunk has not yet been registered, so first check to see
              * if the entire chunk is zero. If so, tell the other size to
              * memset() + madvise() the entire chunk without RDMA.
              */
 
-            if (can_use_buffer_find_nonzero_offset((void *)sge.addr, length)
-                   && buffer_find_nonzero_offset((void *)sge.addr,
-                                                    length) == length) {
+            if (src->block->is_ram_block &&
+                   can_use_buffer_find_nonzero_offset(src->addr, src->current_length)
+                   && buffer_find_nonzero_offset(src->addr,
+                                                    src->current_length) == src->current_length) {
                 RDMACompress comp = {
-                                        .offset = current_addr,
+                                        .offset = src->current_addr,
                                         .value = 0,
-                                        .block_idx = current_index,
-                                        .length = length,
+                                        .block_idx = src->current_block_idx,
+                                        .length = src->current_length,
                                     };
 
                 head.len = sizeof(comp);
                 head.type = RDMA_CONTROL_COMPRESS;
 
-                DDPRINTF("Entire chunk is zero, sending compress: %"
-                    PRIu64 " for %d "
-                    "bytes, index: %d, offset: %" PRId64 "...\n",
-                    chunk, sge.length, current_index, current_addr);
+                DDPRINTF("Entire chunk is zero, sending compress: %" PRIu64 
+                         " for %" PRIu64 " bytes, index: %d"
+                         ", offset: %" PRId64 "...\n",
+                         src->chunk_idx, src->current_length, 
+                         src->current_block_idx, src->current_addr);
 
                 compress_to_network(&comp);
                 ret = qemu_rdma_exchange_send(rdma, &head,
@@ -1946,109 +2335,125 @@ retry:
                     return -EIO;
                 }
 
-                acct_update_position(f, sge.length, true);
+                acct_update_position(f, src->current_length, true);
 
                 return 1;
             }
 
             /*
-             * Otherwise, tell other side to register.
+             * Otherwise, tell other side to register. (Only for remote RDMA)
              */
-            reg.current_index = current_index;
-            if (block->is_ram_block) {
-                reg.key.current_addr = current_addr;
-            } else {
-                reg.key.chunk = chunk;
-            }
-            reg.chunks = chunks;
+            if (!dest) {
+                reg.current_block_idx = src->current_block_idx;
+                if (src->block->is_ram_block) {
+                    reg.key.current_addr = src->current_addr;
+                } else {
+                    reg.key.chunk = src->chunk_idx;
+                }
+                reg.chunks = src->chunks;
 
-            DDPRINTF("Sending registration request chunk %" PRIu64 " for %d "
-                    "bytes, index: %d, offset: %" PRId64 "...\n",
-                    chunk, sge.length, current_index, current_addr);
+                DDPRINTF("Sending registration request chunk %" PRIu64 
+                         " for %" PRIu64 " bytes, index: %d, offset: %" 
+                         PRId64 "...\n",
+                         src->chunk_idx, src->current_length, 
+                         src->current_block_idx, src->current_addr);
 
-            register_to_network(&reg);
-            ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
-                                    &resp, &reg_result_idx, NULL);
-            if (ret < 0) {
-                return ret;
+                register_to_network(&reg);
+                ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
+                                        &resp, &reg_result_idx, NULL);
+                if (ret < 0) {
+                    return ret;
+                }
             }
 
             /* try to overlap this single registration with the one we sent. */
-            if (qemu_rdma_register_and_get_keys(rdma, block,
-                                                (uint8_t *) sge.addr,
-                                                &sge.lkey, NULL, chunk,
-                                                chunk_start, chunk_end)) {
+            if (qemu_rdma_register_and_get_keys(rdma, src, lc, copy, 
+                                                &sge.lkey, NULL)) {
                 fprintf(stderr, "cannot get lkey!\n");
                 return -EINVAL;
             }
 
-            reg_result = (RDMARegisterResult *)
-                    rdma->wr_data[reg_result_idx].control_curr;
+            if (!dest) {
+                reg_result = (RDMARegisterResult *)
+                        rdma->wr_data[reg_result_idx].control_curr;
 
-            network_to_result(reg_result);
+                network_to_result(reg_result);
 
-            DDPRINTF("Received registration result:"
-                    " my key: %x their key %x, chunk %" PRIu64 "\n",
-                    block->remote_keys[chunk], reg_result->rkey, chunk);
+                DDPRINTF("Received registration result:"
+                        " my key: %x their key %x, chunk %" PRIu64 "\n",
+                        src->block->remote_keys[src->chunk_idx], 
+                        reg_result->rkey, src->chunk_idx);
 
-            block->remote_keys[chunk] = reg_result->rkey;
-            block->remote_host_addr = reg_result->host_addr;
+                src->block->remote_keys[src->chunk_idx] = reg_result->rkey;
+                src->block->remote_host_addr = reg_result->host_addr;
+            }
         } else {
             /* already registered before */
-            if (qemu_rdma_register_and_get_keys(rdma, block,
-                                                (uint8_t *)sge.addr,
-                                                &sge.lkey, NULL, chunk,
-                                                chunk_start, chunk_end)) {
+            if (qemu_rdma_register_and_get_keys(rdma, src, lc, copy,
+                                                &sge.lkey, NULL)) {
                 fprintf(stderr, "cannot get lkey!\n");
                 return -EINVAL;
             }
         }
 
-        send_wr.wr.rdma.rkey = block->remote_keys[chunk];
+        send_wr.wr.rdma.rkey = src->block->remote_keys[src->chunk_idx];
     } else {
-        send_wr.wr.rdma.rkey = block->remote_rkey;
+        send_wr.wr.rdma.rkey = src->block->remote_rkey;
 
-        if (qemu_rdma_register_and_get_keys(rdma, block, (uint8_t *)sge.addr,
-                                                     &sge.lkey, NULL, chunk,
-                                                     chunk_start, chunk_end)) {
+        if (qemu_rdma_register_and_get_keys(rdma, src, lc, copy, 
+                                            &sge.lkey, NULL)) {
             fprintf(stderr, "cannot get lkey!\n");
             return -EINVAL;
         }
     }
 
+    if (dest) {
+        if (qemu_rdma_register_and_get_keys(rdma, dest,
+                                            &rdma->lc_dest, copy,
+                                            NULL, &send_wr.wr.rdma.rkey)) {
+            fprintf(stderr, "cannot get rkey!\n");
+            return -EINVAL;
+        }
+    }
+
     /*
      * Encode the ram block index and chunk within this wrid.
      * We will use this information at the time of completion
      * to figure out which bitmap to check against and then which
      * chunk in the bitmap to look for.
      */
-    send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
-                                        current_index, chunk);
+    send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE_REMOTE,
+                                        src->current_block_idx, src->chunk_idx);
 
+    sge.length = src->current_length;
+    sge.addr = (uint64_t) src->addr;
     send_wr.opcode = IBV_WR_RDMA_WRITE;
     send_wr.send_flags = IBV_SEND_SIGNALED;
     send_wr.sg_list = &sge;
     send_wr.num_sge = 1;
-    send_wr.wr.rdma.remote_addr = block->remote_host_addr +
-                                (current_addr - block->offset);
+    send_wr.wr.rdma.remote_addr = (dest ? (uint64_t) dest->addr : 
+                (src->block->remote_host_addr + 
+                    (src->current_addr - src->block->offset)));
 
-    DDDPRINTF("Posting chunk: %" PRIu64 ", addr: %lx"
-              " remote: %lx, bytes %" PRIu32 "\n",
-              chunk, sge.addr, send_wr.wr.rdma.remote_addr,
-              sge.length);
+    DDPRINTF("Posting chunk: %" PRIu64 ", addr: %lx"
+             " remote: %lx, bytes %" PRIu32 " lkey %" PRIu32 
+             " rkey %" PRIu32 "\n",
+             src->chunk_idx, sge.addr, 
+             send_wr.wr.rdma.remote_addr, sge.length,
+             sge.lkey, send_wr.wr.rdma.rkey);
 
     /*
      * ibv_post_send() does not return negative error numbers,
      * per the specification they are positive - no idea why.
      */
-    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
+    ret = ibv_post_send(lc->qp, &send_wr, &bad_wr);
 
     if (ret == ENOMEM) {
         DDPRINTF("send queue is full. wait a little....\n");
-        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
+        ret = qemu_rdma_block_for_wrid(rdma, lc,
+                                       RDMA_WRID_RDMA_WRITE_REMOTE, NULL);
         if (ret < 0) {
-            fprintf(stderr, "rdma migration: failed to make "
-                            "room in full send queue! %d\n", ret);
+            ERROR(NULL, "could not make room in full send queue! %d", ret);
             return ret;
         }
 
@@ -2059,80 +2464,66 @@ retry:
         return -ret;
     }
 
-    set_bit(chunk, block->transit_bitmap);
-    acct_update_position(f, sge.length, false);
-    rdma->total_writes++;
-
-    return 0;
-}
-
-/*
- * Push out any unwritten RDMA operations.
- *
- * We support sending out multiple chunks at the same time.
- * Not all of them need to get signaled in the completion queue.
- */
-static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
-{
-    int ret;
+    set_bit(src->chunk_idx, src->block->transit_bitmap);
 
-    if (!rdma->current_length) {
-        return 0;
+    if (!dest) {
+        acct_update_position(f, sge.length, false);
     }
 
-    ret = qemu_rdma_write_one(f, rdma,
-            rdma->current_index, rdma->current_addr, rdma->current_length);
+    rdma->total_writes++;
+    rdma->nb_sent++;
+    lc->nb_sent++;
 
-    if (ret < 0) {
-        return ret;
-    }
+    DDDPRINTF("sent total: %d sent lc: %d\n", rdma->nb_sent, lc->nb_sent);
 
-    if (ret == 0) {
-        rdma->nb_sent++;
-        DDDPRINTF("sent total: %d\n", rdma->nb_sent);
-    }
+    src->current_length = 0;
+    src->current_addr = 0;
 
-    rdma->current_length = 0;
-    rdma->current_addr = 0;
+    if (dest) {
+        dest->current_length = 0;
+        dest->current_addr = 0;
+    }
 
     return 0;
 }
 
 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
-                    uint64_t offset, uint64_t len)
+                                            RDMACurrentChunk *cc,
+                                            uint64_t current_addr, 
+                                            uint64_t len)
 {
     RDMALocalBlock *block;
     uint8_t *host_addr;
     uint8_t *chunk_end;
 
-    if (rdma->current_index < 0) {
+    if (cc->current_block_idx < 0) {
         return 0;
     }
 
-    if (rdma->current_chunk < 0) {
+    if (cc->current_chunk < 0) {
         return 0;
     }
 
-    block = &(rdma->local_ram_blocks.block[rdma->current_index]);
-    host_addr = block->local_host_addr + (offset - block->offset);
-    chunk_end = ram_chunk_end(block, rdma->current_chunk);
+    block = &(rdma->local_ram_blocks.block[cc->current_block_idx]);
+    host_addr = block->local_host_addr + (current_addr - block->offset);
+    chunk_end = ram_chunk_end(block, cc->current_chunk);
 
-    if (rdma->current_length == 0) {
+    if (cc->current_length == 0) {
         return 0;
     }
 
     /*
      * Only merge into chunk sequentially.
      */
-    if (offset != (rdma->current_addr + rdma->current_length)) {
+    if (current_addr != (cc->current_addr + cc->current_length)) {
         return 0;
     }
 
-    if (offset < block->offset) {
+    if (current_addr < block->offset) {
         return 0;
     }
 
-    if ((offset + len) > (block->offset + block->length)) {
+    if ((current_addr + len) > (block->offset + block->length)) {
         return 0;
     }
 
@@ -2143,72 +2534,121 @@ static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
     return 1;
 }
 
-/*
- * We're not actually writing here, but doing three things:
- *
- * 1. Identify the chunk the buffer belongs to.
- * 2. If the chunk is full or the buffer doesn't belong to the current
- *    chunk, then start a new chunk and flush() the old chunk.
- * 3. To keep the hardware busy, we also group chunks into batches
- *    and only require that a batch gets acknowledged in the completion
- *    qeueue instead of each individual chunk.
+static int write_start(RDMAContext *rdma,
+                        RDMACurrentChunk *cc,
+                        uint64_t len,
+                        uint64_t current_addr)
+{
+    int ret;
+    uint64_t block_idx, chunk;
+
+    cc->current_addr = current_addr;
+    block_idx = cc->current_block_idx;
+    chunk = cc->current_chunk;
+
+    ret = qemu_rdma_search_ram_block(rdma, cc->block_offset,
+                                     cc->offset, len, &block_idx, &chunk);
+    if (ret) {
+        ERROR(NULL, "ram block search failed");
+        return ret;
+    }
+
+    cc->current_block_idx = block_idx;
+    cc->current_chunk = chunk;
+
+    return 0;
+}
+
+/* 
+ * If we cannot merge it, we flush the current buffer first.
  */
-static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
-                           uint64_t block_offset, uint64_t offset,
-                           uint64_t len)
+static int qemu_rdma_flush_unmergable(RDMAContext *rdma,
+                                      RDMACurrentChunk *src,
+                                      RDMACurrentChunk *dest,
+                                      QEMUFile *f, uint64_t len)
 {
-    uint64_t current_addr = block_offset + offset;
-    uint64_t index = rdma->current_index;
-    uint64_t chunk = rdma->current_chunk;
+    uint64_t current_addr_src;
+    uint64_t current_addr_dest;
     int ret;
 
-    /* If we cannot merge it, we flush the current buffer first. */
-    if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
-        ret = qemu_rdma_write_flush(f, rdma);
-        if (ret) {
-            return ret;
+    current_addr_src = src->block_offset + src->offset;
+
+    if (dest) {
+        current_addr_dest = dest->block_offset + dest->offset;
+    }
+
+    if (qemu_rdma_buffer_mergable(rdma, src, current_addr_src, len)) {
+        if (dest) {
+            if (qemu_rdma_buffer_mergable(rdma, dest, current_addr_dest, len)) {
+                goto merge;
+            }
+        } else {
+            goto merge;
         }
-        rdma->current_length = 0;
-        rdma->current_addr = current_addr;
+    }
+
+    ret = qemu_rdma_write(f, rdma, src, dest);
+
+    if (ret) {
+        return ret;
+    }
+
+    ret = write_start(rdma, src, len, current_addr_src);
+
+    if (ret) {
+        return ret;
+    }
+
+    if (dest) {
+        ret = write_start(rdma, dest, len, current_addr_dest);
 
-        ret = qemu_rdma_search_ram_block(rdma, block_offset,
-                                         offset, len, &index, &chunk);
         if (ret) {
-            fprintf(stderr, "ram block search failed\n");
             return ret;
         }
-        rdma->current_index = index;
-        rdma->current_chunk = chunk;
     }
 
-    /* merge it */
-    rdma->current_length += len;
-
-    /* flush it if buffer is too large */
-    if (rdma->current_length >= RDMA_MERGE_MAX) {
-        return qemu_rdma_write_flush(f, rdma);
+merge:
+    src->current_length += len;
+    if (dest) {
+        dest->current_length += len;
     }
 
     return 0;
 }
 
-static void qemu_rdma_cleanup(RDMAContext *rdma)
+static void qemu_rdma_cleanup(RDMAContext *rdma, bool force)
 {
     struct rdma_cm_event *cm_event;
     int ret, idx;
 
+    if (connection_timer) {
+        timer_del(connection_timer);
+        timer_free(connection_timer);
+        connection_timer = NULL;
+    }
+
+    if (keepalive_timer) {
+        timer_del(keepalive_timer);
+        timer_free(keepalive_timer);
+        keepalive_timer = NULL;
+    }
+
     if (rdma->cm_id && rdma->connected) {
         if (rdma->error_state) {
-            RDMAControlHeader head = { .len = 0,
-                                       .type = RDMA_CONTROL_ERROR,
-                                       .repeat = 1,
-                                     };
-            fprintf(stderr, "Early error. Sending error.\n");
-            qemu_rdma_post_send_control(rdma, NULL, &head);
+            if (rdma->error_state != -ENETUNREACH) {
+                RDMAControlHeader head = { .len = 0,
+                                           .type = RDMA_CONTROL_ERROR,
+                                           .repeat = 1,
+                                         };
+                fprintf(stderr, "Early error. Sending error.\n");
+                qemu_rdma_post_send_control(rdma, NULL, &head);
+            } else {
+                fprintf(stderr, "Early error.\n");
+            }
         }
 
         ret = rdma_disconnect(rdma->cm_id);
-        if (!ret) {
+        if (!ret && !force && (rdma->error_state != -ENETUNREACH)) {
             DDPRINTF("waiting for disconnect\n");
             ret = rdma_get_cm_event(rdma->channel, &cm_event);
             if (!ret) {
@@ -2216,6 +2656,7 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
             }
         }
         DDPRINTF("Disconnected.\n");
+        rdma->lc_remote.verbs = NULL;
         rdma->connected = false;
     }
 
@@ -2237,22 +2678,10 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
         }
     }
 
-    if (rdma->qp) {
-        rdma_destroy_qp(rdma->cm_id);
-        rdma->qp = NULL;
-    }
-    if (rdma->cq) {
-        ibv_destroy_cq(rdma->cq);
-        rdma->cq = NULL;
-    }
-    if (rdma->comp_channel) {
-        ibv_destroy_comp_channel(rdma->comp_channel);
-        rdma->comp_channel = NULL;
-    }
-    if (rdma->pd) {
-        ibv_dealloc_pd(rdma->pd);
-        rdma->pd = NULL;
-    }
+    close_ibv(rdma, &rdma->lc_remote);
+    close_ibv(rdma, &rdma->lc_src);
+    close_ibv(rdma, &rdma->lc_dest);
+
     if (rdma->listen_id) {
         rdma_destroy_id(rdma->listen_id);
         rdma->listen_id = NULL;
@@ -2265,12 +2694,24 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
         rdma_destroy_event_channel(rdma->channel);
         rdma->channel = NULL;
     }
+
     g_free(rdma->host);
     rdma->host = NULL;
+
+    if (rdma->keepalive_mr) {
+        ibv_dereg_mr(rdma->keepalive_mr);
+        rdma->keepalive_mr = NULL;
+    }
+    if (rdma->next_keepalive_mr) {
+        ibv_dereg_mr(rdma->next_keepalive_mr);
+        rdma->next_keepalive_mr = NULL;
+    }
 }
 
 
-static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all)
+static int qemu_rdma_source_init(RDMAContext *rdma,
+                                 Error **errp,
+                                 MigrationState *s)
 {
     int ret, idx;
     Error *local_err = NULL, **temp = &local_err;
@@ -2279,38 +2720,45 @@ static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all)
      * Will be validated against destination's actual capabilities
      * after the connect() completes.
      */
-    rdma->pin_all = pin_all;
+    rdma->pin_all = s->enabled_capabilities[MIGRATION_CAPABILITY_X_RDMA_PIN_ALL];
+    rdma->do_keepalive = s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_KEEPALIVE];
 
     ret = qemu_rdma_resolve_host(rdma, temp);
     if (ret) {
         goto err_rdma_source_init;
     }
 
-    ret = qemu_rdma_alloc_pd_cq(rdma);
+    ret = qemu_rdma_alloc_pd_cq(rdma, &rdma->lc_remote);
     if (ret) {
-        ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
+        ERROR(temp, "allocating pd and cq! Your mlock()"
                     " limits may be too low. Please check $ ulimit -a # and "
                     "search for 'ulimit -l' in the output");
         goto err_rdma_source_init;
     }
 
+    ret = qemu_rdma_alloc_keepalive(rdma);
+
+    if (ret) {
+        ERROR(temp, "allocating keepalive structures");
+        goto err_rdma_source_init;
+    }
+
     ret = qemu_rdma_alloc_qp(rdma);
     if (ret) {
-        ERROR(temp, "rdma migration: error allocating qp!");
+        ERROR(temp, "allocating qp!");
         goto err_rdma_source_init;
     }
 
     ret = qemu_rdma_init_ram_blocks(rdma);
     if (ret) {
-        ERROR(temp, "rdma migration: error initializing ram blocks!");
+        ERROR(temp, "initializing ram blocks!");
         goto err_rdma_source_init;
     }
 
     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
         ret = qemu_rdma_reg_control(rdma, idx);
         if (ret) {
-            ERROR(temp, "rdma migration: error registering %d control!",
-                                                            idx);
+            ERROR(temp, "registering %d control!", idx);
             goto err_rdma_source_init;
         }
     }
@@ -2319,7 +2767,7 @@ static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all)
 
 err_rdma_source_init:
     error_propagate(errp, local_err);
-    qemu_rdma_cleanup(rdma);
+    qemu_rdma_cleanup(rdma, false);
     return -1;
 }
 
@@ -2328,6 +2776,8 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
     RDMACapabilities cap = {
                                 .version = RDMA_CONTROL_VERSION_CURRENT,
                                 .flags = 0,
+                                .keepalive_rkey = rdma->keepalive_mr->rkey,
+                                .keepalive_addr = (uint64_t) &rdma->keepalive,
                            };
     struct rdma_conn_param conn_param = { .initiator_depth = 2,
                                           .retry_count = 5,
@@ -2346,6 +2796,13 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
         cap.flags |= RDMA_CAPABILITY_PIN_ALL;
     }
 
+    if (rdma->do_keepalive) {
+        DPRINTF("Keepalives requested.\n");
+        cap.flags |= RDMA_CAPABILITY_KEEPALIVE;
+    }
+
+    DDPRINTF("Sending keepalive params: key %x addr: %" PRIx64 "\n",
+            cap.keepalive_rkey, cap.keepalive_addr);
     caps_to_network(&cap);
 
     ret = rdma_connect(rdma->cm_id, &conn_param);
@@ -2380,6 +2837,12 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
     network_to_caps(&cap);
 
+    rdma->keepalive_rkey = cap.keepalive_rkey;
+    rdma->keepalive_addr = cap.keepalive_addr;
+
+    DDPRINTF("Received keepalive params: key %x addr: %" PRIx64 "\n",
+            cap.keepalive_rkey, cap.keepalive_addr);
+
     /*
      * Verify that the *requested* capabilities are supported by the destination
      * and disable them otherwise.
@@ -2390,7 +2853,14 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
         rdma->pin_all = false;
     }
 
+    if (rdma->do_keepalive && !(cap.flags & RDMA_CAPABILITY_KEEPALIVE)) {
+        ERROR(errp, "Server cannot support keepalives. "
+                        "Will not check for them.");
+        rdma->do_keepalive = false;
+    }
+
     DPRINTF("Pin all memory: %s\n", rdma->pin_all ? "enabled" : "disabled");
+    DPRINTF("Keepalives: %s\n", rdma->do_keepalive ? "enabled" : "disabled");
 
     rdma_ack_cm_event(cm_event);
 
@@ -2405,7 +2875,7 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
     return 0;
 
 err_rdma_source_connect:
-    qemu_rdma_cleanup(rdma);
+    qemu_rdma_cleanup(rdma, false);
     return -1;
 }
 
@@ -2424,14 +2894,14 @@ static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
 
     if (rdma->host == NULL) {
         ERROR(errp, "RDMA host is not set!");
-        rdma->error_state = -EINVAL;
+        SET_ERROR(rdma, -EINVAL);
         return -1;
     }
     /* create CM channel */
     rdma->channel = rdma_create_event_channel();
     if (!rdma->channel) {
         ERROR(errp, "could not create rdma event channel");
-        rdma->error_state = -EINVAL;
+        SET_ERROR(rdma, -EINVAL);
         return -1;
     }
 
@@ -2489,11 +2959,117 @@ err_dest_init_bind_addr:
 err_dest_init_create_listen_id:
     rdma_destroy_event_channel(rdma->channel);
     rdma->channel = NULL;
-    rdma->error_state = ret;
+    SET_ERROR(rdma, ret);
     return ret;
 
 }
 
+static void send_keepalive(void *opaque)
+{
+    RDMAContext *rdma = opaque;
+    struct ibv_sge sge;
+    struct ibv_send_wr send_wr = { 0 };
+    struct ibv_send_wr *bad_wr;
+    int ret;
+
+    if (!rdma->migration_started) {
+        goto reset;
+    }
+
+    rdma->next_keepalive++;
+retry:
+
+    sge.addr = (uint64_t) &rdma->next_keepalive;
+    sge.length = sizeof(rdma->next_keepalive);
+    sge.lkey = rdma->next_keepalive_mr->lkey;
+    send_wr.wr_id = RDMA_WRID_RDMA_KEEPALIVE;
+    send_wr.opcode = IBV_WR_RDMA_WRITE;
+    send_wr.send_flags = 0;
+    send_wr.sg_list = &sge;
+    send_wr.num_sge = 1;
+    send_wr.wr.rdma.remote_addr = rdma->keepalive_addr;
+    send_wr.wr.rdma.rkey = rdma->keepalive_rkey;
+
+    DDPRINTF("Posting keepalive: addr: %lx"
+              " remote: %lx, bytes %" PRIu32 "\n",
+              sge.addr, send_wr.wr.rdma.remote_addr, sge.length);
+
+    ret = ibv_post_send(rdma->lc_remote.qp, &send_wr, &bad_wr);
+
+    if (ret == ENOMEM) {
+        DPRINTF("send queue is full. wait a little....\n");
+        g_usleep(RDMA_KEEPALIVE_INTERVAL_MS * 1000);
+        goto retry;
+    } else if (ret > 0) {
+        perror("rdma migration: post keepalive");
+        SET_ERROR(rdma, -ret);
+        return;
+    }
+
+reset:
+    timer_mod(keepalive_timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) +
+                    RDMA_KEEPALIVE_INTERVAL_MS);
+}
+
+static void check_qp_state(void *opaque)
+{
+    RDMAContext *rdma = opaque;
+    int first_missed = 0;
+
+    if (!rdma->migration_started) {
+        goto reset;
+    }
+
+    if (rdma->last_keepalive == rdma->keepalive) {
+        rdma->nb_missed_keepalive++;
+        if (rdma->nb_missed_keepalive == 1) {
+            first_missed = RDMA_KEEPALIVE_FIRST_MISSED_OFFSET;
+            DDPRINTF("Setting first missed additional delay\n");
+        } else {
+            DPRINTF("WARN: missed keepalive: %" PRIu64 "\n",
+                        rdma->nb_missed_keepalive);
+        }
+    } else {
+        rdma->keepalive_startup = true;
+        rdma->nb_missed_keepalive = 0;
+    }
+
+    rdma->last_keepalive = rdma->keepalive;
+
+    if (rdma->keepalive_startup) {
+        if (rdma->nb_missed_keepalive > RDMA_MAX_LOST_KEEPALIVE) {
+            struct ibv_qp_attr attr = {.qp_state = IBV_QPS_ERR };
+            SET_ERROR(rdma, -ENETUNREACH);
+            ERROR(NULL, "peer keepalive failed.");
+             
+            if (ibv_modify_qp(rdma->lc_remote.qp, &attr, IBV_QP_STATE)) {
+                ERROR(NULL, "modify QP to RTR");
+                return;
+            }
+            return;
+        }
+    } else if (rdma->nb_missed_keepalive < RDMA_MAX_STARTUP_MISSED_KEEPALIVE) {
+        DDPRINTF("Keepalive startup waiting: %" PRIu64 "\n",
+                rdma->nb_missed_keepalive);
+    } else {
+        DDPRINTF("Keepalive startup too long.\n");
+        rdma->keepalive_startup = true;
+    }
+
+reset:
+    timer_mod(connection_timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) +
+                    RDMA_KEEPALIVE_INTERVAL_MS + first_missed);
+}
+
+static void qemu_rdma_keepalive_start(void)
+{
+    DPRINTF("Starting up keepalives....\n");
+    timer_mod(connection_timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) + 
+                    RDMA_CONNECTION_INTERVAL_MS);
+    timer_mod(keepalive_timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) +
+                    RDMA_KEEPALIVE_INTERVAL_MS);
+}
+
 static void *qemu_rdma_data_init(const char *host_port, Error **errp)
 {
     RDMAContext *rdma = NULL;
@@ -2502,8 +3078,12 @@ static void *qemu_rdma_data_init(const char *host_port, Error **errp)
     if (host_port) {
         rdma = g_malloc0(sizeof(RDMAContext));
         memset(rdma, 0, sizeof(RDMAContext));
-        rdma->current_index = -1;
-        rdma->current_chunk = -1;
+        rdma->chunk_remote.current_block_idx = -1;
+        rdma->chunk_remote.current_chunk = -1;
+        rdma->chunk_local_src.current_block_idx = -1;
+        rdma->chunk_local_src.current_chunk = -1;
+        rdma->chunk_local_dest.current_block_idx = -1;
+        rdma->chunk_local_dest.current_chunk = -1;
 
         addr = inet_parse(host_port, NULL);
         if (addr != NULL) {
@@ -2515,6 +3095,14 @@ static void *qemu_rdma_data_init(const char *host_port, Error **errp)
             return NULL;
         }
     }
+       
+    rdma->keepalive_startup = false;
+    connection_timer = timer_new_ms(QEMU_CLOCK_REALTIME, check_qp_state, rdma);
+    keepalive_timer = timer_new_ms(QEMU_CLOCK_REALTIME, send_keepalive, rdma);
+    rdma->lc_dest.id_str = "local destination";
+    rdma->lc_src.id_str = "local src";
+    rdma->lc_remote.id_str = "remote";
+
 
     return rdma;
 }
@@ -2540,9 +3128,9 @@ static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
      * Push out any writes that
      * we're queued up for pc.ram.
      */
-    ret = qemu_rdma_write_flush(f, rdma);
+    ret = qemu_rdma_write(f, rdma, &rdma->chunk_remote, NULL);
     if (ret < 0) {
-        rdma->error_state = ret;
+        SET_ERROR(rdma, ret);
         return ret;
     }
 
@@ -2558,7 +3146,7 @@ static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
         ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
 
         if (ret < 0) {
-            rdma->error_state = ret;
+            SET_ERROR(rdma, ret);
             return ret;
         }
 
@@ -2618,7 +3206,7 @@ static int qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
     ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
 
     if (ret < 0) {
-        rdma->error_state = ret;
+        SET_ERROR(rdma, ret);
         return ret;
     }
 
@@ -2631,18 +3219,23 @@ static int qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
 /*
  * Block until all the outstanding chunks have been delivered by the hardware.
  */
-static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
+static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma,
+                              RDMACurrentChunk *src,
+                              RDMACurrentChunk *dest)
 {
     int ret;
+    RDMALocalContext *lc = (dest && dest != src) ? 
+            (rdma->source ? &rdma->lc_src : &rdma->lc_dest) : &rdma->lc_remote;
 
-    if (qemu_rdma_write_flush(f, rdma) < 0) {
+    if (qemu_rdma_write(f, rdma, src, dest) < 0) {
         return -EIO;
     }
 
-    while (rdma->nb_sent) {
-        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
+    while (lc->nb_sent) {
+        ret = qemu_rdma_block_for_wrid(rdma, lc,
+                                       RDMA_WRID_RDMA_WRITE_REMOTE, NULL);
         if (ret < 0) {
-            fprintf(stderr, "rdma migration: complete polling error!\n");
+            ERROR(NULL, "complete polling!");
             return -EIO;
         }
     }
@@ -2657,13 +3250,190 @@ static int qemu_rdma_close(void *opaque)
     DPRINTF("Shutting down connection.\n");
     QEMUFileRDMA *r = opaque;
     if (r->rdma) {
-        qemu_rdma_cleanup(r->rdma);
+        qemu_rdma_cleanup(r->rdma, false);
         g_free(r->rdma);
     }
     g_free(r);
     return 0;
 }
 
+static int qemu_rdma_instruct_unregister(RDMAContext *rdma, QEMUFile *f,
+                                         ram_addr_t block_offset,
+                                         ram_addr_t offset, long size)
+{
+    int ret;
+    uint64_t block, chunk;
+
+    if (size < 0) {
+        ret = qemu_rdma_drain_cq(f, rdma, &rdma->chunk_remote, NULL);
+        if (ret < 0) {
+            fprintf(stderr, "rdma: failed to synchronously drain"
+                            " completion queue before unregistration.\n");
+            return ret;
+        }
+    }
+
+    ret = qemu_rdma_search_ram_block(rdma, block_offset, 
+                                     offset, size, &block, &chunk);
+
+    if (ret) {
+        fprintf(stderr, "ram block search failed\n");
+        return ret;
+    }
+
+    qemu_rdma_signal_unregister(rdma, block, chunk, 0);
+
+    /*
+     * Synchronous, gauranteed unregistration (should not occur during
+     * fast-path). Otherwise, unregisters will process on the next call to
+     * qemu_rdma_drain_cq()
+     */
+    if (size < 0) {
+        qemu_rdma_unregister_waiting(rdma);
+    }
+
+    return 0;
+}
+
+
+static int qemu_rdma_poll_until_empty(RDMAContext *rdma, RDMALocalContext *lc)
+{
+    uint64_t wr_id, wr_id_in;
+    int ret;
+
+    /*
+     * Drain the Completion Queue if possible, but do not block,
+     * just poll.
+     *
+     * If nothing to poll, the end of the iteration will do this
+     * again to make sure we don't overflow the request queue.
+     */
+    while (1) {
+        ret = qemu_rdma_poll(rdma, lc, &wr_id_in, NULL);
+        if (ret < 0) {
+            ERROR(NULL, "empty polling error! %d", ret);
+            return ret;
+        }
+
+        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
+
+        if (wr_id == RDMA_WRID_NONE) {
+            break;
+        }
+    }
+
+    return 0;
+}
+
+/*
+ * Parameters:
+ *    @offset_{source|dest} == 0 :
+ *        This means that 'block_offset' is a full virtual address that does not
+ *        belong to a RAMBlock of the virtual machine and instead
+ *        represents a private malloc'd memory area that the caller wishes to
+ *        transfer. Source and dest can be different (either real RAMBlocks or
+ *        private).
+ *
+ *    @offset != 0 :
+ *        Offset is an offset to be added to block_offset and used
+ *        to also lookup the corresponding RAMBlock. Source and dest can be different 
+ *        (either real RAMBlocks or private).
+ *
+ *    @size > 0 :
+ *        Amount of memory to copy locally using RDMA.
+ *
+ *    @size == 0 :
+ *        A 'hint' or 'advice' that means that we wish to speculatively
+ *        and asynchronously unregister either the source or destination memory.
+ *        In this case, there is no gaurantee that the unregister will actually happen, 
+ *        for example, if the memory is being actively copied. Additionally, the memory
+ *        may be re-registered at any future time if a copy within the same
+ *        range was requested again, even if you attempted to unregister it here.
+ *
+ *    @size < 0 : TODO, not yet supported
+ *        Unregister the memory NOW. This means that the caller does not
+ *        expect there to be any future RDMA copies and we just want to clean
+ *        things up. This is used in case the upper layer owns the memory and
+ *        cannot wait for qemu_fclose() to occur.
+ */
+static int qemu_rdma_copy_page(QEMUFile *f, void *opaque,
+                                  ram_addr_t block_offset_dest,
+                                  ram_addr_t offset_dest,
+                                  ram_addr_t block_offset_source,
+                                  ram_addr_t offset_source,
+                                  long size)
+{
+    QEMUFileRDMA *rfile = opaque;
+    RDMAContext *rdma = rfile->rdma;
+    int ret;
+    RDMACurrentChunk *src = &rdma->chunk_local_src;
+    RDMACurrentChunk *dest = &rdma->chunk_local_dest;
+
+    CHECK_ERROR_STATE();
+
+    qemu_fflush(f);
+
+    if (size > 0) {
+        /*
+         * Add this page to the current 'chunk'. If the chunk
+         * is full, or the page doen't belong to the current chunk,
+         * an actual RDMA write will occur and a new chunk will be formed.
+         */
+        src->block_offset = block_offset_source;
+        src->offset = offset_source;
+        dest->block_offset = block_offset_dest;
+        dest->offset = offset_dest;
+
+        DDPRINTF("Copy page: %p src offset %" PRIu64
+                " dest %p offset %" PRIu64 "\n",
+                (void *) block_offset_source, offset_source,
+                (void *) block_offset_dest, offset_dest);
+
+        ret = qemu_rdma_flush_unmergable(rdma, src, dest, f, size);
+
+        if (ret) {
+            ERROR(NULL, "local copy flush");
+            goto err;
+        }
+
+        if ((src->current_length >= RDMA_MERGE_MAX) || 
+            (dest->current_length >= RDMA_MERGE_MAX)) {
+            ret = qemu_rdma_write(f, rdma, src, dest);
+
+            if (ret < 0) {
+                goto err;
+            }
+        } else {
+            ret = 0;
+        }
+    } else {
+        ret = qemu_rdma_instruct_unregister(rdma, f, block_offset_source,
+                                                  offset_source, size);
+        if (ret) {
+            goto err;
+        }
+
+        ret = qemu_rdma_instruct_unregister(rdma, f, block_offset_dest, 
+                                                  offset_dest, size);
+
+        if (ret) {
+            goto err;
+        }
+    }
+
+    ret = qemu_rdma_poll_until_empty(rdma, 
+                rdma->source ? &rdma->lc_src : &rdma->lc_dest);
+
+    if (ret) {
+        goto err;
+    }
+
+    return RAM_COPY_CONTROL_DELAYED;
+err:
+    SET_ERROR(rdma, ret);
+    return ret;
+}
+
 /*
  * Parameters:
  *    @offset == 0 :
@@ -2672,6 +3442,20 @@ static int qemu_rdma_close(void *opaque)
  *        represents a private malloc'd memory area that the caller wishes to
  *        transfer.
  *
+ *        This allows callers to initiate RDMA transfers of arbitrary memory
+ *        areas and not just only by migration itself.
+ *
+ *        If this is true, then the virtual address specified by 'block_offset'
+ *        below must have been pre-registered with us in advance by calling the
+ *        new QEMUFileOps->add()/remove() functions on both sides of the
+ *        connection.
+ *
+ *        Also note: add()/remove() must been called in the *same sequence* and
+ *        against the *same size* private virtual memory on both sides of the
+ *        connection for this to work, regardless whether or not transfer of
+ *        this private memory was initiated by the migration code or a private
+ *        caller.
+ *
  *    @offset != 0 :
  *        Offset is an offset to be added to block_offset and used
  *        to also lookup the corresponding RAMBlock.
@@ -2680,7 +3464,7 @@ static int qemu_rdma_close(void *opaque)
  *        Initiate an transfer this size.
  *
  *    @size == 0 :
- *        A 'hint' or 'advice' that means that we wish to speculatively
+ *        A 'hint' that means that we wish to speculatively
  *        and asynchronously unregister this memory. In this case, there is no
  *        guarantee that the unregister will actually happen, for example,
  *        if the memory is being actively transmitted. Additionally, the memory
@@ -2698,12 +3482,15 @@ static int qemu_rdma_close(void *opaque)
  *                  sent. Usually, this will not be more than a few bytes of
  *                  the protocol because most transfers are sent asynchronously.
  */
-static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
-                                  ram_addr_t block_offset, ram_addr_t offset,
-                                  size_t size, int *bytes_sent)
+static int qemu_rdma_save_page(QEMUFile *f, void *opaque,
+                                  ram_addr_t block_offset,
+                                  uint8_t *host_addr,
+                                  ram_addr_t offset,
+                                  long size, int *bytes_sent)
 {
     QEMUFileRDMA *rfile = opaque;
     RDMAContext *rdma = rfile->rdma;
+    RDMACurrentChunk *cc = &rdma->chunk_remote;
     int ret;
 
     CHECK_ERROR_STATE();
@@ -2716,12 +3503,27 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
          * is full, or the page doen't belong to the current chunk,
          * an actual RDMA write will occur and a new chunk will be formed.
          */
-        ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
-        if (ret < 0) {
-            fprintf(stderr, "rdma migration: write error! %d\n", ret);
+        cc->block_offset = block_offset;
+        cc->offset = offset;
+
+        ret = qemu_rdma_flush_unmergable(rdma, cc, NULL, f, size);
+
+        if (ret) {
+            ERROR(NULL, "remote flush unmergable");
             goto err;
         }
 
+        if (cc->current_length >= RDMA_MERGE_MAX) {
+            ret = qemu_rdma_write(f, rdma, cc, NULL);
+
+            if (ret < 0) {
+                ERROR(NULL, "remote write! %d", ret);
+                goto err;
+            }
+        } else {
+            ret = 0;
+        }
+
         /*
          * We always return 1 bytes because the RDMA
          * protocol is completely asynchronous. We do not yet know
@@ -2734,64 +3536,22 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
             *bytes_sent = 1;
         }
     } else {
-        uint64_t index, chunk;
-
-        /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
-        if (size < 0) {
-            ret = qemu_rdma_drain_cq(f, rdma);
-            if (ret < 0) {
-                fprintf(stderr, "rdma: failed to synchronously drain"
-                                " completion queue before unregistration.\n");
-                goto err;
-            }
-        }
-        */
-
-        ret = qemu_rdma_search_ram_block(rdma, block_offset,
-                                         offset, size, &index, &chunk);
+        ret = qemu_rdma_instruct_unregister(rdma, f, block_offset, offset, size);
 
         if (ret) {
-            fprintf(stderr, "ram block search failed\n");
             goto err;
         }
-
-        qemu_rdma_signal_unregister(rdma, index, chunk, 0);
-
-        /*
-         * TODO: Synchronous, guaranteed unregistration (should not occur during
-         * fast-path). Otherwise, unregisters will process on the next call to
-         * qemu_rdma_drain_cq()
-        if (size < 0) {
-            qemu_rdma_unregister_waiting(rdma);
-        }
-        */
     }
 
-    /*
-     * Drain the Completion Queue if possible, but do not block,
-     * just poll.
-     *
-     * If nothing to poll, the end of the iteration will do this
-     * again to make sure we don't overflow the request queue.
-     */
-    while (1) {
-        uint64_t wr_id, wr_id_in;
-        int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL);
-        if (ret < 0) {
-            fprintf(stderr, "rdma migration: polling error! %d\n", ret);
-            goto err;
-        }
-
-        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
+    ret = qemu_rdma_poll_until_empty(rdma, &rdma->lc_remote);
 
-        if (wr_id == RDMA_WRID_NONE) {
-            break;
-        }
+    if (ret) {
+        goto err;
     }
 
     return RAM_SAVE_CONTROL_DELAYED;
 err:
-    rdma->error_state = ret;
+    SET_ERROR(rdma, ret);
     return ret;
 }
 
@@ -2829,6 +3589,13 @@ static int qemu_rdma_accept(RDMAContext *rdma)
             goto err_rdma_dest_wait;
     }
 
+    rdma->keepalive_rkey = cap.keepalive_rkey;
+    rdma->keepalive_addr = cap.keepalive_addr;
+
+    DDPRINTF("Received keepalive params: key %x addr: %" PRIx64 
+            " local %" PRIx64 "\n",
+            cap.keepalive_rkey, cap.keepalive_addr, (uint64_t) &rdma->keepalive);
+
     /*
      * Respond with only the capabilities this version of QEMU knows about.
      */
@@ -2838,9 +3605,8 @@ static int qemu_rdma_accept(RDMAContext *rdma)
      * Enable the ones that we do know about.
      * Add other checks here as new ones are introduced.
      */
-    if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
-        rdma->pin_all = true;
-    }
+    rdma->pin_all = cap.flags & RDMA_CAPABILITY_PIN_ALL;
+    rdma->do_keepalive = cap.flags & RDMA_CAPABILITY_KEEPALIVE;
 
     rdma->cm_id = cm_event->id;
     verbs = cm_event->id->verbs;
@@ -2848,43 +3614,56 @@ static int qemu_rdma_accept(RDMAContext *rdma)
     rdma_ack_cm_event(cm_event);
 
     DPRINTF("Memory pin all: %s\n", rdma->pin_all ? "enabled" : "disabled");
-
-    caps_to_network(&cap);
+    DPRINTF("Keepalives: %s\n", rdma->do_keepalive ? "enabled" : "disabled");
 
     DPRINTF("verbs context after listen: %p\n", verbs);
 
-    if (!rdma->verbs) {
-        rdma->verbs = verbs;
-    } else if (rdma->verbs != verbs) {
-            fprintf(stderr, "ibv context not matching %p, %p!\n",
-                    rdma->verbs, verbs);
-            goto err_rdma_dest_wait;
+    if (!rdma->lc_remote.verbs) {
+        rdma->lc_remote.verbs = verbs;
+    } else if (rdma->lc_remote.verbs != verbs) {
+        ERROR(NULL, "ibv context %p != %p!", rdma->lc_remote.verbs, verbs);
+        goto err_rdma_dest_wait;
     }
 
     qemu_rdma_dump_id("dest_init", verbs);
 
-    ret = qemu_rdma_alloc_pd_cq(rdma);
+    ret = qemu_rdma_alloc_pd_cq(rdma, &rdma->lc_remote);
     if (ret) {
-        fprintf(stderr, "rdma migration: error allocating pd and cq!\n");
+        ERROR(NULL, "error allocating pd and cq!");
         goto err_rdma_dest_wait;
     }
 
+    ret = qemu_rdma_alloc_keepalive(rdma);
+
+    if (ret) {
+        ERROR(NULL, "allocating keepalive structures");
+        goto err_rdma_dest_wait;
+    }
+
+    cap.keepalive_rkey = rdma->keepalive_mr->rkey,
+    cap.keepalive_addr = (uint64_t) &rdma->keepalive;
+
+    DDPRINTF("Sending keepalive params: key %x addr: %" PRIx64 
+            " remote: %" PRIx64 "\n",
+            cap.keepalive_rkey, cap.keepalive_addr, rdma->keepalive_addr);
+    caps_to_network(&cap);
+
     ret = qemu_rdma_alloc_qp(rdma);
     if (ret) {
-        fprintf(stderr, "rdma migration: error allocating qp!\n");
+        ERROR(NULL, "allocating qp!");
         goto err_rdma_dest_wait;
     }
 
     ret = qemu_rdma_init_ram_blocks(rdma);
     if (ret) {
-        fprintf(stderr, "rdma migration: error initializing ram blocks!\n");
+        ERROR(NULL, "initializing ram blocks!");
         goto err_rdma_dest_wait;
     }
 
     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
         ret = qemu_rdma_reg_control(rdma, idx);
         if (ret) {
-            fprintf(stderr, "rdma: error registering %d control!\n", idx);
+            ERROR(NULL, "registering %d control!", idx);
             goto err_rdma_dest_wait;
         }
     }
@@ -2893,18 +3672,18 @@ static int qemu_rdma_accept(RDMAContext *rdma)
 
     ret = rdma_accept(rdma->cm_id, &conn_param);
     if (ret) {
-        fprintf(stderr, "rdma_accept returns %d!\n", ret);
+        ERROR(NULL, "rdma_accept returns %d!", ret);
         goto err_rdma_dest_wait;
     }
 
     ret = rdma_get_cm_event(rdma->channel, &cm_event);
     if (ret) {
-        fprintf(stderr, "rdma_accept get_cm_event failed %d!\n", ret);
+        ERROR(NULL, "rdma_accept get_cm_event failed %d!", ret);
         goto err_rdma_dest_wait;
     }
 
     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
-        fprintf(stderr, "rdma_accept not event established!\n");
+        ERROR(NULL, "rdma_accept not event established!");
         rdma_ack_cm_event(cm_event);
         goto err_rdma_dest_wait;
     }
@@ -2914,7 +3693,7 @@ static int qemu_rdma_accept(RDMAContext *rdma)
 
     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
     if (ret) {
-        fprintf(stderr, "rdma migration: error posting second control recv!\n");
+        ERROR(NULL, "posting second control recv!");
         goto err_rdma_dest_wait;
     }
 
@@ -2923,18 +3702,16 @@ static int qemu_rdma_accept(RDMAContext *rdma)
     return 0;
 
 err_rdma_dest_wait:
-    rdma->error_state = ret;
-    qemu_rdma_cleanup(rdma);
+    SET_ERROR(rdma, ret);
+    qemu_rdma_cleanup(rdma, false);
     return ret;
 }
 
 /*
  * During each iteration of the migration, we listen for instructions
- * by the source VM to perform dynamic page registrations before they
+ * by the source VM to perform pinning operations before they
  * can perform RDMA operations.
  *
- * We respond with the 'rkey'.
- *
  * Keep doing this until the source tells us to stop.
  */
 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
@@ -2957,8 +3734,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
     RDMARegister *reg, *registers;
     RDMACompress *comp;
     RDMARegisterResult *reg_result;
-    static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
     RDMALocalBlock *block;
+    static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
     void *host_addr;
     int ret = 0;
     int idx = 0;
@@ -3009,8 +3786,7 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
             if (rdma->pin_all) {
                 ret = qemu_rdma_reg_whole_ram_blocks(rdma);
                 if (ret) {
-                    fprintf(stderr, "rdma migration: error dest "
-                                    "registering ram blocks!\n");
+                    ERROR(NULL, "dest registering ram blocks!");
                     goto out;
                 }
             }
@@ -3043,7 +3819,7 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
                                         (uint8_t *) rdma->block, &blocks);
 
             if (ret < 0) {
-                fprintf(stderr, "rdma migration: error sending remote info!\n");
+                ERROR(NULL, "sending remote info!");
                 goto out;
             }
 
@@ -3055,8 +3831,7 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
 
             for (count = 0; count < head.repeat; count++) {
-                uint64_t chunk;
-                uint8_t *chunk_start, *chunk_end;
+                RDMACurrentChunk cc;
 
                 reg = &registers[count];
                 network_to_register(reg);
@@ -3065,30 +3840,28 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
 
                 DDPRINTF("Registration request (%d): index %d, current_addr %"
                          PRIu64 " chunks: %" PRIu64 "\n", count,
-                         reg->current_index, reg->key.current_addr, reg->chunks);
-
-                block = &(rdma->local_ram_blocks.block[reg->current_index]);
-                if (block->is_ram_block) {
-                    host_addr = (block->local_host_addr +
-                                (reg->key.current_addr - block->offset));
-                    chunk = ram_chunk_index(block->local_host_addr,
-                                            (uint8_t *) host_addr);
+                         reg->current_block_idx, reg->key.current_addr, reg->chunks);
+
+                cc.block = &(rdma->local_ram_blocks.block[reg->current_block_idx]);
+                if (cc.block->is_ram_block) {
+                    cc.addr = (cc.block->local_host_addr +
+                                (reg->key.current_addr - cc.block->offset));
+                    cc.chunk_idx = ram_chunk_index(block->local_host_addr, cc.addr);
                 } else {
-                    chunk = reg->key.chunk;
-                    host_addr = block->local_host_addr +
+                    cc.chunk_idx = reg->key.chunk;
+                    cc.addr = cc.block->local_host_addr +
                         (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
                 }
-                chunk_start = ram_chunk_start(block, chunk);
-                chunk_end = ram_chunk_end(block, chunk + reg->chunks);
-                if (qemu_rdma_register_and_get_keys(rdma, block,
-                            (uint8_t *)host_addr, NULL, &reg_result->rkey,
-                            chunk, chunk_start, chunk_end)) {
+                cc.chunk_start = ram_chunk_start(cc.block, cc.chunk_idx);
+                cc.chunk_end = ram_chunk_end(cc.block, cc.chunk_idx + reg->chunks);
+                if (qemu_rdma_register_and_get_keys(rdma, &cc, &rdma->lc_remote,
+                                            false, NULL, &reg_result->rkey)) {
                     fprintf(stderr, "cannot get rkey!\n");
                     ret = -EINVAL;
                     goto out;
                 }
 
-                reg_result->host_addr = (uint64_t) block->local_host_addr;
+                reg_result->host_addr = (uint64_t) cc.block->local_host_addr;
 
                 DDPRINTF("Registered rkey for this request: %x\n",
                                 reg_result->rkey);
@@ -3115,9 +3888,9 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
 
                 DDPRINTF("Unregistration request (%d): "
                          " index %d, chunk %" PRIu64 "\n",
-                         count, reg->current_index, reg->key.chunk);
+                         count, reg->current_block_idx, reg->key.chunk);
 
-                block = &(rdma->local_ram_blocks.block[reg->current_index]);
+                block = &(rdma->local_ram_blocks.block[reg->current_block_idx]);
 
                 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
                 block->pmr[reg->key.chunk] = NULL;
@@ -3154,7 +3927,7 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
     } while (1);
 out:
     if (ret < 0) {
-        rdma->error_state = ret;
+        SET_ERROR(rdma, ret);
     }
     return ret;
 }
@@ -3168,7 +3941,23 @@ static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
     CHECK_ERROR_STATE();
 
     DDDPRINTF("start section: %" PRIu64 "\n", flags);
-    qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
+
+    if (flags == RAM_CONTROL_FLUSH) {
+        int ret;
+
+        if (rdma->source) {
+            ret = qemu_rdma_drain_cq(f, rdma, &rdma->chunk_local_src, 
+                                              &rdma->chunk_local_dest);
+
+            if (ret < 0) {
+                return ret;
+            }
+        }
+
+    } else {
+        qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
+    }
+
     qemu_fflush(f);
 
     return 0;
@@ -3190,7 +3979,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
     CHECK_ERROR_STATE();
 
     qemu_fflush(f);
-    ret = qemu_rdma_drain_cq(f, rdma);
+    ret = qemu_rdma_drain_cq(f, rdma, &rdma->chunk_remote, NULL);
 
     if (ret < 0) {
         goto err;
@@ -3225,13 +4014,13 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
         /*
          * The protocol uses two different sets of rkeys (mutually exclusive):
          * 1. One key to represent the virtual address of the entire ram block.
-         *    (dynamic chunk registration disabled - pin everything with one rkey.)
+         *    (pinning enabled - pin everything with one rkey.)
          * 2. One to represent individual chunks within a ram block.
-         *    (dynamic chunk registration enabled - pin individual chunks.)
+         *    (pinning disabled - pin individual chunks.)
          *
          * Once the capability is successfully negotiated, the destination transmits
          * the keys to use (or sends them later) including the virtual addresses
-         * and then propagates the remote ram block descriptions to his local copy.
+         * and then propagates the remote ram block descriptions to their local copy.
          */
 
         if (local->nb_blocks != nb_remote_blocks) {
@@ -3285,7 +4074,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
 
     return 0;
 err:
-    rdma->error_state = ret;
+    SET_ERROR(rdma, ret);
     return ret;
 }
 
@@ -3294,7 +4083,23 @@ static int qemu_rdma_get_fd(void *opaque)
     QEMUFileRDMA *rfile = opaque;
     RDMAContext *rdma = rfile->rdma;
 
-    return rdma->comp_channel->fd;
+    return rdma->lc_remote.comp_chan->fd;
+}
+
+static int qemu_rdma_delete_block(QEMUFile *f, void *opaque,
+                                  ram_addr_t block_offset)
+{
+    QEMUFileRDMA *rfile = opaque;
+    return __qemu_rdma_delete_block(rfile->rdma, block_offset);
+}
+
+
+static int qemu_rdma_add_block(QEMUFile *f, void *opaque, void *host_addr,
+                         ram_addr_t block_offset, uint64_t length)
+{
+    QEMUFileRDMA *rfile = opaque;
+    return __qemu_rdma_add_block(rfile->rdma, host_addr,
+                                 block_offset, length);
 }
 
 const QEMUFileOps rdma_read_ops = {
@@ -3302,6 +4107,9 @@ const QEMUFileOps rdma_read_ops = {
     .get_fd        = qemu_rdma_get_fd,
     .close         = qemu_rdma_close,
     .hook_ram_load = qemu_rdma_registration_handle,
+    .copy_page     = qemu_rdma_copy_page,
+    .add           = qemu_rdma_add_block,
+    .remove        = qemu_rdma_delete_block,
 };
 
 const QEMUFileOps rdma_write_ops = {
@@ -3310,6 +4118,9 @@ const QEMUFileOps rdma_write_ops = {
     .before_ram_iterate = qemu_rdma_registration_start,
     .after_ram_iterate  = qemu_rdma_registration_stop,
     .save_page          = qemu_rdma_save_page,
+    .copy_page          = qemu_rdma_copy_page,
+    .add                = qemu_rdma_add_block,
+    .remove             = qemu_rdma_delete_block,
 };
 
 static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
@@ -3331,6 +4142,98 @@ static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
     return r->file;
 }
 
+static int connect_local(RDMAContext *rdma,
+                                   RDMALocalContext *src,
+                                   RDMALocalContext *dest)
+{
+    int ret;
+	struct ibv_qp_attr next = {
+			.qp_state = IBV_QPS_RTR,
+			.path_mtu = IBV_MTU_1024,
+			.dest_qp_num = src->qp->qp_num,
+			.rq_psn = src->psn,
+			.max_dest_rd_atomic = 1,
+			.min_rnr_timer = 12,
+			.ah_attr = {
+				.is_global = 0,
+				.dlid = src->port.lid,
+				.sl = 0,
+				.src_path_bits = 0,
+				.port_num = src->port_num,
+			}
+	};
+
+	if(src->gid.global.interface_id) {
+		next.ah_attr.is_global = 1;
+		next.ah_attr.grh.hop_limit = 1;
+		next.ah_attr.grh.dgid = src->gid;
+		next.ah_attr.grh.sgid_index = 0;
+	}
+
+	ret = ibv_modify_qp(dest->qp, &next,
+		IBV_QP_STATE |
+		IBV_QP_AV |
+		IBV_QP_PATH_MTU |
+		IBV_QP_DEST_QPN |
+		IBV_QP_RQ_PSN |
+		IBV_QP_MAX_DEST_RD_ATOMIC |
+		IBV_QP_MIN_RNR_TIMER);
+
+    if (ret) {
+        SET_ERROR(rdma, -ret);
+		ERROR(NULL, "modify src verbs to ready");
+		return rdma->error_state;
+	}
+
+	next.qp_state = IBV_QPS_RTS;
+	next.timeout = 14;
+	next.retry_cnt = 7;
+	next.rnr_retry = 7;
+	next.sq_psn = dest->psn;
+	next.max_rd_atomic = 1; 
+
+	ret = ibv_modify_qp(dest->qp, &next,
+		IBV_QP_STATE |
+		IBV_QP_TIMEOUT |
+		IBV_QP_RETRY_CNT |
+		IBV_QP_RNR_RETRY |
+		IBV_QP_SQ_PSN |
+		IBV_QP_MAX_QP_RD_ATOMIC);
+
+    if (ret) {
+        SET_ERROR(rdma, -ret);
+		ERROR(NULL, "modify dest verbs to ready\n");
+		return rdma->error_state;
+	}
+
+    return 0;
+}
+
+static int init_local(RDMAContext *rdma)
+{
+    DDPRINTF("Opening copy local source queue pair...\n");
+    if (open_local(rdma, &rdma->lc_src)) {
+        return 1;
+    }
+
+    DDPRINTF("Opening copy local destination queue pair...\n");
+    if (open_local(rdma, &rdma->lc_dest)) {
+        return 1;
+    }
+
+    DDPRINTF("Connecting local src queue pairs...\n");
+    if (connect_local(rdma, &rdma->lc_src, &rdma->lc_dest)) {
+        return 1;
+    }
+
+    DDPRINTF("Connecting local dest queue pairs...\n");
+    if (connect_local(rdma, &rdma->lc_dest, &rdma->lc_src)) {
+        return 1;
+    }
+
+    return 0;
+}
+
 static void rdma_accept_incoming_migration(void *opaque)
 {
     RDMAContext *rdma = opaque;
@@ -3342,21 +4245,32 @@ static void rdma_accept_incoming_migration(void *opaque)
     ret = qemu_rdma_accept(rdma);
 
     if (ret) {
-        ERROR(errp, "RDMA Migration initialization failed!");
+        ERROR(errp, "initialization failed!");
         return;
     }
 
     DPRINTF("Accepted migration\n");
 
+    if (init_local(rdma)) {
+        ERROR(errp, "could not initialize local rdma queue pairs!");
+        goto err;
+    }
+
     f = qemu_fopen_rdma(rdma, "rb");
     if (f == NULL) {
         ERROR(errp, "could not qemu_fopen_rdma!");
-        qemu_rdma_cleanup(rdma);
-        return;
+        goto err;
     }
 
-    rdma->migration_started_on_destination = 1;
+    if (rdma->do_keepalive) {
+        qemu_rdma_keepalive_start();
+    }
+
+    rdma->migration_started = 1;
     process_incoming_migration(f);
+    return;
+err:
+    qemu_rdma_cleanup(rdma, false);
 }
 
 void rdma_start_incoming_migration(const char *host_port, Error **errp)
@@ -3372,6 +4286,9 @@ void rdma_start_incoming_migration(const char *host_port, Error **errp)
         goto err;
     }
 
+    rdma->source = false;
+    rdma->dest = true;
+
     ret = qemu_rdma_dest_init(rdma, &local_err);
 
     if (ret) {
@@ -3411,8 +4328,10 @@ void rdma_start_outgoing_migration(void *opaque,
         goto err;
     }
 
-    ret = qemu_rdma_source_init(rdma, &local_err,
-        s->enabled_capabilities[MIGRATION_CAPABILITY_X_RDMA_PIN_ALL]);
+    rdma->source = true;
+    rdma->dest = false;
+
+    ret = qemu_rdma_source_init(rdma, &local_err, s);
 
     if (ret) {
         goto err;
@@ -3425,9 +4344,20 @@ void rdma_start_outgoing_migration(void *opaque,
         goto err;
     }
 
+    if (init_local(rdma)) {
+        ERROR(temp, "could not initialize local rdma queue pairs!");
+        goto err;
+    }
+
     DPRINTF("qemu_rdma_source_connect success\n");
 
     s->file = qemu_fopen_rdma(rdma, "wb");
+    rdma->migration_started = 1;
+
+    if (rdma->do_keepalive) {
+        qemu_rdma_keepalive_start();
+    }
+
     migrate_fd_connect(s);
     return;
 err:
diff --git a/savevm.c b/savevm.c
index 05f8a05..f8eb225 100644
--- a/savevm.c
+++ b/savevm.c
@@ -647,6 +647,31 @@ void ram_control_after_iterate(QEMUFile *f, uint64_t flags)
     }
 }
 
+void ram_control_add(QEMUFile *f, void *host_addr,
+                         ram_addr_t block_offset, uint64_t length)
+{
+    int ret = 0;
+
+    if (f->ops->add) {
+        ret = f->ops->add(f, f->opaque, host_addr, block_offset, length);
+        if (ret < 0) {
+            qemu_file_set_error(f, ret);
+        }
+    }
+}
+
+void ram_control_remove(QEMUFile *f, ram_addr_t block_offset)
+{
+    int ret = 0;
+
+    if (f->ops->remove) {
+        ret = f->ops->remove(f, f->opaque, block_offset);
+        if (ret < 0) {
+            qemu_file_set_error(f, ret);
+        }
+    }
+}
+
 void ram_control_load_hook(QEMUFile *f, uint64_t flags)
 {
     int ret = -EINVAL;
@@ -703,6 +728,33 @@ int ram_control_load_page(QEMUFile *f, void *host_addr, long size)
     return RAM_LOAD_CONTROL_NOT_SUPP;
 }
 
+int ram_control_copy_page(QEMUFile *f, 
+                             ram_addr_t block_offset_dest,
+                             ram_addr_t offset_dest,
+                             ram_addr_t block_offset_source,
+                             ram_addr_t offset_source,
+                             long size)
+{
+    if (f->ops->copy_page) {
+        int ret = f->ops->copy_page(f, f->opaque,
+                                    block_offset_dest,
+                                    offset_dest,
+                                    block_offset_source,
+                                    offset_source,
+                                    size);
+
+        if (ret != RAM_COPY_CONTROL_DELAYED) {
+            if (ret < 0) {
+                qemu_file_set_error(f, ret);
+            }
+        }
+
+        return ret;
+    }
+
+    return RAM_COPY_CONTROL_NOT_SUPP;
+}
+
 static void qemu_fill_buffer(QEMUFile *f)
 {
     int len;
-- 
1.8.1.2

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH v1: 07/12] mc: introduce state machine error handling and migration_bitmap prep
  2013-10-21  1:14 [Qemu-devel] [RFC PATCH v1: 00/12] fault tolerance through micro-checkpointing mrhines
                   ` (5 preceding siblings ...)
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 06/12] rdma: accelerated memcpy() support mrhines
@ 2013-10-21  1:14 ` mrhines
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 08/12] mc: modified QMP statistics and migration_thread handoff mrhines
                   ` (4 subsequent siblings)
  11 siblings, 0 replies; 16+ messages in thread
From: mrhines @ 2013-10-21  1:14 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, owasserm, onom, abali, mrhines, gokul, pbonzini

From: "Michael R. Hines" <mrhines@us.ibm.com>

Since MC will repeatedly call the pre-existing live migration
call path over and over again (forever), the migration_bitmap
initialization only needs to happen once and the destruction of
the bitmap needs to be avoided in successive checkpoints.

Also, there some additional state machine error handling to
prepare for before introducing the MC core logic.

Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 arch_init.c                   | 29 ++++++++++++++++++++++++-----
 include/migration/migration.h | 19 +++++++++++++++++++
 include/migration/qemu-file.h |  1 +
 migration.c                   | 33 +++++++++++++++++++--------------
 4 files changed, 63 insertions(+), 19 deletions(-)

diff --git a/arch_init.c b/arch_init.c
index 9cf7d18..d47b38b 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -795,13 +795,13 @@ static void ram_migration_cancel(void *opaque)
     migration_end();
 }
 
-static void reset_ram_globals(void)
+static void reset_ram_globals(bool reset_bulk_stage)
 {
     last_seen_block = NULL;
     last_sent_block = NULL;
     last_offset = 0;
     last_version = ram_list.version;
-    ram_bulk_stage = true;
+    ram_bulk_stage = reset_bulk_stage;
 }
 
 #define MAX_WAIT 50 /* ms, half buffered_file limit */
@@ -811,6 +811,15 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
     RAMBlock *block;
     int64_t ram_pages = last_ram_offset() >> TARGET_PAGE_BITS;
 
+    /*
+     * RAM stays open during micro-checkpointing for the next transaction.
+     */
+    if (migration_is_mc(migrate_get_current())) {
+        qemu_mutex_lock_ramlist();
+        reset_ram_globals(false);
+        goto skip_setup;
+    }
+
     migration_bitmap = bitmap_new(ram_pages);
     bitmap_set(migration_bitmap, 0, ram_pages);
     migration_dirty_pages = ram_pages;
@@ -833,12 +842,14 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
     qemu_mutex_lock_iothread();
     qemu_mutex_lock_ramlist();
     bytes_transferred = 0;
-    reset_ram_globals();
+    reset_ram_globals(true);
 
     memory_global_dirty_log_start();
     migration_bitmap_sync();
     qemu_mutex_unlock_iothread();
 
+skip_setup:
+
     qemu_put_be64(f, ram_bytes_total() | RAM_SAVE_FLAG_MEM_SIZE);
 
     QTAILQ_FOREACH(block, &ram_list.blocks, next) {
@@ -867,7 +878,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
     qemu_mutex_lock_ramlist();
 
     if (ram_list.version != last_version) {
-        reset_ram_globals();
+        reset_ram_globals(true);
     }
 
     ram_control_before_iterate(f, RAM_CONTROL_ROUND);
@@ -948,7 +959,15 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
     }
 
     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
-    migration_end();
+
+    /*
+     * Only cleanup at the end of normal migrations
+     * or if the MC destination failed and we got an error.
+     * Otherwise, we are (or will be soon) in MIG_STATE_MC.
+     */
+    if(!migrate_use_mc() || migration_has_failed(migrate_get_current())) {
+        migration_end();
+    }
 
     qemu_mutex_unlock_ramlist();
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 0e7f121..fcf7684 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -95,6 +95,8 @@ int migrate_fd_close(MigrationState *s);
 void add_migration_state_change_notifier(Notifier *notify);
 void remove_migration_state_change_notifier(Notifier *notify);
 bool migration_in_setup(MigrationState *);
+bool migration_is_active(MigrationState *);
+bool migration_is_mc(MigrationState *s);
 bool migration_has_finished(MigrationState *);
 bool migration_has_failed(MigrationState *);
 MigrationState *migrate_get_current(void);
@@ -126,6 +128,15 @@ void migration_bitmap_worker_start(MigrationState *s);
 void migration_bitmap_worker_stop(MigrationState *s);
 void migrate_set_state(MigrationState *s, int old_state, int new_state);
 
+enum {
+    MIG_STATE_ERROR = -1,
+    MIG_STATE_NONE,
+    MIG_STATE_SETUP,
+    MIG_STATE_CANCELLED,
+    MIG_STATE_ACTIVE,
+    MIG_STATE_MC,
+    MIG_STATE_COMPLETED,
+};
 void ram_handle_compressed(void *host, uint8_t ch, uint64_t size);
 
 /**
@@ -194,4 +205,12 @@ int ram_control_copy_page(QEMUFile *f,
                              ram_addr_t block_offset_source,
                              ram_addr_t offset_source,
                              long size);
+
+int migrate_use_mc(void);
+int migrate_use_mc_rdma_copy(void);
+
+#define MC_VERSION 1
+
+void qemu_rdma_info_save(QEMUFile *f, void *opaque);
+int qemu_rdma_info_load(QEMUFile *f, void *opaque, int version_id);
 #endif
diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
index d67e97a..b547de9 100644
--- a/include/migration/qemu-file.h
+++ b/include/migration/qemu-file.h
@@ -71,6 +71,7 @@ typedef int (QEMURamHookFunc)(QEMUFile *f, void *opaque, uint64_t flags);
 #define RAM_CONTROL_ROUND    1
 #define RAM_CONTROL_HOOK     2
 #define RAM_CONTROL_FINISH   3
+#define RAM_CONTROL_FLUSH    4
 
 /*
  * This function allows override of where the RAM page
diff --git a/migration.c b/migration.c
index 2b1ab20..62dded3 100644
--- a/migration.c
+++ b/migration.c
@@ -36,15 +36,6 @@
     do { } while (0)
 #endif
 
-enum {
-    MIG_STATE_ERROR = -1,
-    MIG_STATE_NONE,
-    MIG_STATE_SETUP,
-    MIG_STATE_CANCELLED,
-    MIG_STATE_ACTIVE,
-    MIG_STATE_COMPLETED,
-};
-
 #define MAX_THROTTLE  (32 << 20)      /* Migration speed throttling */
 
 /* Amount of time to allocate to each "chunk" of bandwidth-throttled
@@ -270,7 +261,7 @@ void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
     MigrationState *s = migrate_get_current();
     MigrationCapabilityStatusList *cap;
 
-    if (s->state == MIG_STATE_ACTIVE || s->state == MIG_STATE_SETUP) {
+    if (migration_is_active(s)) {
         error_set(errp, QERR_MIGRATION_ACTIVE);
         return;
     }
@@ -282,6 +273,17 @@ void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
 
 /* shared migration helpers */
 
+bool migration_is_mc(MigrationState *s)
+{
+    return s->state == MIG_STATE_MC;
+}
+
+bool migration_is_active(MigrationState *s)
+{
+    return (s->state == MIG_STATE_ACTIVE) || migration_in_setup(s)
+            || migration_is_mc(s);
+}
+
 static void migrate_fd_cleanup(void *opaque)
 {
     MigrationState *s = opaque;
@@ -299,7 +301,7 @@ static void migrate_fd_cleanup(void *opaque)
         s->file = NULL;
     }
 
-    assert(s->state != MIG_STATE_ACTIVE);
+    assert(!migration_is_active(s));
 
     if (s->state != MIG_STATE_COMPLETED) {
         qemu_savevm_state_cancel();
@@ -308,7 +310,7 @@ static void migrate_fd_cleanup(void *opaque)
     notifier_list_notify(&migration_state_notifiers, s);
 }
 
-static void migrate_set_state(MigrationState *s, int old_state, int new_state)
+void migrate_set_state(MigrationState *s, int old_state, int new_state)
 {
     if (atomic_cmpxchg(&s->state, old_state, new_state) == new_state) {
         trace_migrate_set_state(new_state);
@@ -405,7 +407,7 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk,
     params.blk = has_blk && blk;
     params.shared = has_inc && inc;
 
-    if (s->state == MIG_STATE_ACTIVE || s->state == MIG_STATE_SETUP) {
+    if (migration_is_active(s)) {
         error_set(errp, QERR_MIGRATION_ACTIVE);
         return;
     }
@@ -594,7 +596,10 @@ static void *migration_thread(void *opaque)
                 }
 
                 if (!qemu_file_get_error(s->file)) {
-                    migrate_set_state(s, MIG_STATE_ACTIVE, MIG_STATE_COMPLETED);
+                    if (!migrate_use_mc()) {
+                        migrate_set_state(s,
+                            MIG_STATE_ACTIVE, MIG_STATE_COMPLETED);
+                    }
                     break;
                 }
             }
-- 
1.8.1.2

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH v1: 08/12] mc: modified QMP statistics and migration_thread handoff
  2013-10-21  1:14 [Qemu-devel] [RFC PATCH v1: 00/12] fault tolerance through micro-checkpointing mrhines
                   ` (6 preceding siblings ...)
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 07/12] mc: introduce state machine error handling and migration_bitmap prep mrhines
@ 2013-10-21  1:14 ` mrhines
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 09/12] mc: core logic mrhines
                   ` (3 subsequent siblings)
  11 siblings, 0 replies; 16+ messages in thread
From: mrhines @ 2013-10-21  1:14 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, owasserm, onom, abali, mrhines, gokul, pbonzini

From: "Michael R. Hines" <mrhines@us.ibm.com>

In addition to better handling of new QMP statistics associated
with the migration_bitmap and MC performance, we need to transfer
control from the migration thread to the MC thread more cleanly,
which means dynamically allocating the threads and doing
the handoff after the initial live migration has completed.

Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 hmp.c                         | 17 ++++++++
 include/migration/migration.h | 14 ++++++-
 migration.c                   | 94 +++++++++++++++++++++++++++----------------
 qapi-schema.json              |  2 +
 savevm.c                      |  5 +--
 5 files changed, 93 insertions(+), 39 deletions(-)

diff --git a/hmp.c b/hmp.c
index 32ee285..43896e9 100644
--- a/hmp.c
+++ b/hmp.c
@@ -202,6 +202,23 @@ void hmp_info_migrate(Monitor *mon, const QDict *qdict)
                        info->disk->total >> 10);
     }
 
+    if (info->has_mc) {
+        monitor_printf(mon, "checkpoints: %" PRIu64 "\n",
+                       info->mc->checkpoints);
+        monitor_printf(mon, "xmit_time: %" PRIu64 " ms\n",
+                       info->mc->xmit_time);
+        monitor_printf(mon, "log_dirty_time: %" PRIu64 " ms\n",
+                       info->mc->log_dirty_time);
+        monitor_printf(mon, "migration_bitmap_time: %" PRIu64 " ms\n",
+                       info->mc->migration_bitmap_time);
+        monitor_printf(mon, "ram_copy_time: %" PRIu64 " ms\n",
+                       info->mc->ram_copy_time);
+        monitor_printf(mon, "copy_mbps: %0.2f mbps\n",
+                       info->mc->copy_mbps);
+        monitor_printf(mon, "throughput: %0.2f mbps\n",
+                       info->mc->mbps);
+    }
+
     if (info->has_xbzrle_cache) {
         monitor_printf(mon, "cache size: %" PRIu64 " bytes\n",
                        info->xbzrle_cache->cache_size);
diff --git a/include/migration/migration.h b/include/migration/migration.h
index fcf7684..a1ab06c 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -35,13 +35,14 @@ struct MigrationState
     int64_t bandwidth_limit;
     size_t bytes_xfer;
     size_t xfer_limit;
-    QemuThread thread;
+    QemuThread *thread;
     QEMUBH *cleanup_bh;
     QEMUFile *file;
 
     int state;
     MigrationParams params;
     double mbps;
+    double copy_mbps;
     int64_t total_time;
     int64_t downtime;
     int64_t expected_downtime;
@@ -54,6 +55,7 @@ struct MigrationState
     bool enabled_capabilities[MIGRATION_CAPABILITY_MAX];
     int64_t xbzrle_cache_size;
     int64_t setup_time;
+    int64_t checkpoints;
 };
 
 void process_incoming_migration(QEMUFile *f);
@@ -137,6 +139,12 @@ enum {
     MIG_STATE_MC,
     MIG_STATE_COMPLETED,
 };
+
+int mc_enable_buffering(void);
+int mc_start_buffer(void);
+void mc_init_checkpointer(MigrationState *s);
+void mc_process_incoming_checkpoints_if_requested(QEMUFile *f);
+
 void ram_handle_compressed(void *host, uint8_t ch, uint64_t size);
 
 /**
@@ -207,10 +215,14 @@ int ram_control_copy_page(QEMUFile *f,
                              long size);
 
 int migrate_use_mc(void);
+int migrate_use_mc_net(void);
 int migrate_use_mc_rdma_copy(void);
 
 #define MC_VERSION 1
 
+int mc_info_load(QEMUFile *f, void *opaque, int version_id);
+void mc_info_save(QEMUFile *f, void *opaque);
+
 void qemu_rdma_info_save(QEMUFile *f, void *opaque);
 int qemu_rdma_info_load(QEMUFile *f, void *opaque, int version_id);
 #endif
diff --git a/migration.c b/migration.c
index 62dded3..8e0827e 100644
--- a/migration.c
+++ b/migration.c
@@ -172,6 +172,31 @@ static void get_xbzrle_cache_stats(MigrationInfo *info)
     }
 }
 
+static void get_ram_stats(MigrationState *s, MigrationInfo *info)
+{
+    info->has_total_time = true;
+    info->total_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME)
+        - s->total_time;
+
+    info->has_ram = true;
+    info->ram = g_malloc0(sizeof(*info->ram));
+    info->ram->transferred = ram_bytes_transferred();
+    info->ram->total = ram_bytes_total();
+    info->ram->duplicate = dup_mig_pages_transferred();
+    info->ram->skipped = skipped_mig_pages_transferred();
+    info->ram->normal = norm_mig_pages_transferred();
+    info->ram->normal_bytes = norm_mig_bytes_transferred();
+    info->ram->mbps = s->mbps;
+
+    if (blk_mig_active()) {
+        info->has_disk = true;
+        info->disk = g_malloc0(sizeof(*info->disk));
+        info->disk->transferred = blk_mig_bytes_transferred();
+        info->disk->remaining = blk_mig_bytes_remaining();
+        info->disk->total = blk_mig_bytes_total();
+    }
+}
+
 MigrationInfo *qmp_query_migrate(Error **errp)
 {
     MigrationInfo *info = g_malloc0(sizeof(*info));
@@ -197,26 +222,8 @@ MigrationInfo *qmp_query_migrate(Error **errp)
         info->has_setup_time = true;
         info->setup_time = s->setup_time;
 
-        info->has_ram = true;
-        info->ram = g_malloc0(sizeof(*info->ram));
-        info->ram->transferred = ram_bytes_transferred();
-        info->ram->remaining = ram_bytes_remaining();
-        info->ram->total = ram_bytes_total();
-        info->ram->duplicate = dup_mig_pages_transferred();
-        info->ram->skipped = skipped_mig_pages_transferred();
-        info->ram->normal = norm_mig_pages_transferred();
-        info->ram->normal_bytes = norm_mig_bytes_transferred();
+        get_ram_stats(s, info);
         info->ram->dirty_pages_rate = s->dirty_pages_rate;
-        info->ram->mbps = s->mbps;
-
-        if (blk_mig_active()) {
-            info->has_disk = true;
-            info->disk = g_malloc0(sizeof(*info->disk));
-            info->disk->transferred = blk_mig_bytes_transferred();
-            info->disk->remaining = blk_mig_bytes_remaining();
-            info->disk->total = blk_mig_bytes_total();
-        }
-
         get_xbzrle_cache_stats(info);
         break;
     case MIG_STATE_COMPLETED:
@@ -225,22 +232,37 @@ MigrationInfo *qmp_query_migrate(Error **errp)
         info->has_status = true;
         info->status = g_strdup("completed");
         info->has_total_time = true;
-        info->total_time = s->total_time;
+        info->total_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME)
+            - s->total_time;
         info->has_downtime = true;
         info->downtime = s->downtime;
         info->has_setup_time = true;
         info->setup_time = s->setup_time;
 
-        info->has_ram = true;
-        info->ram = g_malloc0(sizeof(*info->ram));
-        info->ram->transferred = ram_bytes_transferred();
-        info->ram->remaining = 0;
-        info->ram->total = ram_bytes_total();
-        info->ram->duplicate = dup_mig_pages_transferred();
-        info->ram->skipped = skipped_mig_pages_transferred();
-        info->ram->normal = norm_mig_pages_transferred();
-        info->ram->normal_bytes = norm_mig_bytes_transferred();
-        info->ram->mbps = s->mbps;
+        get_ram_stats(s, info);
+        break;
+    case MIG_STATE_MC:
+        info->has_status = true;
+        info->status = g_strdup("checkpointing");
+        info->has_setup_time = true;
+        info->setup_time = s->setup_time;
+        info->has_downtime = true;
+        info->downtime = s->downtime;
+
+        get_ram_stats(s, info);
+        info->ram->dirty_pages_rate = s->dirty_pages_rate;
+        get_xbzrle_cache_stats(info);
+
+
+        info->has_mc = true;
+        info->mc = g_malloc0(sizeof(*info->mc));
+        info->mc->xmit_time = s->xmit_time;
+        info->mc->log_dirty_time = s->log_dirty_time; 
+        info->mc->migration_bitmap_time = s->bitmap_time;
+        info->mc->ram_copy_time = s->ram_copy_time;
+        info->mc->copy_mbps = s->copy_mbps;
+        info->mc->mbps = s->mbps;
+        info->mc->checkpoints = s->checkpoints;
         break;
     case MIG_STATE_ERROR:
         info->has_status = true;
@@ -288,14 +310,17 @@ static void migrate_fd_cleanup(void *opaque)
 {
     MigrationState *s = opaque;
 
-    qemu_bh_delete(s->cleanup_bh);
-    s->cleanup_bh = NULL;
+    if(s->cleanup_bh) {
+        qemu_bh_delete(s->cleanup_bh);
+        s->cleanup_bh = NULL;
+    }
 
     if (s->file) {
         DPRINTF("closing file\n");
         qemu_mutex_unlock_iothread();
-        qemu_thread_join(&s->thread);
+        qemu_thread_join(s->thread);
         qemu_mutex_lock_iothread();
+        g_free(s->thread);
 
         qemu_fclose(s->file);
         s->file = NULL;
@@ -670,6 +695,7 @@ void migrate_fd_connect(MigrationState *s)
     /* Notify before starting migration thread */
     notifier_list_notify(&migration_state_notifiers, s);
 
-    qemu_thread_create(&s->thread, migration_thread, s,
+    s->thread = g_malloc0(sizeof(*s->thread));
+    qemu_thread_create(s->thread, migration_thread, s,
                        QEMU_THREAD_JOINABLE);
 }
diff --git a/qapi-schema.json b/qapi-schema.json
index 8e72bcf..e0a430c 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -630,6 +630,8 @@
 #                migration statistics, only returned if XBZRLE feature is on and
 #                status is 'active' or 'completed' (since 1.2)
 #
+# @mc: #options @MCStats containing details Micro-Checkpointing statistics
+#
 # @total-time: #optional total amount of milliseconds since migration started.
 #        If migration has ended, it returns the total migration
 #        time. (since 1.2)
diff --git a/savevm.c b/savevm.c
index f8eb225..3ad4eea 100644
--- a/savevm.c
+++ b/savevm.c
@@ -419,10 +419,7 @@ QEMUFile *qemu_fdopen(int fd, const char *mode)
 {
     QEMUFileSocket *s;
 
-    if (mode == NULL ||
-	(mode[0] != 'r' && mode[0] != 'w') ||
-	mode[1] != 'b' || mode[2] != 0) {
-        fprintf(stderr, "qemu_fdopen: Argument validity check failed\n");
+    if (qemu_file_mode_is_not_valid(mode)) {
         return NULL;
     }
 
-- 
1.8.1.2

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH v1: 09/12] mc: core logic
  2013-10-21  1:14 [Qemu-devel] [RFC PATCH v1: 00/12] fault tolerance through micro-checkpointing mrhines
                   ` (7 preceding siblings ...)
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 08/12] mc: modified QMP statistics and migration_thread handoff mrhines
@ 2013-10-21  1:14 ` mrhines
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 10/12] mc: configure and makefile support mrhines
                   ` (2 subsequent siblings)
  11 siblings, 0 replies; 16+ messages in thread
From: mrhines @ 2013-10-21  1:14 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, owasserm, onom, abali, mrhines, gokul, pbonzini

From: "Michael R. Hines" <mrhines@us.ibm.com>

This implements the core logic, all described in docs/mc.txt

Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 migration-checkpoint.c | 1589 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 1589 insertions(+)
 create mode 100644 migration-checkpoint.c

diff --git a/migration-checkpoint.c b/migration-checkpoint.c
new file mode 100644
index 0000000..14b03e8
--- /dev/null
+++ b/migration-checkpoint.c
@@ -0,0 +1,1589 @@
+/*
+ *  Copyright (C) 2014 Michael R. Hines <mrhines@us.ibm.com>
+ *
+ *  Micro-Checkpointing (MC) support 
+ *  (a.k.a. Fault Tolerance or Continuous Replication)
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; under version 2 of the License.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program; if not, see <http://www.gnu.org/licenses/>.
+ */
+#include <libnl3/netlink/route/qdisc/plug.h>
+#include <libnl3/netlink/route/class.h>
+#include <libnl3/netlink/cli/utils.h>
+#include <libnl3/netlink/cli/tc.h>
+#include <libnl3/netlink/cli/qdisc.h>
+#include <libnl3/netlink/cli/link.h>
+#include "qemu-common.h"
+#include "hw/virtio/virtio.h"
+#include "hw/virtio/virtio-net.h"
+#include "qemu/sockets.h"
+#include "migration/migration.h"
+#include "migration/qemu-file.h"
+#include "qmp-commands.h"
+#include "net/tap-linux.h"
+#include <sys/ioctl.h>
+
+#define DEBUG_MC
+//#define DEBUG_MC_VERBOSE
+//#define DEBUG_MC_REALLY_VERBOSE
+
+#ifdef DEBUG_MC
+#define DPRINTF(fmt, ...) \
+    do { printf("mc: " fmt, ## __VA_ARGS__); } while (0)
+#else
+#define DPRINTF(fmt, ...) \
+    do { } while (0)
+#endif
+
+#ifdef DEBUG_MC_VERBOSE
+#define DDPRINTF(fmt, ...) \
+    do { printf("mc: " fmt, ## __VA_ARGS__); } while (0)
+#else
+#define DDPRINTF(fmt, ...) \
+    do { } while (0)
+#endif
+
+#ifdef DEBUG_MC_REALLY_VERBOSE
+#define DDDPRINTF(fmt, ...) \
+    do { printf("mc: " fmt, ## __VA_ARGS__); } while (0)
+#else
+#define DDDPRINTF(fmt, ...) \
+    do { } while (0)
+#endif
+
+#define MBPS(bytes, time) time ? ((((double) bytes * 8)         \
+        / ((double) time / 1000.0)) / 1000.0 / 1000.0) : -1.0
+
+/*
+ * Micro checkpoints (MC)s are typically only a few MB when idle.
+ * However, they can easily be very large during heavy workloads.
+ * In the *extreme* worst-case, QEMU might need double the amount of main memory
+ * than that of what was originally allocated to the virtual machine.
+ *
+ * To support this variability during transient periods, a MC
+ * consists of a linked list of slabs, each of identical size. A better name
+ * would be welcome, as the name was only chosen because it resembles linux
+ * memory allocation. Because MCs occur several times per second 
+ * (a frequency of 10s of milliseconds), slabs allow MCs to grow and shrink 
+ * without constantly re-allocating all memory in place during each checkpoint.
+ *
+ * During steady-state, the 'head' slab is permanently allocated and never goes
+ * away, so when the VM is idle, there is no memory allocation at all.
+ * This design supports the use of RDMA. Since RDMA requires memory pinning, we
+ * must be able to hold on to a slab for a reasonable amount of time to get any
+ * real use out of it.
+ *
+ * Regardless, the current strategy taken is:
+ * 
+ * 1. If the checkpoint size increases,
+ *    then grow the number of slabs to support it.
+ * 2. If the next checkpoint size is smaller than the last one,
+      then that's a "strike".
+ * 3. After N strikes, cut the size of the slab cache in half
+ *    (to a minimum of 1 slab as described before).
+ *
+ * As of this writing, a typical average size of 
+ * an Idle-VM checkpoint is under 5MB.
+ */
+
+#define MC_SLAB_BUFFER_SIZE     (5UL * 1024UL * 1024UL) /* empirical */
+#define MC_DEV_NAME_MAX_SIZE    256
+
+#define MC_DEFAULT_CHECKPOINT_FREQ_MS 100 /* too slow, but best for now */
+#define CALC_MAX_STRIKES()                                           \
+    do {  max_strikes = (max_strikes_delay_secs * 1000) / freq_ms; } \
+    while (0)
+
+/*
+ * How many "seconds-worth" of checkpoints to wait before re-evaluating the size
+ * of the slab cache?
+ *
+ * #strikes_until_shrink_cache = Function(#checkpoints/sec)
+ *
+ * Increasing the number of seconds, increases the number of strikes needed to
+ * be reached until it is time to cut the cache in half.
+ *
+ * Below value is open for debate - we just want it to be small enough to ensure
+ * that a large, idle cache doesn't stay too large for too long.
+ */
+#define MC_DEFAULT_SLAB_MAX_CHECK_DELAY_SECS 10
+
+/* 
+ * MC serializes the actual RAM page contents in such a way that the actual
+ * pages are separated from the meta-data (all the QEMUFile stuff).
+ *
+ * This is done strictly for the purposes of being able to use RDMA
+ * to replace memcpy() on the local machine.
+ * 
+ * This serialization requires recording the page descriptions and then
+ * pushing them into slabs after the checkpoint has been captured
+ * (minus the page data).
+ *
+ * The memory holding the page descriptions are allocated in unison with the
+ * slabs themselves, and thus we need to know in advance the maximum number of
+ * page descriptions that can fit into a slab before allocating the slab.
+ * It should be safe to assume the *minimum* page size (not the maximum,
+ * that would be dangerous) is 4096.
+ *
+ * We're not actually using this assumption for any memory management 
+ * management, only as a hint to know how big of an array to allocate.
+ *
+ * The following adds a fixed-cost of about 40 KB to each slab.
+ */
+#define MC_MAX_SLAB_COPY_DESCRIPTORS (MC_SLAB_BUFFER_SIZE / 4096)
+
+#define SLAB_RESET(s) do {                      \
+                            s->size = 0;      \
+                            s->read = 0;      \
+                      } while(0)
+
+uint64_t freq_ms = MC_DEFAULT_CHECKPOINT_FREQ_MS;
+uint32_t max_strikes_delay_secs = MC_DEFAULT_SLAB_MAX_CHECK_DELAY_SECS;
+uint32_t max_strikes = -1;
+
+typedef struct QEMU_PACKED MCCopy {
+    uint64_t ramblock_offset;
+    uint64_t host_addr;
+    uint64_t offset;
+    uint64_t size;
+} MCCopy;
+
+typedef struct QEMU_PACKED MCCopyset {
+    QTAILQ_ENTRY(MCCopyset) node;
+    MCCopy copies[MC_MAX_SLAB_COPY_DESCRIPTORS];
+    uint64_t nb_copies;
+    int idx;
+} MCCopyset;
+
+typedef struct QEMU_PACKED MCSlab {
+    QTAILQ_ENTRY(MCSlab) node;
+    uint8_t buf[MC_SLAB_BUFFER_SIZE];
+    uint64_t read;
+    uint64_t size;
+    int idx;
+} MCSlab;
+
+typedef struct MCParams {
+    QTAILQ_HEAD(shead, MCSlab) slab_head;
+    QTAILQ_HEAD(chead, MCCopyset) copy_head;
+    MCSlab *curr_slab;
+    MCSlab *mem_slab;
+    MCCopyset *curr_copyset;
+    MCCopy *copy;
+    QEMUFile *file;
+    QEMUFile *staging;
+    uint64_t start_copyset;
+    uint64_t slab_total;
+    uint64_t total_copies;
+    uint64_t nb_slabs;
+    uint64_t used_slabs;
+    uint32_t slab_strikes;
+    uint32_t copy_strikes;
+    int nb_copysets;
+    uint64_t checkpoints;
+} MCParams;
+
+enum {
+    MC_TRANSACTION_NACK = 300,
+    MC_TRANSACTION_START,
+    MC_TRANSACTION_COMMIT,
+    MC_TRANSACTION_ABORT,
+    MC_TRANSACTION_ACK,
+    MC_TRANSACTION_END,
+    MC_TRANSACTION_ANY,
+};
+
+static const char * mc_desc[] = {
+    [MC_TRANSACTION_NACK] = "NACK",
+    [MC_TRANSACTION_START] = "START",
+    [MC_TRANSACTION_COMMIT] = "COMMIT",
+    [MC_TRANSACTION_ABORT] = "ABORT",
+    [MC_TRANSACTION_ACK] = "ACK",
+    [MC_TRANSACTION_END] = "END",
+    [MC_TRANSACTION_ANY] = "ANY",
+};
+
+static struct rtnl_qdisc        *qdisc      = NULL;
+static struct nl_sock           *sock       = NULL;
+static struct rtnl_tc           *tc         = NULL;
+static struct nl_cache          *link_cache = NULL;
+static struct rtnl_tc_ops       *ops        = NULL;
+static struct nl_cli_tc_module  *tm         = NULL;
+static int first_nic_chosen = 0;
+
+/*
+ * Assuming a guest can 'try' to fill a 1 Gbps pipe,
+ * that works about to 125000000 bytes/sec.
+ *
+ * Netlink better not be pre-allocating megabytes in the
+ * kernel qdisc, that would be crazy....
+ */
+#define START_BUFFER (1000*1000*1000 / 8)
+static int buffer_size = START_BUFFER, new_buffer_size = START_BUFFER;
+static const char * parent = "root";
+static int buffering_enabled = 0;
+static const char * BUFFER_NIC_PREFIX = "ifb";
+static QEMUBH *checkpoint_bh = NULL;
+static bool mc_requested = false;
+
+int migrate_use_mc(void)
+{
+    MigrationState *s = migrate_get_current();
+    return s->enabled_capabilities[MIGRATION_CAPABILITY_X_MC];
+}
+
+int migrate_use_mc_net(void)
+{
+    MigrationState *s = migrate_get_current();
+    return s->enabled_capabilities[MIGRATION_CAPABILITY_MC_NET_DISABLE];
+}
+
+int migrate_use_mc_rdma_copy(void)
+{
+    MigrationState *s = migrate_get_current();
+    return s->enabled_capabilities[MIGRATION_CAPABILITY_MC_RDMA_COPY];
+}
+
+static int mc_deliver(int update)
+{
+    int err, flags = NLM_F_CREATE;
+
+    if (!buffering_enabled)
+        return -EINVAL;
+
+    if (!update)
+        flags |= NLM_F_EXCL;
+ 
+    if ((err = rtnl_qdisc_add(sock, qdisc, flags)) < 0) {
+        fprintf(stderr, "Unable control qdisc: %s! %p %p %d\n",
+            nl_geterror(err), sock, qdisc, flags);
+        return -EINVAL;
+    }
+
+    return 0;
+}
+
+static int mc_set_buffer_size(int size)
+{
+    int err;
+
+    if (!buffering_enabled) {
+        return 1;
+    }
+
+    buffer_size = size;
+    new_buffer_size = size;
+
+    if ((err = rtnl_qdisc_plug_set_limit((void *) qdisc, size)) < 0) {
+       fprintf(stderr, "MC: Unable to change buffer size: %s\n",
+			nl_geterror(err));
+       return -EINVAL;
+    }
+
+    DPRINTF("Set buffer size to %d bytes\n", size);
+
+    return mc_deliver(1);
+}
+
+/*
+ * Micro-checkpointing may require buffering network packets.
+ * Set that up for the first NIC only....
+ */
+static void init_mc_nic_buffering(NICState *nic, void *opaque)
+{
+    char * device = opaque;
+    NetClientState * nc = &nic->ncs[0];
+    const char * key = "ifname=";
+    int keylen = strlen(key);
+    char * name;
+    int end = 0;
+    bool use_fd = false;
+   
+    if (first_nic_chosen) {
+         fprintf(stderr, "Micro-Checkpointing with multiple NICs not yet supported!\n");
+         return;
+    }
+
+    if (!nc->peer) {
+        fprintf(stderr, "Micro-Checkpoint nic %s does not have peer host device for buffering. VM will not be consistent.\n", nc->name);
+        return;
+    }
+
+    name = nc->peer->info_str;
+
+    DPRINTF("Checking contents of %s\n", name);
+
+    if (strncmp(name, key, keylen)) {
+        fprintf(stderr, "Micro-Checkpoint nic %s does not have 'ifname' "
+                        "in its description (%s, %s). Trying workaround...\n",
+                        nc->name, name, nc->peer->name);
+        key = "fd=";
+        keylen = strlen(key);
+        if (strncmp(name, key, keylen)) {
+            fprintf(stderr, "Still cannot find 'fd=' either. Failure.\n");
+            return;
+        }
+
+        use_fd = true;
+    }
+
+    name += keylen;
+
+    while (name[end++] != (use_fd ? '\0' : ','));
+
+    strncpy(device, name, end - 1);
+    memset(&device[end - 1], 0, MC_DEV_NAME_MAX_SIZE - (end - 1));
+
+    if (use_fd) {
+        struct ifreq r;
+        DPRINTF("Want to retreive name from fd: %d\n", atoi(device));
+
+        if (ioctl(atoi(device), TUNGETIFF, &r) == -1) {
+            fprintf(stderr, "Failed to convert fd %s to name.\n", device);
+            return;
+        }
+
+        DPRINTF("Got name %s!\n", r.ifr_name);
+        strcpy(device, r.ifr_name);
+    }
+
+    first_nic_chosen = 1;
+}
+
+static int mc_suspend_buffering(void)
+{
+    int err;
+
+    if (!buffering_enabled) {
+        return -EINVAL;
+    }
+
+    if ((err = rtnl_qdisc_plug_release_indefinite((void *) qdisc)) < 0) {
+        fprintf(stderr, "MC: Unable to release indefinite: %s\n",
+            nl_geterror(err));
+        return -EINVAL;
+    }
+
+    DPRINTF("Buffering suspended\n");
+
+    return mc_deliver(1);
+}
+
+static int mc_disable_buffering(void)
+{
+    int err;
+
+    if (!buffering_enabled) {
+		goto out;
+	}
+
+    mc_suspend_buffering();
+
+    if (qdisc && sock && (err = rtnl_qdisc_delete(sock, (void *) qdisc)) < 0) {
+        fprintf(stderr, "Unable to release indefinite: %s\n", nl_geterror(err));
+    }
+
+out:
+    buffering_enabled = 0;
+    qdisc = NULL;
+    sock = NULL;
+    tc = NULL;
+    link_cache = NULL;
+    ops = NULL;
+    tm = NULL;
+
+    DPRINTF("Buffering disabled\n");
+
+    return 0;
+}
+
+/*
+ * Install a Qdisc plug for micro-checkpointing.
+ * If it exists already (say, from a previous dead VM or debugging
+ * session) then just open all the netlink data structures pointing
+ * to the existing plug so that we can continue to manipulate it.
+ */
+int mc_enable_buffering(void)
+{
+    char dev[MC_DEV_NAME_MAX_SIZE], buffer_dev[MC_DEV_NAME_MAX_SIZE];
+    int prefix_len = 0;
+    int buffer_prefix_len = strlen(BUFFER_NIC_PREFIX);
+
+    if (buffering_enabled) {
+        fprintf(stderr, "Buffering already enable Skipping.\n");
+        return 0;
+    }
+
+    first_nic_chosen = 0;
+
+    qemu_foreach_nic(init_mc_nic_buffering, dev);
+
+    if (!first_nic_chosen) {
+        fprintf(stderr, "Enumeration of NICs complete, but failed.\n");
+        goto failed;
+    }
+
+    while ((dev[prefix_len] < '0') || (dev[prefix_len] > '9'))
+        prefix_len++;
+
+    strcpy(buffer_dev, BUFFER_NIC_PREFIX);
+    strncpy(buffer_dev + buffer_prefix_len,
+                dev + prefix_len, strlen(dev) - prefix_len + 1);
+
+    fprintf(stderr, "Initializing buffering for nic %s => %s\n", dev, buffer_dev);
+
+    if (sock == NULL) {
+        sock = (struct nl_sock *) nl_cli_alloc_socket();
+        if (!sock) {
+            fprintf(stderr, "MC: failed to allocate netlink socket\n");
+            goto failed;
+        }
+		nl_cli_connect(sock, NETLINK_ROUTE);
+    }
+
+    if (qdisc == NULL) {
+        qdisc = nl_cli_qdisc_alloc();
+        if (!qdisc) {
+            fprintf(stderr, "MC: failed to allocate netlink qdisc\n");
+            goto failed;
+        }
+        tc = (struct rtnl_tc *) qdisc;
+    }
+
+    if (link_cache == NULL) {
+		link_cache = nl_cli_link_alloc_cache(sock);
+        if (!link_cache) {
+            fprintf(stderr, "MC: failed to allocate netlink link_cache\n");
+            goto failed;
+        }
+    }
+
+    nl_cli_tc_parse_dev(tc, link_cache, (char *) buffer_dev);
+    nl_cli_tc_parse_parent(tc, (char *) parent);
+
+    if (!rtnl_tc_get_ifindex(tc)) {
+        fprintf(stderr, "Qdisc device '%s' does not exist!\n", buffer_dev);
+        goto failed;
+    }
+
+    if (!rtnl_tc_get_parent(tc)) {
+        fprintf(stderr, "Qdisc parent '%s' is not valid!\n", parent);
+        goto failed;
+    }
+
+    if (rtnl_tc_set_kind(tc, "plug") < 0) {
+        fprintf(stderr, "Could not open qdisc plug!\n");
+        goto failed;
+    }
+
+    if (!(ops = rtnl_tc_get_ops(tc))) {
+        fprintf(stderr, "Could not open qdisc plug!\n");
+        goto failed;
+    }
+
+    if (!(tm = nl_cli_tc_lookup(ops))) {
+        fprintf(stderr, "Qdisc plug not supported!\n");
+        goto failed;
+    }
+   
+    buffering_enabled = 1;
+
+    if (mc_deliver(0) < 0) {
+		fprintf(stderr, "First time qdisc create failed\n");
+		goto failed;
+    }
+
+    DPRINTF("Buffering enabled, size: %d MB.\n", buffer_size / 1024 / 1024);
+  
+    if (mc_set_buffer_size(buffer_size) < 0) {
+		goto failed;
+	}
+
+    if (mc_suspend_buffering() < 0) {
+		goto failed;
+	}
+
+
+    return 0;
+
+failed:
+    mc_disable_buffering();
+    return -EINVAL;
+}
+
+int mc_start_buffer(void)
+{
+    int err;
+
+    if (!buffering_enabled) {
+        return -EINVAL;
+    }
+
+    if (new_buffer_size != buffer_size) {
+        buffer_size = new_buffer_size;
+        fprintf(stderr, "GDB setting new buffer size to %d\n", buffer_size);
+        if (mc_set_buffer_size(buffer_size) < 0)
+            return -EINVAL;
+    }
+
+    if ((err = rtnl_qdisc_plug_buffer((void *) qdisc)) < 0) {
+        fprintf(stderr, "Unable to flush oldest checkpoint: %s\n", nl_geterror(err));
+        return -EINVAL;
+    }
+
+    DDPRINTF("Inserted checkpoint barrier\n");
+
+    return mc_deliver(1);
+}
+
+static int mc_flush_oldest_buffer(void)
+{
+    int err;
+
+    if (!buffering_enabled)
+        return -EINVAL;
+
+    if ((err = rtnl_qdisc_plug_release_one((void *) qdisc)) < 0) {
+        fprintf(stderr, "Unable to flush oldest checkpoint: %s\n", nl_geterror(err));
+        return -EINVAL;
+    }
+
+    DDPRINTF("Flushed oldest checkpoint barrier\n");
+
+    return mc_deliver(1);
+}
+
+/*
+ * Get the next slab in the list. If there is none, then make one.
+ */
+static MCSlab *mc_slab_next(MCParams *mc, MCSlab *slab)
+{
+    if (!QTAILQ_NEXT(slab, node)) {
+        int idx = mc->nb_slabs++;
+        mc->used_slabs++;
+        DDPRINTF("Extending slabs by one: %" PRIu64 " slabs total, "
+                 "%" PRIu64 " MB\n", mc->nb_slabs,
+                 mc->nb_slabs * sizeof(MCSlab) / 1024UL / 1024UL);
+        mc->curr_slab = qemu_memalign(4096, sizeof(MCSlab));
+        memset(mc->curr_slab, 0, sizeof(*(mc->curr_slab)));
+        mc->curr_slab->idx = idx;
+        QTAILQ_INSERT_TAIL(&mc->slab_head, mc->curr_slab, node);
+        slab = mc->curr_slab;
+        ram_control_add(mc->file, slab->buf, 
+                (uint64_t) slab->buf, MC_SLAB_BUFFER_SIZE);
+    } else {
+        slab = QTAILQ_NEXT(slab, node);
+        mc->used_slabs++;
+    }
+
+    mc->curr_slab = slab;
+    SLAB_RESET(slab);
+
+    if (slab->idx == mc->start_copyset) {
+        DDPRINTF("Found copyset slab @ idx %d\n", slab->idx);
+        mc->mem_slab = slab;
+    }
+
+    return slab;
+}
+
+static int mc_put_buffer(void *opaque, const uint8_t *buf,
+                                  int64_t pos, int size)
+{
+    MCParams *mc = opaque;
+    MCSlab *slab = mc->curr_slab;
+    uint64_t len = size;
+
+    assert(slab);
+
+    while (len) {
+        long put = MIN(MC_SLAB_BUFFER_SIZE - slab->size, len);
+
+        if (put == 0) {
+            DDPRINTF("Reached the end of slab %d Need a new one\n", slab->idx);
+            goto zero;
+        }
+
+        if (mc->copy && migrate_use_mc_rdma_copy()) {
+            int ret = ram_control_copy_page(mc->file, 
+                                        (uint64_t) slab->buf,
+                                        slab->size,
+                                        (ram_addr_t) mc->copy->ramblock_offset,
+                                        (ram_addr_t) mc->copy->offset,
+                                        put);
+
+            DDDPRINTF("Attempted offloaded memcpy.\n");
+
+            if (ret != RAM_COPY_CONTROL_NOT_SUPP) {
+                if (ret == RAM_COPY_CONTROL_DELAYED) {
+                    DDDPRINTF("Offloaded memcpy successful.\n"); 
+                    mc->copy->offset += put;
+                    goto next;
+                } else {
+                    fprintf(stderr, "Offloaded memcpy failed: %d\n", ret);
+                    return ret;
+                }
+            }
+        }
+
+        DDDPRINTF("Copying to %p from %p, size %" PRId64 "\n",
+                 slab->buf + slab->size, buf, put);
+
+        memcpy(slab->buf + slab->size, buf, put);
+next:
+
+        buf            += put;
+        slab->size     += put;
+        len            -= put;
+        mc->slab_total += put;
+
+        DDDPRINTF("put: %" PRIu64 " len: %" PRIu64
+                  " total %" PRIu64 " size: %" PRIu64 
+                  " slab %d\n",
+                  put, len, mc->slab_total, slab->size,
+                  slab->idx);
+zero:
+        if (len) {
+            slab = mc_slab_next(mc, slab);
+        }
+    }
+
+    return size;
+}
+
+/*
+ * Stop the VM, generate the micro checkpoint,
+ * but save the dirty memory into staging memory
+ * (buffered_file will sit on it) until
+ * we can re-activate the VM as soon as possible.
+ */
+static int capture_checkpoint(MCParams *mc, MigrationState *s)
+{
+    MCCopyset *copyset;
+    int idx, ret = 0;
+    uint64_t start, stop, copies = 0;
+    int64_t start_time;
+
+    mc->total_copies = 0;
+    qemu_mutex_lock_iothread();
+    vm_stop_force_state(RUN_STATE_CHECKPOINT_VM);
+    start = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+
+    /*
+     * If buffering is enabled, insert a Qdisc plug here
+     * to hold packets for the *next* MC, (not this one,
+     * the packets for this one have already been plugged
+     * and will be released after the MC has been transmitted.
+     */
+    mc_start_buffer();
+
+    qemu_savevm_state_begin(mc->staging, &s->params);
+    ret = qemu_file_get_error(s->file);
+
+    if (ret < 0) {
+        migrate_set_state(s, MIG_STATE_MC, MIG_STATE_ERROR);
+    }
+
+    qemu_savevm_state_complete(mc->staging);
+
+    ret = qemu_file_get_error(s->file);
+    if (ret < 0) {
+        migrate_set_state(s, MIG_STATE_MC, MIG_STATE_ERROR);
+        goto out;
+    }
+
+    /*
+     * The copied memory gets appended to the end of the snapshot, so let's
+     * remember where its going to go first and start a new slab.
+     */
+    mc_slab_next(mc, mc->curr_slab);
+    mc->start_copyset = mc->curr_slab->idx;
+
+    start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+
+    /*
+     * Now perform the actual copy of memory into the tail end of the slab list. 
+     */
+    QTAILQ_FOREACH(copyset, &mc->copy_head, node) {
+        if (!copyset->nb_copies) {
+            break;
+        }
+
+        copies += copyset->nb_copies;
+
+        DDDPRINTF("copyset %d copies: %" PRIu64 " total: %" PRIu64 "\n",
+                copyset->idx, copyset->nb_copies, copies);
+
+        for (idx = 0; idx < copyset->nb_copies; idx++) {
+            uint8_t *addr;
+            long size;
+            mc->copy = &copyset->copies[idx];
+            addr = (uint8_t *) (mc->copy->host_addr + mc->copy->offset);
+            size = mc_put_buffer(mc, addr, mc->copy->offset, mc->copy->size);
+            if (size != mc->copy->size) {
+                fprintf(stderr, "Failure to initiate copyset %d index %d\n",
+                        copyset->idx, idx);
+                migrate_set_state(s, MIG_STATE_MC, MIG_STATE_ERROR);
+                vm_start();
+                goto out;
+            }
+
+            DDDPRINTF("Success copyset %d index %d\n", copyset->idx, idx);
+        }
+
+        copyset->nb_copies = 0;
+    }
+
+    s->ram_copy_time = (qemu_clock_get_ms(QEMU_CLOCK_REALTIME) - start_time);
+
+    mc->copy = NULL;
+    ram_control_before_iterate(mc->file, RAM_CONTROL_FLUSH); 
+    assert(mc->total_copies == copies);
+
+    stop = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+
+    /*
+     * MC is safe in staging area. Let the VM go.
+     */
+    vm_start();
+    qemu_fflush(mc->staging);
+
+    s->downtime = stop - start;
+out:
+    qemu_mutex_unlock_iothread();
+    return ret;
+}
+
+/*
+ * Synchronously send a micro-checkpointing command
+ */
+static int mc_send(QEMUFile *f, uint64_t request)
+{
+    int ret = 0;
+
+    qemu_put_be64(f, request);
+
+    ret = qemu_file_get_error(f);
+    if (ret) {
+        fprintf(stderr, "transaction: send error while sending %" PRIu64 ", "
+                "bailing: %s\n", request, strerror(-ret));
+    } else {
+        DDPRINTF("transaction: sent: %s (%" PRIu64 ")\n", 
+            mc_desc[request], request);
+    }
+
+    qemu_fflush(f);
+
+    return ret;
+}
+
+/*
+ * Synchronously receive a micro-checkpointing command
+ */
+static int mc_recv(QEMUFile *f, uint64_t request, uint64_t *action)
+{
+    int ret = 0;
+    uint64_t got;
+
+    got = qemu_get_be64(f);
+
+    ret = qemu_file_get_error(f);
+    if (ret) {
+        fprintf(stderr, "transaction: recv error while expecting %s (%"
+                PRIu64 "), bailing: %s\n", mc_desc[request], 
+                request, strerror(-ret));
+    } else {
+        if ((request != MC_TRANSACTION_ANY) && request != got) {
+            fprintf(stderr, "transaction: was expecting %s (%" PRIu64 
+                    ") but got %" PRIu64 " instead\n",
+                    mc_desc[request], request, got);
+            ret = -EINVAL;
+        } else {
+            DDPRINTF("transaction: recv: %s (%" PRIu64 ")\n", 
+                     mc_desc[got], got);
+            ret = 0;
+            if (action) {
+                *action = got;
+            }
+        }
+    }
+
+    return ret;
+}
+
+static int migrate_use_bitworkers(void)
+{
+    MigrationState *s = migrate_get_current();
+    return s->enabled_capabilities[MIGRATION_CAPABILITY_BITWORKERS];
+}
+
+static MCSlab *mc_slab_start(MCParams *mc)
+{
+    if (mc->nb_slabs > 2) {
+        if (mc->slab_strikes >= max_strikes) {
+            uint64_t nb_slabs_to_free = MAX(1, (((mc->nb_slabs - 1) / 2)));
+
+            DPRINTF("MC has reached max strikes. Will free %" 
+                    PRIu64 " / %" PRIu64 " slabs max %d, "
+                    "checkpoints %" PRIu64 "\n",
+                    nb_slabs_to_free, mc->nb_slabs,
+                    max_strikes, mc->checkpoints);
+
+            mc->slab_strikes = 0;
+
+            while (nb_slabs_to_free) {
+                MCSlab *slab = QTAILQ_LAST(&mc->slab_head, shead);
+                ram_control_remove(mc->file, (uint64_t) slab->buf);
+                QTAILQ_REMOVE(&mc->slab_head, slab, node);
+                g_free(slab);
+                nb_slabs_to_free--;
+                mc->nb_slabs--;
+            }
+
+            goto skip;
+        } else if (((mc->slab_total <= 
+                    ((mc->nb_slabs - 1) * MC_SLAB_BUFFER_SIZE)))) {
+            mc->slab_strikes++;
+            DDPRINTF("MC has strike %d slabs %" PRIu64 " max %d\n", 
+                     mc->slab_strikes, mc->nb_slabs, max_strikes);
+            goto skip;
+        }
+    }
+
+    if (mc->slab_strikes) {
+        DDPRINTF("MC used all slabs. Resetting strikes to zero.\n");
+        mc->slab_strikes = 0;
+    }
+skip:
+
+    mc->used_slabs = 1;
+    mc->slab_total = 0;
+    mc->curr_slab = QTAILQ_FIRST(&mc->slab_head);
+    SLAB_RESET(mc->curr_slab);
+
+    return mc->curr_slab;
+}
+
+static MCCopyset *mc_copy_start(MCParams *mc)
+{
+    if (mc->nb_copysets >= 2) {
+        if (mc->copy_strikes >= max_strikes) {
+            int nb_copies_to_free = MAX(1, (((mc->nb_copysets - 1) / 2)));
+
+            DPRINTF("MC has reached max strikes. Will free %d / %d copies max %d\n",
+                    nb_copies_to_free, mc->nb_copysets, max_strikes);
+
+            mc->copy_strikes = 0;
+
+            while (nb_copies_to_free) {
+                MCCopyset * copyset = QTAILQ_LAST(&mc->copy_head, chead);
+                QTAILQ_REMOVE(&mc->copy_head, copyset, node);
+                g_free(copyset);
+                nb_copies_to_free--;
+                mc->nb_copysets--;
+            }
+
+            goto skip;
+        } else if (((mc->total_copies <= 
+                    ((mc->nb_copysets - 1) * MC_MAX_SLAB_COPY_DESCRIPTORS)))) {
+            mc->copy_strikes++;
+            DDPRINTF("MC has strike %d copies %d max %d\n", 
+                     mc->copy_strikes, mc->nb_copysets, max_strikes);
+            goto skip;
+        }
+    }
+
+    if (mc->copy_strikes) {
+        DDPRINTF("MC used all copies. Resetting strikes to zero.\n");
+        mc->copy_strikes = 0;
+    }
+skip:
+
+    mc->total_copies = 0;
+    mc->curr_copyset = QTAILQ_FIRST(&mc->copy_head);
+    mc->curr_copyset->nb_copies = 0;
+
+    return mc->curr_copyset;
+}
+
+/*
+ * Main MC loop. Stop the VM, dump the dirty memory
+ * into buffered_file, restart the VM, transmit the MC,
+ * and then sleep for some milliseconds before
+ * starting the next MC.
+ */
+static void *mc_thread(void *opaque)
+{
+    MigrationState *s = opaque;
+    MCParams mc = { .file = s->file };
+    MCSlab * slab;
+    int64_t initial_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+    int ret = 0, fd = qemu_get_fd(s->file), x;
+    QEMUFile *mc_control, *mc_staging = NULL;
+    uint64_t wait_time = 0;
+   
+    if (migrate_use_bitworkers()) {
+        DPRINTF("Starting bitmap workers.\n");
+        qemu_mutex_lock_iothread();
+        migration_bitmap_worker_start(s);
+        qemu_mutex_unlock_iothread();
+    }
+
+    if (!(mc_control = qemu_fopen_socket(fd, "rb"))) {
+        fprintf(stderr, "Failed to setup read MC control\n");
+        goto err;
+    }
+
+    if (!(mc_staging = qemu_fopen_mc(&mc, "wb"))) {
+        fprintf(stderr, "Failed to setup MC staging area\n");
+        goto err;
+    }
+
+    mc.staging = mc_staging;
+
+    qemu_set_block(fd);
+    socket_set_nodelay(fd);
+
+    s->checkpoints = 0;
+
+    while (s->state == MIG_STATE_MC) {
+        int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+        int64_t start_time, xmit_start, end_time;
+        bool commit_sent = false;
+        int nb_slab = 0;
+        (void)nb_slab;
+        
+        slab = mc_slab_start(&mc);
+        mc_copy_start(&mc);
+        acct_clear();
+        start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+
+        if (capture_checkpoint(&mc, s) < 0)
+                break;
+
+        xmit_start = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+
+        if ((ret = mc_send(s->file, MC_TRANSACTION_START) < 0)) {
+            fprintf(stderr, "transaction start failed\n");
+            break;
+        }
+        
+        DDPRINTF("Sending checkpoint size %" PRId64 
+                 " copyset start: %" PRIu64 " nb slab %" PRIu64 
+                 " used slabs %" PRIu64 "\n",
+                 mc.slab_total, mc.start_copyset, mc.nb_slabs, mc.used_slabs);
+
+        mc.curr_slab = QTAILQ_FIRST(&mc.slab_head);
+
+        qemu_put_be64(s->file, mc.slab_total);
+        qemu_put_be64(s->file, mc.start_copyset);
+        qemu_put_be64(s->file, mc.used_slabs);
+
+        qemu_fflush(s->file);
+       
+        DDPRINTF("Transaction commit\n");
+
+        /*
+         * The MC is safe, and VM is running again.
+         * Start a transaction and send it.
+         */
+        ram_control_before_iterate(s->file, RAM_CONTROL_ROUND); 
+
+        slab = QTAILQ_FIRST(&mc.slab_head);
+
+        for (x = 0; x < mc.used_slabs; x++) {
+            DDPRINTF("Attempting write to slab #%d: %p"
+                    " total size: %" PRId64 " / %" PRIu64 "\n",
+                    nb_slab++, slab->buf, slab->size, MC_SLAB_BUFFER_SIZE);
+
+            ret = ram_control_save_page(s->file, (uint64_t) slab->buf,
+                                        NULL, 0, slab->size, NULL);
+
+            if (ret == RAM_SAVE_CONTROL_NOT_SUPP) {
+                if (!commit_sent) {
+                    if ((ret = mc_send(s->file, MC_TRANSACTION_COMMIT) < 0)) {
+                        fprintf(stderr, "transaction commit failed\n");
+                        break;
+                    }
+                    commit_sent = true;
+                }
+
+                qemu_put_be64(s->file, slab->size);
+                qemu_put_buffer_async(s->file, slab->buf, slab->size);
+            } else if ((ret < 0) && (ret != RAM_SAVE_CONTROL_DELAYED)) {
+                fprintf(stderr, "failed 1, skipping send\n");
+                goto err;
+            }
+
+            if (qemu_file_get_error(s->file)) {
+                fprintf(stderr, "failed 2, skipping send\n");
+                goto err;
+            }
+                
+            DDPRINTF("Sent %" PRId64 " all %ld\n", slab->size, mc.slab_total);
+
+            slab = QTAILQ_NEXT(slab, node);
+        }
+
+        if (!commit_sent) {
+            ram_control_after_iterate(s->file, RAM_CONTROL_ROUND); 
+            slab = QTAILQ_FIRST(&mc.slab_head);
+
+            for (x = 0; x < mc.used_slabs; x++) {
+                qemu_put_be64(s->file, slab->size);
+                slab = QTAILQ_NEXT(slab, node);
+            }
+        }
+
+        qemu_fflush(s->file);
+
+        if (commit_sent) {
+            DDPRINTF("Waiting for commit ACK\n");
+
+            if ((ret = mc_recv(mc_control, MC_TRANSACTION_ACK, NULL)) < 0) {
+                goto err;
+            }
+        }
+
+        ret = qemu_file_get_error(s->file);
+        if (ret) {
+            fprintf(stderr, "Error sending checkpoint: %d\n", ret);
+            goto err;
+        }
+
+        DDPRINTF("Memory transfer complete.\n");
+
+        /*
+         * The MC is safe on the other side now,
+         * go along our merry way and release the network
+         * packets from the buffer if enable
+         */
+        mc_flush_oldest_buffer();
+
+        end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+        s->total_time = end_time - start_time;
+        s->xmit_time = end_time - xmit_start;
+        s->bitmap_time = norm_mig_bitmap_time();
+        s->log_dirty_time = norm_mig_log_dirty_time();
+        s->mbps = MBPS(mc.slab_total, s->xmit_time);
+        s->copy_mbps = MBPS(mc.slab_total, s->ram_copy_time);
+        s->bytes_xfer = mc.slab_total;
+        s->checkpoints = mc.checkpoints++;
+
+        wait_time = (s->downtime <= freq_ms) ? (freq_ms - s->downtime) : 0;
+
+        if (current_time >= initial_time + 1000) {
+            DPRINTF("bytes %" PRIu64 " xmit_mbps %0.1f xmit_time %" PRId64
+                    " downtime %" PRIu64 " sync_time %" PRId64
+                    " logdirty_time %" PRId64 " ram_copy_time %" PRId64
+                    " copy_mbps %0.1f wait time %" PRIu64
+                    " checkpoints %" PRId64 "\n",
+                    s->bytes_xfer,
+                    s->mbps,
+                    s->xmit_time,
+                    s->downtime,
+                    s->bitmap_time,
+                    s->log_dirty_time,
+                    s->ram_copy_time,
+                    s->copy_mbps,
+                    wait_time,
+                    s->checkpoints);
+            initial_time = current_time;
+        }
+
+        /*
+         * Checkpoint frequency in microseconds.
+         * 
+         * Sometimes, when checkpoints are very large,
+         * all of the wait time was dominated by the 
+         * time taken to copy the checkpoint into the staging area,
+         * in which case wait_time, will probably be zero and we
+         * will end up diving right back into the next checkpoint
+         * as soon as the previous transmission completed.
+         */
+        if (wait_time) {
+            g_usleep(wait_time * 1000);
+        }
+    }
+
+    goto out;
+
+err:
+    /*
+     * TODO: Possible split-brain scenario:
+     * Normally, this should never be reached unless there was a
+     * connection error or network partition - in which case
+     * only the management software can resume the VM safely 
+     * when it knows the exact state of the MC destination.
+     *
+     * We need management to poll the source and destination to deterine
+     * if the destination has already taken control. If not, then
+     * we need to resume the source.
+     *
+     * If there was a connection error during checkpoint *transmission*
+     * then the destination VM will likely have already resumed,
+     * in which case we need to stop the current VM from running
+     * and throw away any buffered packets.
+     * 
+     * Verify that "disable_buffering" below does not release any traffic.
+     */
+    migrate_set_state(s, MIG_STATE_MC, MIG_STATE_ERROR);
+out:
+    if (mc_staging) {
+        qemu_fclose(mc_staging);
+    }
+
+    if (mc_control) {
+        qemu_fclose(mc_control);
+    }
+
+    mc_disable_buffering();
+
+    qemu_mutex_lock_iothread();
+
+    if (migrate_use_bitworkers()) {
+        DPRINTF("Stopping bitmap workers.\n");
+        migration_bitmap_worker_stop(s);
+    }
+
+    if (s->state != MIG_STATE_ERROR) {
+        migrate_set_state(s, MIG_STATE_MC, MIG_STATE_COMPLETED);
+    }
+
+    qemu_bh_schedule(s->cleanup_bh);
+    qemu_mutex_unlock_iothread();
+
+    return NULL;
+}
+
+/*
+ * Get the next copyset in the list. If there is none, then make one.
+ */
+static MCCopyset *mc_copy_next(MCParams *mc, MCCopyset *copyset)
+{
+    if (!QTAILQ_NEXT(copyset, node)) {
+        int idx = mc->nb_copysets++;
+        DDPRINTF("Extending copysets by one: %d sets total, "
+                 "%" PRIu64 " MB\n", mc->nb_copysets,
+                 mc->nb_copysets * sizeof(MCCopyset) / 1024UL / 1024UL);
+        mc->curr_copyset = g_malloc(sizeof(MCCopyset));
+        mc->curr_copyset->idx = idx;
+        QTAILQ_INSERT_TAIL(&mc->copy_head, mc->curr_copyset, node);
+        copyset = mc->curr_copyset;
+    } else {
+        copyset = QTAILQ_NEXT(copyset, node);
+    }
+
+    mc->curr_copyset = copyset;
+    copyset->nb_copies = 0;
+
+    return copyset;
+}
+
+void mc_process_incoming_checkpoints_if_requested(QEMUFile *f)
+{
+    MCParams mc = { .file = f };
+    MCSlab *slab;
+    int fd = qemu_get_fd(f);
+    QEMUFile *mc_control, *mc_staging;
+    uint64_t checkpoint_size, action;
+    uint64_t slabs;
+    int got, x, ret, received = 0;
+    bool checkpoint_received;
+
+    CALC_MAX_STRIKES();
+
+    if (!mc_requested) {
+        DPRINTF("Source has not requested MC. Returning.\n");
+        return;
+    }
+   
+    if (!(mc_control = qemu_fopen_socket(fd, "wb"))) {
+        fprintf(stderr, "Could not make incoming MC control channel\n");
+        goto rollback;
+    }
+
+    if (!(mc_staging = qemu_fopen_mc(&mc, "rb"))) {
+        fprintf(stderr, "Could not make outgoing MC staging area\n");
+        goto rollback;
+    }
+
+    //qemu_set_block(fd);
+    socket_set_nodelay(fd);
+
+    while (true) {
+        checkpoint_received = false;
+        ret = mc_recv(f, MC_TRANSACTION_ANY, &action);
+        if (ret < 0) {
+            goto rollback;
+        }
+
+        switch(action) {
+        case MC_TRANSACTION_START:
+            checkpoint_size = qemu_get_be64(f);
+            mc.start_copyset = qemu_get_be64(f);
+            slabs = qemu_get_be64(f);
+
+            DDPRINTF("Transaction start: size %" PRIu64 
+                     " copyset start: %" PRIu64 " slabs %" PRIu64 "\n",
+                     checkpoint_size, mc.start_copyset, slabs);
+
+            assert(checkpoint_size);
+            break;
+        case MC_TRANSACTION_COMMIT: /* tcp */
+            slab = mc_slab_start(&mc);
+            received = 0;
+
+            while (received < checkpoint_size) {
+                int total = 0;
+                slab->size = qemu_get_be64(f);
+
+                DDPRINTF("Expecting size: %" PRIu64 "\n", slab->size);
+
+                while (total != slab->size) {
+                    got = qemu_get_buffer(f, slab->buf + total, slab->size - total);
+                    if (got <= 0) {
+                        fprintf(stderr, "Error pre-filling checkpoint: %d\n", got);
+                        goto rollback;
+                    }
+                    DDPRINTF("Received %d slab %d / %ld received %d total %"
+                             PRIu64 "\n", got, total, slab->size, 
+                             received, checkpoint_size);
+                    received += got;
+                    total += got;
+                }
+
+                if (received != checkpoint_size) {
+                    slab = mc_slab_next(&mc, slab);
+                }
+            }
+
+            DDPRINTF("Acknowledging successful commit\n");
+
+            if (mc_send(mc_control, MC_TRANSACTION_ACK) < 0) {
+                goto rollback;
+            }
+
+            checkpoint_received = true;
+            break;
+        case RAM_SAVE_FLAG_HOOK: /* rdma */
+            /*
+             * Must be RDMA registration handling. Preallocate
+             * the slabs (if not already done in a previous checkpoint)
+             * before allowing RDMA to register them.
+             */
+            slab = mc_slab_start(&mc);
+
+            DDPRINTF("Pre-populating slabs %" PRIu64 "...\n", slabs);
+
+            for(x = 1; x < slabs; x++) {
+                slab = mc_slab_next(&mc, slab);
+            }
+
+            ram_control_load_hook(f, action);
+
+            DDPRINTF("Hook complete.\n");
+
+            slab = QTAILQ_FIRST(&mc.slab_head);
+
+            for(x = 0; x < slabs; x++) {
+                slab->size = qemu_get_be64(f);
+                slab = QTAILQ_NEXT(slab, node);
+            }
+
+            checkpoint_received = true;
+            break;
+        default:
+            fprintf(stderr, "Unknown MC action: %" PRIu64 "\n", action);
+            goto rollback;
+        }
+
+        if (checkpoint_received) {
+            mc.curr_slab = QTAILQ_FIRST(&mc.slab_head);
+            mc.slab_total = checkpoint_size;
+
+            DDPRINTF("Committed Loading MC state \n");
+
+            mc_copy_start(&mc);
+
+            if (qemu_loadvm_state(mc_staging) < 0) {
+                fprintf(stderr, "loadvm transaction failed\n");
+                /*
+                 * This is fatal. No rollback possible because we have potentially
+                 * applied only a subset of the checkpoint to main memory, potentially
+                 * leaving the VM in an inconsistent state.
+                 */
+                goto err;
+            }
+
+            mc.slab_total = checkpoint_size;
+
+            DDPRINTF("Transaction complete.\n");
+            mc.checkpoints++;
+        }
+    }
+
+rollback:
+    fprintf(stderr, "MC: checkpointing stopped. Recovering VM\n");
+    goto out;
+err:
+    fprintf(stderr, "Micro Checkpointing Protocol Failed\n");
+    exit(1); 
+out:
+    if (mc_staging) {
+        qemu_fclose(mc_staging);
+    }
+
+    if (mc_control) {
+        qemu_fclose(mc_control);
+    }
+}
+
+static int mc_get_buffer_internal(void *opaque, uint8_t *buf, int64_t pos,
+                                  int size, MCSlab **curr_slab, uint64_t end_idx)
+{
+    uint64_t len = size;
+    uint8_t *data = (uint8_t *) buf;
+    MCSlab *slab = *curr_slab;
+    MCParams *mc = opaque;
+
+    assert(slab);
+
+    DDDPRINTF("got request for %d bytes %p %p. idx %d\n",
+              size, slab, QTAILQ_FIRST(&mc->slab_head), slab->idx);
+
+    while (len && slab) {
+        uint64_t get = MIN(slab->size - slab->read, len);
+
+        memcpy(data, slab->buf + slab->read, get);
+
+        data           += get;
+        slab->read     += get;
+        len            -= get;
+        mc->slab_total -= get;
+
+        DDDPRINTF("got: %" PRIu64 " read: %" PRIu64 
+                 " len %" PRIu64 " slab_total %" PRIu64 
+                 " size %" PRIu64 " addr: %p slab %d"
+                 " requested %d\n",
+                 get, slab->read, len, mc->slab_total, 
+                 slab->size, slab->buf, slab->idx, size);
+
+        if (len) {
+            if (slab->idx == end_idx) {
+                break;
+            }
+
+            slab = QTAILQ_NEXT(slab, node);
+        }
+    }
+
+    *curr_slab = slab;
+    DDDPRINTF("Returning %" PRIu64 " / %d bytes\n", size - len, size);
+
+    return size - len;
+}
+static int mc_get_buffer(void *opaque, uint8_t *buf, int64_t pos, int size)
+{
+    MCParams *mc = opaque;
+
+    return mc_get_buffer_internal(mc, buf, pos, size, &mc->curr_slab,
+                                  mc->start_copyset - 1);
+}
+
+static int mc_load_page(QEMUFile *f, void *opaque, void *host_addr, long size)
+{
+    MCParams *mc = opaque;
+
+    DDDPRINTF("Loading page into %p of size %" PRIu64 "\n", host_addr, size);
+
+    return mc_get_buffer_internal(mc, host_addr, 0, size, &mc->mem_slab,
+                                  mc->nb_slabs - 1);
+}
+
+/*
+ * Provide QEMUFile with an *local* RDMA-based way to do memcpy().
+ * This lowers cache pollution and allows the CPU pipeline to
+ * remain free for regular use by VMs (as well as by neighbors).
+ *
+ * In a future implementation, we may attempt to perform this
+ * copy *without* stopping the source VM - if the data shows
+ * that it can be done effectively.
+ */
+static int mc_save_page(QEMUFile *f, void *opaque,
+                           ram_addr_t block_offset, 
+                           uint8_t *host_addr,
+                           ram_addr_t offset,
+                           long size, int *bytes_sent)
+{
+    MCParams *mc = opaque;
+    MCCopyset *copyset = mc->curr_copyset;
+    MCCopy *c;
+
+    if (copyset->nb_copies >= MC_MAX_SLAB_COPY_DESCRIPTORS) {
+        copyset = mc_copy_next(mc, copyset);
+    }
+
+    c = &copyset->copies[copyset->nb_copies++];
+    c->ramblock_offset = (uint64_t) block_offset;
+    c->host_addr = (uint64_t) host_addr;
+    c->offset = (uint64_t) offset;
+    c->size = (uint64_t) size;
+    mc->total_copies++;
+
+    return RAM_SAVE_CONTROL_DELAYED;
+}
+
+static ssize_t mc_writev_buffer(void *opaque, struct iovec *iov,
+                                int iovcnt, int64_t pos)
+{
+    ssize_t len = 0;
+    unsigned int i;
+
+    for (i = 0; i < iovcnt; i++) {
+        DDDPRINTF("iov # %d, len: %" PRId64 "\n", i, iov[i].iov_len); 
+        len += mc_put_buffer(opaque, iov[i].iov_base, 0, iov[i].iov_len); 
+    }
+
+    return len;
+}
+
+static int mc_get_fd(void *opaque)
+{
+    MCParams *mc = opaque;
+
+    return qemu_get_fd(mc->file);
+}
+
+static int mc_close(void *opaque)
+{
+    MCParams *mc = opaque;
+    MCSlab *slab, *next;
+
+    QTAILQ_FOREACH_SAFE(slab, &mc->slab_head, node, next) {
+        ram_control_remove(mc->file, (uint64_t) slab->buf);
+        QTAILQ_REMOVE(&mc->slab_head, slab, node);
+        g_free(slab);
+    }
+
+    mc->curr_slab = NULL;
+
+    return 0;
+}
+	
+static const QEMUFileOps mc_write_ops = {
+    .writev_buffer = mc_writev_buffer,
+    .put_buffer = mc_put_buffer,
+    .get_fd = mc_get_fd,
+    .close = mc_close,
+    .save_page = mc_save_page,
+};
+
+static const QEMUFileOps mc_read_ops = {
+    .get_buffer = mc_get_buffer,
+    .get_fd = mc_get_fd,
+    .close = mc_close,
+    .load_page = mc_load_page,
+};
+
+QEMUFile *qemu_fopen_mc(void *opaque, const char *mode)
+{
+    MCParams *mc = opaque;
+    MCSlab *slab;
+    MCCopyset *copyset;
+
+    if (qemu_file_mode_is_not_valid(mode)) {
+        return NULL;
+    }
+
+    QTAILQ_INIT(&mc->slab_head);
+    QTAILQ_INIT(&mc->copy_head);
+
+    slab = qemu_memalign(8, sizeof(MCSlab));
+    memset(slab, 0, sizeof(*slab));
+    slab->idx = 0;
+    QTAILQ_INSERT_HEAD(&mc->slab_head, slab, node);
+    mc->slab_total = 0;
+    mc->curr_slab = slab;
+    mc->nb_slabs = 1;
+    mc->slab_strikes = 0;
+
+    ram_control_add(mc->file, slab->buf, (uint64_t) slab->buf, MC_SLAB_BUFFER_SIZE);
+
+    copyset = g_malloc(sizeof(MCCopyset));
+    copyset->idx = 0;
+    QTAILQ_INSERT_HEAD(&mc->copy_head, copyset, node);
+    mc->total_copies = 0;
+    mc->curr_copyset = copyset;
+    mc->nb_copysets = 1;
+    mc->copy_strikes = 0;
+
+    if (mode[0] == 'w') {
+        return qemu_fopen_ops(mc, &mc_write_ops);
+    }
+
+    return qemu_fopen_ops(mc, &mc_read_ops);
+}
+
+static void mc_start_checkpointer(void *opaque) {
+    MigrationState *s = opaque;
+
+    if (checkpoint_bh) {
+        qemu_bh_delete(checkpoint_bh);
+        checkpoint_bh = NULL;
+    }
+
+    qemu_mutex_unlock_iothread();
+    qemu_thread_join(s->thread);
+    g_free(s->thread);
+    qemu_mutex_lock_iothread();
+
+    migrate_set_state(s, MIG_STATE_ACTIVE, MIG_STATE_MC);
+    s->thread = g_malloc0(sizeof(*s->thread));
+	qemu_thread_create(s->thread, mc_thread, s, QEMU_THREAD_JOINABLE);
+}
+
+void mc_init_checkpointer(MigrationState *s)
+{
+    CALC_MAX_STRIKES();
+    checkpoint_bh = qemu_bh_new(mc_start_checkpointer, s);
+    qemu_bh_schedule(checkpoint_bh);
+}
+
+void qmp_migrate_set_mc_delay(int64_t value, Error **errp)
+{
+    freq_ms = value;
+    CALC_MAX_STRIKES();
+    DPRINTF("Setting checkpoint frequency to %" PRId64 " ms and "
+            "resetting strikes to %d based on a %d sec delay.\n",
+            freq_ms, max_strikes, max_strikes_delay_secs);
+}
+
+int mc_info_load(QEMUFile *f, void *opaque, int version_id)
+{
+    bool mc_enabled = qemu_get_byte(f);
+
+    if (mc_enabled && !mc_requested) {
+        DPRINTF("MC is requested\n");
+        mc_requested = true;
+    }
+
+    max_strikes = qemu_get_be32(f);
+
+    return 0;
+}
+
+void mc_info_save(QEMUFile *f, void *opaque)
+{
+    qemu_put_byte(f, migrate_use_mc());
+    qemu_put_be32(f, max_strikes);
+}
-- 
1.8.1.2

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH v1: 10/12] mc: configure and makefile support
  2013-10-21  1:14 [Qemu-devel] [RFC PATCH v1: 00/12] fault tolerance through micro-checkpointing mrhines
                   ` (8 preceding siblings ...)
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 09/12] mc: core logic mrhines
@ 2013-10-21  1:14 ` mrhines
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 11/12] mc: register MC qemu-file functions and expose MC tunable capability mrhines
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 12/12] mc: activate and use MC core logic if requested mrhines
  11 siblings, 0 replies; 16+ messages in thread
From: mrhines @ 2013-10-21  1:14 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, owasserm, onom, abali, mrhines, gokul, pbonzini

From: "Michael R. Hines" <mrhines@us.ibm.com>


Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 Makefile.objs |  1 +
 configure     | 45 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 46 insertions(+)

diff --git a/Makefile.objs b/Makefile.objs
index 2b6c1fe..15356d6 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -52,6 +52,7 @@ common-obj-$(CONFIG_LINUX) += fsdev/
 
 common-obj-y += migration.o migration-tcp.o
 common-obj-$(CONFIG_RDMA) += migration-rdma.o
+common-obj-$(CONFIG_MC) += migration-checkpoint.o
 common-obj-y += qemu-char.o #aio.o
 common-obj-y += block-migration.o
 common-obj-y += page_cache.o xbzrle.o
diff --git a/configure b/configure
index 57ee62a..64c0d5e 100755
--- a/configure
+++ b/configure
@@ -182,6 +182,7 @@ kvm="no"
 rdma=""
 gprof="no"
 debug_tcg="no"
+mc=""
 debug="no"
 strip_opt="yes"
 tcg_interpreter="no"
@@ -969,6 +970,10 @@ for opt do
   ;;
   --enable-libssh2) libssh2="yes"
   ;;
+  --disable-mc) mc="no"
+  ;;
+  --enable-mc) mc="yes"
+  ;;
   *) echo "ERROR: unknown option $opt"; show_help="yes"
   ;;
   esac
@@ -1200,6 +1205,8 @@ echo "  --gcov=GCOV              use specified gcov [$gcov_tool]"
 echo "  --enable-tpm             enable TPM support"
 echo "  --disable-libssh2        disable ssh block device support"
 echo "  --enable-libssh2         enable ssh block device support"
+echo "  --disable-mc             disable Micro-Checkpointing support"
+echo "  --enable-mc              enable Micro-Checkpointing support"
 echo ""
 echo "NOTE: The object files are built at the place where configure is launched"
 exit 1
@@ -1861,6 +1868,35 @@ EOF
   fi
 fi
 
+##################################################
+# Micro-Checkpointing requires netlink
+if test "$mc" != "no" ; then
+  cat > $TMPC <<EOF
+#include <libnl3/netlink/route/qdisc/plug.h>
+#include <libnl3/netlink/route/class.h>
+#include <libnl3/netlink/cli/utils.h>
+#include <libnl3/netlink/cli/tc.h>
+#include <libnl3/netlink/cli/qdisc.h>
+#include <libnl3/netlink/cli/link.h>
+int main(void) { return 0; }
+EOF
+  mc_libs="-lnl-3 -lnl-cli-3 -lnl-route-3"
+  mc_cflags="-I/usr/include/libnl3"
+  if compile_prog "$mc_cflags" "$mc_libs" ; then
+    mc="yes"
+    libs_softmmu="$libs_softmmu $mc_libs"
+    QEMU_CFLAGS="$QEMU_CFLAGS $mc_cflags"
+  else
+    if test "$mc" = "yes" ; then
+        error_exit \
+            " NetLink v3 libs/headers not present." \
+            " Please install the libnl3-*-dev(el) packages from your distro."
+    fi
+    mc="no"
+  fi
+fi
+
+
 ##########################################
 # VNC TLS/WS detection
 if test "$vnc" = "yes" -a \( "$vnc_tls" != "no" -o "$vnc_ws" != "no" \) ; then
@@ -3723,6 +3759,7 @@ echo "KVM support       $kvm"
 echo "RDMA support      $rdma"
 echo "TCG interpreter   $tcg_interpreter"
 echo "fdt support       $fdt"
+echo "Micro checkpointing $mc"
 echo "preadv support    $preadv"
 echo "fdatasync         $fdatasync"
 echo "madvise           $madvise"
@@ -4206,6 +4243,10 @@ if test "$rdma" = "yes" ; then
   echo "CONFIG_RDMA=y" >> $config_host_mak
 fi
 
+if test "$mc" = "yes" ; then
+  echo "CONFIG_MC=y" >> $config_host_mak
+fi
+
 if test "$tcg_interpreter" = "yes"; then
   QEMU_INCLUDES="-I\$(SRC_PATH)/tcg/tci $QEMU_INCLUDES"
 elif test "$ARCH" = "sparc64" ; then
@@ -4633,6 +4674,10 @@ echo "QEMU_CFLAGS+=$cflags" >> $config_target_mak
 
 done # for target in $targets
 
+if test "$mc" = "yes" ; then
+echo "CONFIG_MC=y" >> $config_host_mak
+fi
+
 if [ "$pixman" = "internal" ]; then
   echo "config-host.h: subdir-pixman" >> $config_host_mak
 fi
-- 
1.8.1.2

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH v1: 11/12] mc: register MC qemu-file functions and expose MC tunable capability
  2013-10-21  1:14 [Qemu-devel] [RFC PATCH v1: 00/12] fault tolerance through micro-checkpointing mrhines
                   ` (9 preceding siblings ...)
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 10/12] mc: configure and makefile support mrhines
@ 2013-10-21  1:14 ` mrhines
  2013-10-23 11:00   ` Isaku Yamahata
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 12/12] mc: activate and use MC core logic if requested mrhines
  11 siblings, 1 reply; 16+ messages in thread
From: mrhines @ 2013-10-21  1:14 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, owasserm, onom, abali, mrhines, gokul, pbonzini

From: "Michael R. Hines" <mrhines@us.ibm.com>

The capability allows management software to throttle the MC frequency
during VM application transience.

The qemu-file savevm() functions inform the destination that the incoming
traffic is MC-specific traffic and not vanilla live-migration traffic.

Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 hmp-commands.hx  | 14 ++++++++++++++
 hmp.c            |  6 ++++++
 hmp.h            |  1 +
 qapi-schema.json | 13 +++++++++++++
 qmp-commands.hx  | 23 +++++++++++++++++++++++
 vl.c             |  3 +++
 6 files changed, 60 insertions(+)

diff --git a/hmp-commands.hx b/hmp-commands.hx
index caae5ad..7db0597 100644
--- a/hmp-commands.hx
+++ b/hmp-commands.hx
@@ -960,6 +960,20 @@ Set maximum tolerated downtime (in seconds) for migration.
 ETEXI
 
     {
+        .name       = "migrate-set-mc-delay",
+        .args_type  = "value:i",
+        .params     = "value",
+        .help       = "set maximum delay (in milliseconds) between micro-checkpoints",
+        .mhandler.cmd = hmp_migrate_set_mc_delay,
+    },
+
+STEXI
+@item migrate_set_downtime @var{second}
+@findex migrate_set_downtime
+Set maximum tolerated downtime (in seconds) for migration.
+ETEXI
+
+    {
         .name       = "migrate_set_capability",
         .args_type  = "capability:s,state:b",
         .params     = "capability state",
diff --git a/hmp.c b/hmp.c
index 43896e9..8e89ac7 100644
--- a/hmp.c
+++ b/hmp.c
@@ -1026,6 +1026,12 @@ void hmp_migrate_set_downtime(Monitor *mon, const QDict *qdict)
     qmp_migrate_set_downtime(value, NULL);
 }
 
+void hmp_migrate_set_mc_delay(Monitor *mon, const QDict *qdict)
+{
+    int64_t value = qdict_get_int(qdict, "value");
+    qmp_migrate_set_mc_delay(value, NULL);
+}
+
 void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict)
 {
     int64_t value = qdict_get_int(qdict, "value");
diff --git a/hmp.h b/hmp.h
index 54cf71f..b6548a3 100644
--- a/hmp.h
+++ b/hmp.h
@@ -60,6 +60,7 @@ void hmp_drive_mirror(Monitor *mon, const QDict *qdict);
 void hmp_drive_backup(Monitor *mon, const QDict *qdict);
 void hmp_migrate_cancel(Monitor *mon, const QDict *qdict);
 void hmp_migrate_set_downtime(Monitor *mon, const QDict *qdict);
+void hmp_migrate_set_mc_delay(Monitor *mon, const QDict *qdict);
 void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict);
 void hmp_migrate_set_capability(Monitor *mon, const QDict *qdict);
 void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict);
diff --git a/qapi-schema.json b/qapi-schema.json
index e0a430c..2ed8098 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -2135,6 +2135,19 @@
 { 'command': 'migrate_set_downtime', 'data': {'value': 'number'} }
 
 ##
+# @migrate-set-mc-delay
+#
+# Set delay (in milliseconds) between micro checkpoints.
+#
+# @value: maximum delay in milliseconds 
+#
+# Returns: nothing on success
+#
+# Since: 1.6
+##
+{ 'command': 'migrate-set-mc-delay', 'data': {'value': 'int'} }
+
+##
 # @migrate_set_speed
 #
 # Set maximum speed for migration.
diff --git a/qmp-commands.hx b/qmp-commands.hx
index fba15cd..6d7ef2f 100644
--- a/qmp-commands.hx
+++ b/qmp-commands.hx
@@ -754,6 +754,29 @@ Example:
 EQMP
 
     {
+        .name       = "migrate-set-mc-delay",
+        .args_type  = "value:i",
+        .mhandler.cmd_new = qmp_marshal_input_migrate_set_mc_delay,
+    },
+
+SQMP
+migrate-set-mc-delay
+--------------------
+
+Set maximum delay (in milliseconds) between micro-checkpoints.
+
+Arguments:
+
+- "value": maximum delay (json-int)
+
+Example:
+
+-> { "execute": "migrate-set-mc-delay", "arguments": { "value": 100 } }
+<- { "return": {} }
+
+EQMP
+
+    {
         .name       = "client_migrate_info",
         .args_type  = "protocol:s,hostname:s,port:i?,tls-port:i?,cert-subject:s?",
         .params     = "protocol hostname port tls-port cert-subject",
diff --git a/vl.c b/vl.c
index 74d52ab..fa23d66 100644
--- a/vl.c
+++ b/vl.c
@@ -29,6 +29,7 @@
 #include <sys/time.h>
 #include <zlib.h>
 #include "qemu/bitmap.h"
+#include "migration/qemu-file.h"
 
 /* Needed early for CONFIG_BSD etc. */
 #include "config-host.h"
@@ -4192,6 +4193,8 @@ int main(int argc, char **argv, char **envp)
     default_drive(default_sdcard, snapshot, IF_SD, 0, SD_OPTS);
 
     register_savevm_live(NULL, "ram", 0, 4, &savevm_ram_handlers, NULL);
+    register_savevm(NULL, "mc", -1, MC_VERSION, mc_info_save, 
+                                mc_info_load, NULL); 
 
     if (nb_numa_nodes > 0) {
         int i;
-- 
1.8.1.2

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH v1: 12/12] mc: activate and use MC core logic if requested
  2013-10-21  1:14 [Qemu-devel] [RFC PATCH v1: 00/12] fault tolerance through micro-checkpointing mrhines
                   ` (10 preceding siblings ...)
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 11/12] mc: register MC qemu-file functions and expose MC tunable capability mrhines
@ 2013-10-21  1:14 ` mrhines
  11 siblings, 0 replies; 16+ messages in thread
From: mrhines @ 2013-10-21  1:14 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, owasserm, onom, abali, mrhines, gokul, pbonzini

From: "Michael R. Hines" <mrhines@us.ibm.com>

Building on the previous patches, this finally actually
activates protection of the VM by kicking off an MC thread
after the initial live migration completes. The live migration
thread will get destroyed and the MC thread will run and never die.

Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 migration.c | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/migration.c b/migration.c
index 8e0827e..15ad264 100644
--- a/migration.c
+++ b/migration.c
@@ -94,6 +94,9 @@ static void process_incoming_migration_co(void *opaque)
     int ret;
 
     ret = qemu_loadvm_state(f);
+    if (ret >= 0) {
+        mc_process_incoming_checkpoints_if_requested(f);
+    }
     qemu_fclose(f);
     if (ret < 0) {
         fprintf(stderr, "load of migration failed\n");
@@ -670,11 +673,27 @@ static void *migration_thread(void *opaque)
         s->downtime = end_time - start_time;
         runstate_set(RUN_STATE_POSTMIGRATE);
     } else {
+        if(migrate_use_mc()) {
+            qemu_fflush(s->file);
+            if (migrate_use_mc_net()) {
+                if (mc_enable_buffering() < 0 ||
+                        mc_start_buffer() < 0) {
+                    migrate_set_state(s, MIG_STATE_ACTIVE, MIG_STATE_ERROR);
+                }
+            }
+        }
+
         if (old_vm_running) {
             vm_start();
         }
     }
-    qemu_bh_schedule(s->cleanup_bh);
+
+    if (migrate_use_mc() && s->state != MIG_STATE_ERROR) {
+        mc_init_checkpointer(s);
+    } else {
+        qemu_bh_schedule(s->cleanup_bh);
+    }
+
     qemu_mutex_unlock_iothread();
 
     return NULL;
-- 
1.8.1.2

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* Re: [Qemu-devel] [RFC PATCH v1: 11/12] mc: register MC qemu-file functions and expose MC tunable capability
  2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 11/12] mc: register MC qemu-file functions and expose MC tunable capability mrhines
@ 2013-10-23 11:00   ` Isaku Yamahata
  2013-11-06 16:34     ` Michael R. Hines
  0 siblings, 1 reply; 16+ messages in thread
From: Isaku Yamahata @ 2013-10-23 11:00 UTC (permalink / raw)
  To: mrhines
  Cc: aliguori, quintela, qemu-devel, owasserm, onom, abali, mrhines,
	gokul, pbonzini, isaku.yamahata

Since more integer parameters would come in the future, so how about
set_migrate_parameter similar to set_migrate_capability?
It sets integer value, while set_migrate_capability sets bool value.

thanks,

On Mon, Oct 21, 2013 at 01:14:21AM +0000,
mrhines@linux.vnet.ibm.com wrote:

> From: "Michael R. Hines" <mrhines@us.ibm.com>
> 
> The capability allows management software to throttle the MC frequency
> during VM application transience.
> 
> The qemu-file savevm() functions inform the destination that the incoming
> traffic is MC-specific traffic and not vanilla live-migration traffic.
> 
> Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
> ---
>  hmp-commands.hx  | 14 ++++++++++++++
>  hmp.c            |  6 ++++++
>  hmp.h            |  1 +
>  qapi-schema.json | 13 +++++++++++++
>  qmp-commands.hx  | 23 +++++++++++++++++++++++
>  vl.c             |  3 +++
>  6 files changed, 60 insertions(+)
> 
> diff --git a/hmp-commands.hx b/hmp-commands.hx
> index caae5ad..7db0597 100644
> --- a/hmp-commands.hx
> +++ b/hmp-commands.hx
> @@ -960,6 +960,20 @@ Set maximum tolerated downtime (in seconds) for migration.
>  ETEXI
>  
>      {
> +        .name       = "migrate-set-mc-delay",
> +        .args_type  = "value:i",
> +        .params     = "value",
> +        .help       = "set maximum delay (in milliseconds) between micro-checkpoints",
> +        .mhandler.cmd = hmp_migrate_set_mc_delay,
> +    },
> +
> +STEXI
> +@item migrate_set_downtime @var{second}
> +@findex migrate_set_downtime
> +Set maximum tolerated downtime (in seconds) for migration.
> +ETEXI
> +
> +    {
>          .name       = "migrate_set_capability",
>          .args_type  = "capability:s,state:b",
>          .params     = "capability state",
> diff --git a/hmp.c b/hmp.c
> index 43896e9..8e89ac7 100644
> --- a/hmp.c
> +++ b/hmp.c
> @@ -1026,6 +1026,12 @@ void hmp_migrate_set_downtime(Monitor *mon, const QDict *qdict)
>      qmp_migrate_set_downtime(value, NULL);
>  }
>  
> +void hmp_migrate_set_mc_delay(Monitor *mon, const QDict *qdict)
> +{
> +    int64_t value = qdict_get_int(qdict, "value");
> +    qmp_migrate_set_mc_delay(value, NULL);
> +}
> +
>  void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict)
>  {
>      int64_t value = qdict_get_int(qdict, "value");
> diff --git a/hmp.h b/hmp.h
> index 54cf71f..b6548a3 100644
> --- a/hmp.h
> +++ b/hmp.h
> @@ -60,6 +60,7 @@ void hmp_drive_mirror(Monitor *mon, const QDict *qdict);
>  void hmp_drive_backup(Monitor *mon, const QDict *qdict);
>  void hmp_migrate_cancel(Monitor *mon, const QDict *qdict);
>  void hmp_migrate_set_downtime(Monitor *mon, const QDict *qdict);
> +void hmp_migrate_set_mc_delay(Monitor *mon, const QDict *qdict);
>  void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict);
>  void hmp_migrate_set_capability(Monitor *mon, const QDict *qdict);
>  void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict);
> diff --git a/qapi-schema.json b/qapi-schema.json
> index e0a430c..2ed8098 100644
> --- a/qapi-schema.json
> +++ b/qapi-schema.json
> @@ -2135,6 +2135,19 @@
>  { 'command': 'migrate_set_downtime', 'data': {'value': 'number'} }
>  
>  ##
> +# @migrate-set-mc-delay
> +#
> +# Set delay (in milliseconds) between micro checkpoints.
> +#
> +# @value: maximum delay in milliseconds 
> +#
> +# Returns: nothing on success
> +#
> +# Since: 1.6
> +##
> +{ 'command': 'migrate-set-mc-delay', 'data': {'value': 'int'} }
> +
> +##
>  # @migrate_set_speed
>  #
>  # Set maximum speed for migration.
> diff --git a/qmp-commands.hx b/qmp-commands.hx
> index fba15cd..6d7ef2f 100644
> --- a/qmp-commands.hx
> +++ b/qmp-commands.hx
> @@ -754,6 +754,29 @@ Example:
>  EQMP
>  
>      {
> +        .name       = "migrate-set-mc-delay",
> +        .args_type  = "value:i",
> +        .mhandler.cmd_new = qmp_marshal_input_migrate_set_mc_delay,
> +    },
> +
> +SQMP
> +migrate-set-mc-delay
> +--------------------
> +
> +Set maximum delay (in milliseconds) between micro-checkpoints.
> +
> +Arguments:
> +
> +- "value": maximum delay (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-mc-delay", "arguments": { "value": 100 } }
> +<- { "return": {} }
> +
> +EQMP
> +
> +    {
>          .name       = "client_migrate_info",
>          .args_type  = "protocol:s,hostname:s,port:i?,tls-port:i?,cert-subject:s?",
>          .params     = "protocol hostname port tls-port cert-subject",
> diff --git a/vl.c b/vl.c
> index 74d52ab..fa23d66 100644
> --- a/vl.c
> +++ b/vl.c
> @@ -29,6 +29,7 @@
>  #include <sys/time.h>
>  #include <zlib.h>
>  #include "qemu/bitmap.h"
> +#include "migration/qemu-file.h"
>  
>  /* Needed early for CONFIG_BSD etc. */
>  #include "config-host.h"
> @@ -4192,6 +4193,8 @@ int main(int argc, char **argv, char **envp)
>      default_drive(default_sdcard, snapshot, IF_SD, 0, SD_OPTS);
>  
>      register_savevm_live(NULL, "ram", 0, 4, &savevm_ram_handlers, NULL);
> +    register_savevm(NULL, "mc", -1, MC_VERSION, mc_info_save, 
> +                                mc_info_load, NULL); 
>  
>      if (nb_numa_nodes > 0) {
>          int i;
> -- 
> 1.8.1.2
> 
> 

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [Qemu-devel] [RFC PATCH v1: 11/12] mc: register MC qemu-file functions and expose MC tunable capability
  2013-10-23 11:00   ` Isaku Yamahata
@ 2013-11-06 16:34     ` Michael R. Hines
  2013-11-07  2:38       ` Isaku Yamahata
  0 siblings, 1 reply; 16+ messages in thread
From: Michael R. Hines @ 2013-11-06 16:34 UTC (permalink / raw)
  To: Isaku Yamahata
  Cc: aliguori, quintela, qemu-devel, owasserm, onom, abali, mrhines,
	gokul, pbonzini

On 10/23/2013 07:00 AM, Isaku Yamahata wrote:
> Since more integer parameters would come in the future, so how about
> set_migrate_parameter similar to set_migrate_capability?
> It sets integer value, while set_migrate_capability sets bool value.
>
> thanks,
>
> On Mon, Oct 21, 2013 at 01:14:21AM +0000,
> mrhines@linux.vnet.ibm.com wrote:
>
>> From: "Michael R. Hines" <mrhines@us.ibm.com>
>>
>> The capability allows management software to throttle the MC frequency
>> during VM application transience.
>>
>> The qemu-file savevm() functions inform the destination that the incoming
>> traffic is MC-specific traffic and not vanilla live-migration traffic.
>>
>> Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
>> ---
>>   hmp-commands.hx  | 14 ++++++++++++++
>>   hmp.c            |  6 ++++++
>>   hmp.h            |  1 +
>>   qapi-schema.json | 13 +++++++++++++
>>   qmp-commands.hx  | 23 +++++++++++++++++++++++
>>   vl.c             |  3 +++
>>   6 files changed, 60 insertions(+)
>>
>> diff --git a/hmp-commands.hx b/hmp-commands.hx
>> index caae5ad..7db0597 100644
>> --- a/hmp-commands.hx
>> +++ b/hmp-commands.hx
>> @@ -960,6 +960,20 @@ Set maximum tolerated downtime (in seconds) for migration.
>>   ETEXI
>>   
>>       {
>> +        .name       = "migrate-set-mc-delay",
>> +        .args_type  = "value:i",
>> +        .params     = "value",
>> +        .help       = "set maximum delay (in milliseconds) between micro-checkpoints",
>> +        .mhandler.cmd = hmp_migrate_set_mc_delay,
>> +    },
>> +
>> +STEXI
>> +@item migrate_set_downtime @var{second}
>> +@findex migrate_set_downtime
>> +Set maximum tolerated downtime (in seconds) for migration.
>> +ETEXI
>> +
>> +    {
>>           .name       = "migrate_set_capability",
>>           .args_type  = "capability:s,state:b",
>>           .params     = "capability state",
>> diff --git a/hmp.c b/hmp.c
>> index 43896e9..8e89ac7 100644
>> --- a/hmp.c
>> +++ b/hmp.c
>> @@ -1026,6 +1026,12 @@ void hmp_migrate_set_downtime(Monitor *mon, const QDict *qdict)
>>       qmp_migrate_set_downtime(value, NULL);
>>   }
>>   
>> +void hmp_migrate_set_mc_delay(Monitor *mon, const QDict *qdict)
>> +{
>> +    int64_t value = qdict_get_int(qdict, "value");
>> +    qmp_migrate_set_mc_delay(value, NULL);
>> +}
>> +
>>   void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict)
>>   {
>>       int64_t value = qdict_get_int(qdict, "value");
>> diff --git a/hmp.h b/hmp.h
>> index 54cf71f..b6548a3 100644
>> --- a/hmp.h
>> +++ b/hmp.h
>> @@ -60,6 +60,7 @@ void hmp_drive_mirror(Monitor *mon, const QDict *qdict);
>>   void hmp_drive_backup(Monitor *mon, const QDict *qdict);
>>   void hmp_migrate_cancel(Monitor *mon, const QDict *qdict);
>>   void hmp_migrate_set_downtime(Monitor *mon, const QDict *qdict);
>> +void hmp_migrate_set_mc_delay(Monitor *mon, const QDict *qdict);
>>   void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict);
>>   void hmp_migrate_set_capability(Monitor *mon, const QDict *qdict);
>>   void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict);
>> diff --git a/qapi-schema.json b/qapi-schema.json
>> index e0a430c..2ed8098 100644
>> --- a/qapi-schema.json
>> +++ b/qapi-schema.json
>> @@ -2135,6 +2135,19 @@
>>   { 'command': 'migrate_set_downtime', 'data': {'value': 'number'} }
>>   
>>   ##
>> +# @migrate-set-mc-delay
>> +#
>> +# Set delay (in milliseconds) between micro checkpoints.
>> +#
>> +# @value: maximum delay in milliseconds
>> +#
>> +# Returns: nothing on success
>> +#
>> +# Since: 1.6
>> +##
>> +{ 'command': 'migrate-set-mc-delay', 'data': {'value': 'int'} }
>> +
>> +##
>>   # @migrate_set_speed
>>   #
>>   # Set maximum speed for migration.
>> diff --git a/qmp-commands.hx b/qmp-commands.hx
>> index fba15cd..6d7ef2f 100644
>> --- a/qmp-commands.hx
>> +++ b/qmp-commands.hx
>> @@ -754,6 +754,29 @@ Example:
>>   EQMP
>>   
>>       {
>> +        .name       = "migrate-set-mc-delay",
>> +        .args_type  = "value:i",
>> +        .mhandler.cmd_new = qmp_marshal_input_migrate_set_mc_delay,
>> +    },
>> +
>> +SQMP
>> +migrate-set-mc-delay
>> +--------------------
>> +
>> +Set maximum delay (in milliseconds) between micro-checkpoints.
>> +
>> +Arguments:
>> +
>> +- "value": maximum delay (json-int)
>> +
>> +Example:
>> +
>> +-> { "execute": "migrate-set-mc-delay", "arguments": { "value": 100 } }
>> +<- { "return": {} }
>> +
>> +EQMP
>> +
>> +    {
>>           .name       = "client_migrate_info",
>>           .args_type  = "protocol:s,hostname:s,port:i?,tls-port:i?,cert-subject:s?",
>>           .params     = "protocol hostname port tls-port cert-subject",
>> diff --git a/vl.c b/vl.c
>> index 74d52ab..fa23d66 100644
>> --- a/vl.c
>> +++ b/vl.c
>> @@ -29,6 +29,7 @@
>>   #include <sys/time.h>
>>   #include <zlib.h>
>>   #include "qemu/bitmap.h"
>> +#include "migration/qemu-file.h"
>>   
>>   /* Needed early for CONFIG_BSD etc. */
>>   #include "config-host.h"
>> @@ -4192,6 +4193,8 @@ int main(int argc, char **argv, char **envp)
>>       default_drive(default_sdcard, snapshot, IF_SD, 0, SD_OPTS);
>>   
>>       register_savevm_live(NULL, "ram", 0, 4, &savevm_ram_handlers, NULL);
>> +    register_savevm(NULL, "mc", -1, MC_VERSION, mc_info_save,
>> +                                mc_info_load, NULL);
>>   
>>       if (nb_numa_nodes > 0) {
>>           int i;
>> -- 
>> 1.8.1.2
>>
>>

That's a very good idea, I think - but we might get some pushback from 
the list.

There is a hesitation to add such low-level parameters, but for 
something like
micro-checkpointing that may potentially have large impacts on 
application performance,
I do think it would be critical to expose more customizability
like 'set_migrate_parameter' to management software.

In fact, this separation of function may be "better" than continuing to 
expand
the list of individual commands in the QEMU monitor.

Could you submit a patch for the framework of such a command?

- Michael


- Michael

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [Qemu-devel] [RFC PATCH v1: 11/12] mc: register MC qemu-file functions and expose MC tunable capability
  2013-11-06 16:34     ` Michael R. Hines
@ 2013-11-07  2:38       ` Isaku Yamahata
  0 siblings, 0 replies; 16+ messages in thread
From: Isaku Yamahata @ 2013-11-07  2:38 UTC (permalink / raw)
  To: Michael R. Hines
  Cc: aliguori, quintela, qemu-devel, owasserm, onom, abali, mrhines,
	gokul, pbonzini, Isaku Yamahata

On Wed, Nov 06, 2013 at 11:34:25AM -0500,
"Michael R. Hines" <mrhines@linux.vnet.ibm.com> wrote:

> On 10/23/2013 07:00 AM, Isaku Yamahata wrote:
> >Since more integer parameters would come in the future, so how about
> >set_migrate_parameter similar to set_migrate_capability?
> >It sets integer value, while set_migrate_capability sets bool value.
>
> That's a very good idea, I think - but we might get some pushback
> from the list.

Maybe. Actually we already have three configurable parameters.
bandwith_limit, xbzrle_cache_size, max_downtime.
They can be all consolidated.


> There is a hesitation to add such low-level parameters, but for
> something like
> micro-checkpointing that may potentially have large impacts on
> application performance,
> I do think it would be critical to expose more customizability
> like 'set_migrate_parameter' to management software.
> 
> In fact, this separation of function may be "better" than continuing
> to expand
> the list of individual commands in the QEMU monitor.
> 
> Could you submit a patch for the framework of such a command?

Sure.
-- 
Isaku Yamahata <isaku.yamahata@gmail.com>

^ permalink raw reply	[flat|nested] 16+ messages in thread

end of thread, other threads:[~2013-11-07  2:38 UTC | newest]

Thread overview: 16+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2013-10-21  1:14 [Qemu-devel] [RFC PATCH v1: 00/12] fault tolerance through micro-checkpointing mrhines
2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 01/12] mc: add documentation for micro-checkpointing mrhines
2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 02/12] rdma: remove reference to github.com mrhines
2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 03/12] migration: introduce parallelization of migration_bitmap mrhines
2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 04/12] mc: introduce a "checkpointing" status check into the VCPU states mrhines
2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 05/12] migration: support custom page loading mrhines
2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 06/12] rdma: accelerated memcpy() support mrhines
2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 07/12] mc: introduce state machine error handling and migration_bitmap prep mrhines
2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 08/12] mc: modified QMP statistics and migration_thread handoff mrhines
2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 09/12] mc: core logic mrhines
2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 10/12] mc: configure and makefile support mrhines
2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 11/12] mc: register MC qemu-file functions and expose MC tunable capability mrhines
2013-10-23 11:00   ` Isaku Yamahata
2013-11-06 16:34     ` Michael R. Hines
2013-11-07  2:38       ` Isaku Yamahata
2013-10-21  1:14 ` [Qemu-devel] [RFC PATCH v1: 12/12] mc: activate and use MC core logic if requested mrhines

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.