All of lore.kernel.org
 help / color / mirror / Atom feed
* [Qemu-devel] [PATCH v3 resend 0/8] rdma: core logic
@ 2013-07-16 16:48 mrhines
  2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 1/8] rdma: update documentation to reflect new unpin support mrhines
                   ` (7 more replies)
  0 siblings, 8 replies; 13+ messages in thread
From: mrhines @ 2013-07-16 16:48 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, knoel, owasserm, abali, mrhines, gokul,
	pbonzini, chegu_vinod

From: "Michael R. Hines" <mrhines@us.ibm.com>

Changes since v2:
- trivial bugfix
- re-ran checkpatch

Michael R. Hines (8):
  rdma: update documentation to reflect new unpin support
  rdma: bugfix: ram_control_save_page()
  rdma: introduce ram_handle_compressed()
  rdma: core logic
  rdma: send pc.ram
  rdma: allow state transitions between other states besides ACTIVE
  rdma: introduce MIG_STATE_NONE and change MIG_STATE_SETUP state
    transition
  rdma: account for the time spent in MIG_STATE_SETUP through QMP

 Makefile.objs                 |    1 +
 arch_init.c                   |   62 +-
 configure                     |   40 +
 docs/rdma.txt                 |   51 +-
 hmp.c                         |    4 +
 include/migration/migration.h |    7 +
 migration-rdma.c              | 3249 +++++++++++++++++++++++++++++++++++++++++
 migration.c                   |   48 +-
 qapi-schema.json              |    9 +-
 savevm.c                      |    2 +-
 10 files changed, 3427 insertions(+), 46 deletions(-)
 create mode 100644 migration-rdma.c

-- 
1.7.10.4

^ permalink raw reply	[flat|nested] 13+ messages in thread

* [Qemu-devel] [PATCH v3 resend 1/8] rdma: update documentation to reflect new unpin support
  2013-07-16 16:48 [Qemu-devel] [PATCH v3 resend 0/8] rdma: core logic mrhines
@ 2013-07-16 16:48 ` mrhines
  2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 2/8] rdma: bugfix: ram_control_save_page() mrhines
                   ` (6 subsequent siblings)
  7 siblings, 0 replies; 13+ messages in thread
From: mrhines @ 2013-07-16 16:48 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, knoel, owasserm, abali, mrhines, gokul,
	pbonzini, chegu_vinod

From: "Michael R. Hines" <mrhines@us.ibm.com>

As requested, the protocol now includes memory unpinning support.
This has been implemented in a non-optimized manner, in such a way
that one could devise an LRU or other workload-specific information
on top of the basic mechanism to influence the way unpinning happens
during runtime.

The feature is not yet user-facing, and is thus can only be enabled
at compile-time.

Reviewed-by: Eric Blake <eblake@redhat.com>
Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 docs/rdma.txt |   51 ++++++++++++++++++++++++++++++---------------------
 1 file changed, 30 insertions(+), 21 deletions(-)

diff --git a/docs/rdma.txt b/docs/rdma.txt
index 45a4b1d..45d1c8a 100644
--- a/docs/rdma.txt
+++ b/docs/rdma.txt
@@ -35,7 +35,7 @@ memory tracked during each live migration iteration round cannot keep pace
 with the rate of dirty memory produced by the workload.
 
 RDMA currently comes in two flavors: both Ethernet based (RoCE, or RDMA
-over Convered Ethernet) as well as Infiniband-based. This implementation of
+over Converged Ethernet) as well as Infiniband-based. This implementation of
 migration using RDMA is capable of using both technologies because of
 the use of the OpenFabrics OFED software stack that abstracts out the
 programming model irrespective of the underlying hardware.
@@ -188,9 +188,9 @@ header portion and a data portion (but together are transmitted
 as a single SEND message).
 
 Header:
-    * Length  (of the data portion, uint32, network byte order)
-    * Type    (what command to perform, uint32, network byte order)
-    * Repeat  (Number of commands in data portion, same type only)
+    * Length               (of the data portion, uint32, network byte order)
+    * Type                 (what command to perform, uint32, network byte order)
+    * Repeat               (Number of commands in data portion, same type only)
 
 The 'Repeat' field is here to support future multiple page registrations
 in a single message without any need to change the protocol itself
@@ -202,17 +202,19 @@ The maximum number of repeats is hard-coded to 4096. This is a conservative
 limit based on the maximum size of a SEND message along with emperical
 observations on the maximum future benefit of simultaneous page registrations.
 
-The 'type' field has 10 different command values:
-    1. Unused
-    2. Error              (sent to the source during bad things)
-    3. Ready              (control-channel is available)
-    4. QEMU File          (for sending non-live device state)
-    5. RAM Blocks request (used right after connection setup)
-    6. RAM Blocks result  (used right after connection setup)
-    7. Compress page      (zap zero page and skip registration)
-    8. Register request   (dynamic chunk registration)
-    9. Register result    ('rkey' to be used by sender)
-    10. Register finished  (registration for current iteration finished)
+The 'type' field has 12 different command values:
+     1. Unused
+     2. Error                      (sent to the source during bad things)
+     3. Ready                      (control-channel is available)
+     4. QEMU File                  (for sending non-live device state)
+     5. RAM Blocks request         (used right after connection setup)
+     6. RAM Blocks result          (used right after connection setup)
+     7. Compress page              (zap zero page and skip registration)
+     8. Register request           (dynamic chunk registration)
+     9. Register result            ('rkey' to be used by sender)
+    10. Register finished          (registration for current iteration finished)
+    11. Unregister request         (unpin previously registered memory)
+    12. Unregister finished        (confirmation that unpin completed)
 
 A single control message, as hinted above, can contain within the data
 portion an array of many commands of the same type. If there is more than
@@ -243,7 +245,7 @@ qemu_rdma_exchange_send(header, data, optional response header & data):
    from the receiver to tell us that the receiver
    is *ready* for us to transmit some new bytes.
 2. Optionally: if we are expecting a response from the command
-   (that we have no yet transmitted), let's post an RQ
+   (that we have not yet transmitted), let's post an RQ
    work request to receive that data a few moments later.
 3. When the READY arrives, librdmacm will
    unblock us and we immediately post a RQ work request
@@ -293,8 +295,10 @@ librdmacm provides the user with a 'private data' area to be exchanged
 at connection-setup time before any infiniband traffic is generated.
 
 Header:
-    * Version (protocol version validated before send/recv occurs), uint32, network byte order
-    * Flags   (bitwise OR of each capability), uint32, network byte order
+    * Version (protocol version validated before send/recv occurs),
+                                               uint32, network byte order
+    * Flags   (bitwise OR of each capability),
+                                               uint32, network byte order
 
 There is no data portion of this header right now, so there is
 no length field. The maximum size of the 'private data' section
@@ -313,7 +317,7 @@ If the version is invalid, we throw an error.
 If the version is new, we only negotiate the capabilities that the
 requested version is able to perform and ignore the rest.
 
-Currently there is only *one* capability in Version #1: dynamic page registration
+Currently there is only one capability in Version #1: dynamic page registration
 
 Finally: Negotiation happens with the Flags field: If the primary-VM
 sets a flag, but the destination does not support this capability, it
@@ -326,8 +330,8 @@ QEMUFileRDMA Interface:
 
 QEMUFileRDMA introduces a couple of new functions:
 
-1. qemu_rdma_get_buffer()  (QEMUFileOps rdma_read_ops)
-2. qemu_rdma_put_buffer()  (QEMUFileOps rdma_write_ops)
+1. qemu_rdma_get_buffer()               (QEMUFileOps rdma_read_ops)
+2. qemu_rdma_put_buffer()               (QEMUFileOps rdma_write_ops)
 
 These two functions are very short and simply use the protocol
 describe above to deliver bytes without changing the upper-level
@@ -413,3 +417,8 @@ TODO:
    the use of KSM and ballooning while using RDMA.
 4. Also, some form of balloon-device usage tracking would also
    help alleviate some issues.
+5. Move UNREGISTER requests to a separate thread.
+6. Use LRU to provide more fine-grained direction of UNREGISTER
+   requests for unpinning memory in an overcommitted environment.
+7. Expose UNREGISTER support to the user by way of workload-specific
+   hints about application behavior.
-- 
1.7.10.4

^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [Qemu-devel] [PATCH v3 resend 2/8] rdma: bugfix: ram_control_save_page()
  2013-07-16 16:48 [Qemu-devel] [PATCH v3 resend 0/8] rdma: core logic mrhines
  2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 1/8] rdma: update documentation to reflect new unpin support mrhines
@ 2013-07-16 16:48 ` mrhines
  2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 3/8] rdma: introduce ram_handle_compressed() mrhines
                   ` (5 subsequent siblings)
  7 siblings, 0 replies; 13+ messages in thread
From: mrhines @ 2013-07-16 16:48 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, knoel, owasserm, abali, mrhines, gokul,
	pbonzini, chegu_vinod

From: "Michael R. Hines" <mrhines@us.ibm.com>

We were not checking for a valid 'bytes_sent' pointer before accessing it.

Reviewed-by: Eric Blake <eblake@redhat.com>
Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 savevm.c |    2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/savevm.c b/savevm.c
index e0491e7..03fc4d9 100644
--- a/savevm.c
+++ b/savevm.c
@@ -662,7 +662,7 @@ size_t ram_control_save_page(QEMUFile *f, ram_addr_t block_offset,
                                     offset, size, bytes_sent);
 
         if (ret != RAM_SAVE_CONTROL_DELAYED) {
-            if (*bytes_sent > 0) {
+            if (bytes_sent && *bytes_sent > 0) {
                 qemu_update_position(f, *bytes_sent);
             } else if (ret < 0) {
                 qemu_file_set_error(f, ret);
-- 
1.7.10.4

^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [Qemu-devel] [PATCH v3 resend 3/8] rdma: introduce ram_handle_compressed()
  2013-07-16 16:48 [Qemu-devel] [PATCH v3 resend 0/8] rdma: core logic mrhines
  2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 1/8] rdma: update documentation to reflect new unpin support mrhines
  2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 2/8] rdma: bugfix: ram_control_save_page() mrhines
@ 2013-07-16 16:48 ` mrhines
  2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 4/8] rdma: core logic mrhines
                   ` (4 subsequent siblings)
  7 siblings, 0 replies; 13+ messages in thread
From: mrhines @ 2013-07-16 16:48 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, knoel, owasserm, abali, mrhines, gokul,
	pbonzini, chegu_vinod

From: "Michael R. Hines" <mrhines@us.ibm.com>

This gives RDMA shared access to madvise() on the destination side
when an entire chunk is found to be zero.

Reviewed-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Paolo Bonzini <pbonzini@redhat.com>
Reviewed-by: Chegu Vinod <chegu_vinod@hp.com>
Tested-by: Chegu Vinod <chegu_vinod@hp.com>
Tested-by: Michael R. Hines <mrhines@us.ibm.com>
Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 arch_init.c                   |   29 +++++++++++++++++++----------
 include/migration/migration.h |    2 ++
 2 files changed, 21 insertions(+), 10 deletions(-)

diff --git a/arch_init.c b/arch_init.c
index 0e553c9..6a96bbd 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -777,6 +777,24 @@ static inline void *host_from_stream_offset(QEMUFile *f,
     return NULL;
 }
 
+/*
+ * If a page (or a whole RDMA chunk) has been
+ * determined to be zero, then zap it.
+ */
+void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
+{
+    if (ch != 0 || !is_zero_page(host)) {
+        memset(host, ch, size);
+#ifndef _WIN32
+        if (ch == 0 &&
+            (!kvm_enabled() || kvm_has_sync_mmu()) &&
+            getpagesize() <= TARGET_PAGE_SIZE) {
+            qemu_madvise(host, TARGET_PAGE_SIZE, QEMU_MADV_DONTNEED);
+        }
+#endif
+    }
+}
+
 static int ram_load(QEMUFile *f, void *opaque, int version_id)
 {
     ram_addr_t addr;
@@ -848,16 +866,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             }
 
             ch = qemu_get_byte(f);
-            if (ch != 0 || !is_zero_page(host)) {
-                memset(host, ch, TARGET_PAGE_SIZE);
-#ifndef _WIN32
-                if (ch == 0 &&
-                    (!kvm_enabled() || kvm_has_sync_mmu()) &&
-                    getpagesize() <= TARGET_PAGE_SIZE) {
-                    qemu_madvise(host, TARGET_PAGE_SIZE, QEMU_MADV_DONTNEED);
-                }
-#endif
-            }
+            ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
         } else if (flags & RAM_SAVE_FLAG_PAGE) {
             void *host;
 
diff --git a/include/migration/migration.h b/include/migration/migration.h
index f0640e0..9d3cc85 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -109,6 +109,8 @@ uint64_t xbzrle_mig_pages_transferred(void);
 uint64_t xbzrle_mig_pages_overflow(void);
 uint64_t xbzrle_mig_pages_cache_miss(void);
 
+void ram_handle_compressed(void *host, uint8_t ch, uint64_t size);
+
 /**
  * @migrate_add_blocker - prevent migration from proceeding
  *
-- 
1.7.10.4

^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [Qemu-devel] [PATCH v3 resend 4/8] rdma: core logic
  2013-07-16 16:48 [Qemu-devel] [PATCH v3 resend 0/8] rdma: core logic mrhines
                   ` (2 preceding siblings ...)
  2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 3/8] rdma: introduce ram_handle_compressed() mrhines
@ 2013-07-16 16:48 ` mrhines
  2013-07-18  7:30   ` Marcel Apfelbaum
  2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 5/8] rdma: send pc.ram mrhines
                   ` (3 subsequent siblings)
  7 siblings, 1 reply; 13+ messages in thread
From: mrhines @ 2013-07-16 16:48 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, knoel, owasserm, abali, mrhines, gokul,
	pbonzini, chegu_vinod

From: "Michael R. Hines" <mrhines@us.ibm.com>

Code that does need to be visible is kept
well contained inside this file and this is the only
new additional file to the entire patch.

This file includes the entire protocol and interfaces
required to perform RDMA migration.

Also, the configure and Makefile modifications to link
this file are included.

Full documentation is in docs/rdma.txt

Reviewed-by: Paolo Bonzini <pbonzini@redhat.com>
Reviewed-by: Chegu Vinod <chegu_vinod@hp.com>
Tested-by: Chegu Vinod <chegu_vinod@hp.com>
Tested-by: Michael R. Hines <mrhines@us.ibm.com>
Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 Makefile.objs                 |    1 +
 configure                     |   40 +
 include/migration/migration.h |    4 +
 migration-rdma.c              | 3249 +++++++++++++++++++++++++++++++++++++++++
 migration.c                   |    8 +
 5 files changed, 3302 insertions(+)
 create mode 100644 migration-rdma.c

diff --git a/Makefile.objs b/Makefile.objs
index 5b288ba..9928542 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -51,6 +51,7 @@ common-obj-$(CONFIG_POSIX) += os-posix.o
 common-obj-$(CONFIG_LINUX) += fsdev/
 
 common-obj-y += migration.o migration-tcp.o
+common-obj-$(CONFIG_RDMA) += migration-rdma.o
 common-obj-y += qemu-char.o #aio.o
 common-obj-y += block-migration.o
 common-obj-y += page_cache.o xbzrle.o
diff --git a/configure b/configure
index cb0f870..2e4b04a 100755
--- a/configure
+++ b/configure
@@ -180,6 +180,7 @@ xfs=""
 vhost_net="no"
 vhost_scsi="no"
 kvm="no"
+rdma=""
 gprof="no"
 debug_tcg="no"
 debug="no"
@@ -936,6 +937,10 @@ for opt do
   ;;
   --enable-gtk) gtk="yes"
   ;;
+  --enable-rdma) rdma="yes"
+  ;;
+  --disable-rdma) rdma="no"
+  ;;
   --with-gtkabi=*) gtkabi="$optarg"
   ;;
   --enable-tpm) tpm="yes"
@@ -1094,6 +1099,8 @@ echo "  --enable-bluez           enable bluez stack connectivity"
 echo "  --disable-slirp          disable SLIRP userspace network connectivity"
 echo "  --disable-kvm            disable KVM acceleration support"
 echo "  --enable-kvm             enable KVM acceleration support"
+echo "  --disable-rdma           disable RDMA-based migration support"
+echo "  --enable-rdma            enable RDMA-based migration support"
 echo "  --enable-tcg-interpreter enable TCG with bytecode interpreter (TCI)"
 echo "  --disable-nptl           disable usermode NPTL support"
 echo "  --enable-nptl            enable usermode NPTL support"
@@ -1797,6 +1804,30 @@ EOF
 fi
 
 ##########################################
+# RDMA needs OpenFabrics libraries
+if test "$rdma" != "no" ; then
+  cat > $TMPC <<EOF
+#include <rdma/rdma_cma.h>
+int main(void) { return 0; }
+EOF
+  rdma_libs="-lrdmacm -libverbs"
+  if compile_prog "" "$rdma_libs" ; then
+    rdma="yes"
+    libs_softmmu="$libs_softmmu $rdma_libs"
+  else
+    if test "$rdma" = "yes" ; then
+        error_exit \
+            " OpenFabrics librdmacm/libibverbs not present." \
+            " Your options:" \
+            "  (1) Fast: Install infiniband packages from your distro." \
+            "  (2) Cleanest: Install libraries from www.openfabrics.org" \
+            "  (3) Also: Install softiwarp if you don't have RDMA hardware"
+    fi
+    rdma="no"
+  fi
+fi
+
+##########################################
 # VNC TLS/WS detection
 if test "$vnc" = "yes" -a \( "$vnc_tls" != "no" -o "$vnc_ws" != "no" \) ; then
   cat > $TMPC <<EOF
@@ -3555,6 +3586,7 @@ echo "Linux AIO support $linux_aio"
 echo "ATTR/XATTR support $attr"
 echo "Install blobs     $blobs"
 echo "KVM support       $kvm"
+echo "RDMA support      $rdma"
 echo "TCG interpreter   $tcg_interpreter"
 echo "fdt support       $fdt"
 echo "preadv support    $preadv"
@@ -4039,6 +4071,10 @@ if test "$trace_default" = "yes"; then
   echo "CONFIG_TRACE_DEFAULT=y" >> $config_host_mak
 fi
 
+if test "$rdma" = "yes" ; then
+  echo "CONFIG_RDMA=y" >> $config_host_mak
+fi
+
 if test "$tcg_interpreter" = "yes"; then
   QEMU_INCLUDES="-I\$(SRC_PATH)/tcg/tci $QEMU_INCLUDES"
 elif test "$ARCH" = "sparc64" ; then
@@ -4478,6 +4514,10 @@ if [ "$pixman" = "internal" ]; then
   echo "config-host.h: subdir-pixman" >> $config_host_mak
 fi
 
+if test "$rdma" = "yes" ; then
+echo "CONFIG_RDMA=y" >> $config_host_mak
+fi
+
 if [ "$dtc_internal" = "yes" ]; then
   echo "config-host.h: subdir-dtc" >> $config_host_mak
 fi
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 9d3cc85..b5e413a 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -77,6 +77,10 @@ void fd_start_incoming_migration(const char *path, Error **errp);
 
 void fd_start_outgoing_migration(MigrationState *s, const char *fdname, Error **errp);
 
+void rdma_start_outgoing_migration(void *opaque, const char *host_port, Error **errp);
+
+void rdma_start_incoming_migration(const char *host_port, Error **errp);
+
 void migrate_fd_error(MigrationState *s);
 
 void migrate_fd_connect(MigrationState *s);
diff --git a/migration-rdma.c b/migration-rdma.c
new file mode 100644
index 0000000..d044830
--- /dev/null
+++ b/migration-rdma.c
@@ -0,0 +1,3249 @@
+/*
+ * RDMA protocol and interfaces
+ *
+ * Copyright IBM, Corp. 2010-2013
+ *
+ * Authors:
+ *  Michael R. Hines <mrhines@us.ibm.com>
+ *  Jiuxing Liu <jl@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later.  See the COPYING file in the top-level directory.
+ *
+ */
+#include "qemu-common.h"
+#include "migration/migration.h"
+#include "migration/qemu-file.h"
+#include "exec/cpu-common.h"
+#include "qemu/main-loop.h"
+#include "qemu/sockets.h"
+#include "qemu/bitmap.h"
+#include "block/coroutine.h"
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+#include <string.h>
+#include <rdma/rdma_cma.h>
+
+#define DEBUG_RDMA
+//#define DEBUG_RDMA_VERBOSE
+//#define DEBUG_RDMA_REALLY_VERBOSE
+
+#ifdef DEBUG_RDMA
+#define DPRINTF(fmt, ...) \
+    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
+#else
+#define DPRINTF(fmt, ...) \
+    do { } while (0)
+#endif
+
+#ifdef DEBUG_RDMA_VERBOSE
+#define DDPRINTF(fmt, ...) \
+    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
+#else
+#define DDPRINTF(fmt, ...) \
+    do { } while (0)
+#endif
+
+#ifdef DEBUG_RDMA_REALLY_VERBOSE
+#define DDDPRINTF(fmt, ...) \
+    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
+#else
+#define DDDPRINTF(fmt, ...) \
+    do { } while (0)
+#endif
+
+/*
+ * Print and error on both the Monitor and the Log file.
+ */
+#define ERROR(errp, fmt, ...) \
+    do { \
+        fprintf(stderr, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
+        if (errp && (*(errp) == NULL)) { \
+            error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
+        } \
+    } while (0)
+
+#define RDMA_RESOLVE_TIMEOUT_MS 10000
+
+/* Do not merge data if larger than this. */
+#define RDMA_MERGE_MAX (2 * 1024 * 1024)
+#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
+
+#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
+
+/*
+ * This is only for non-live state being migrated.
+ * Instead of RDMA_WRITE messages, we use RDMA_SEND
+ * messages for that state, which requires a different
+ * delivery design than main memory.
+ */
+#define RDMA_SEND_INCREMENT 32768
+
+/*
+ * Maximum size infiniband SEND message
+ */
+#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
+#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
+
+#define RDMA_CONTROL_VERSION_CURRENT 1
+/*
+ * Capabilities for negotiation.
+ */
+#define RDMA_CAPABILITY_PIN_ALL 0x01
+
+/*
+ * Add the other flags above to this list of known capabilities
+ * as they are introduced.
+ */
+static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
+
+#define CHECK_ERROR_STATE() \
+    do { \
+        if (rdma->error_state) { \
+            if (!rdma->error_reported) { \
+                fprintf(stderr, "RDMA is in an error state waiting migration" \
+                                " to abort!\n"); \
+                rdma->error_reported = 1; \
+            } \
+            return rdma->error_state; \
+        } \
+    } while (0);
+
+/*
+ * A work request ID is 64-bits and we split up these bits
+ * into 3 parts:
+ *
+ * bits 0-15 : type of control message, 2^16
+ * bits 16-29: ram block index, 2^14
+ * bits 30-63: ram block chunk number, 2^34
+ *
+ * The last two bit ranges are only used for RDMA writes,
+ * in order to track their completion and potentially
+ * also track unregistration status of the message.
+ */
+#define RDMA_WRID_TYPE_SHIFT  0UL
+#define RDMA_WRID_BLOCK_SHIFT 16UL
+#define RDMA_WRID_CHUNK_SHIFT 30UL
+
+#define RDMA_WRID_TYPE_MASK \
+    ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
+
+#define RDMA_WRID_BLOCK_MASK \
+    (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
+
+#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
+
+/*
+ * RDMA migration protocol:
+ * 1. RDMA Writes (data messages, i.e. RAM)
+ * 2. IB Send/Recv (control channel messages)
+ */
+enum {
+    RDMA_WRID_NONE = 0,
+    RDMA_WRID_RDMA_WRITE = 1,
+    RDMA_WRID_SEND_CONTROL = 2000,
+    RDMA_WRID_RECV_CONTROL = 4000,
+};
+
+const char *wrid_desc[] = {
+    [RDMA_WRID_NONE] = "NONE",
+    [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
+    [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
+    [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
+};
+
+/*
+ * Work request IDs for IB SEND messages only (not RDMA writes).
+ * This is used by the migration protocol to transmit
+ * control messages (such as device state and registration commands)
+ *
+ * We could use more WRs, but we have enough for now.
+ */
+enum {
+    RDMA_WRID_READY = 0,
+    RDMA_WRID_DATA,
+    RDMA_WRID_CONTROL,
+    RDMA_WRID_MAX,
+};
+
+/*
+ * SEND/RECV IB Control Messages.
+ */
+enum {
+    RDMA_CONTROL_NONE = 0,
+    RDMA_CONTROL_ERROR,
+    RDMA_CONTROL_READY,               /* ready to receive */
+    RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
+    RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
+    RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
+    RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
+    RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
+    RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
+    RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
+    RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
+    RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
+};
+
+const char *control_desc[] = {
+    [RDMA_CONTROL_NONE] = "NONE",
+    [RDMA_CONTROL_ERROR] = "ERROR",
+    [RDMA_CONTROL_READY] = "READY",
+    [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
+    [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
+    [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
+    [RDMA_CONTROL_COMPRESS] = "COMPRESS",
+    [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
+    [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
+    [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
+    [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
+    [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
+};
+
+/*
+ * Memory and MR structures used to represent an IB Send/Recv work request.
+ * This is *not* used for RDMA writes, only IB Send/Recv.
+ */
+typedef struct {
+    uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
+    struct   ibv_mr *control_mr;               /* registration metadata */
+    size_t   control_len;                      /* length of the message */
+    uint8_t *control_curr;                     /* start of unconsumed bytes */
+} RDMAWorkRequestData;
+
+/*
+ * Negotiate RDMA capabilities during connection-setup time.
+ */
+typedef struct {
+    uint32_t version;
+    uint32_t flags;
+} RDMACapabilities;
+
+static void caps_to_network(RDMACapabilities *cap)
+{
+    cap->version = htonl(cap->version);
+    cap->flags = htonl(cap->flags);
+}
+
+static void network_to_caps(RDMACapabilities *cap)
+{
+    cap->version = ntohl(cap->version);
+    cap->flags = ntohl(cap->flags);
+}
+
+/*
+ * Representation of a RAMBlock from an RDMA perspective.
+ * This is not transmitted, only local.
+ * This and subsequent structures cannot be linked lists
+ * because we're using a single IB message to transmit
+ * the information. It's small anyway, so a list is overkill.
+ */
+typedef struct RDMALocalBlock {
+    uint8_t  *local_host_addr; /* local virtual address */
+    uint64_t remote_host_addr; /* remote virtual address */
+    uint64_t offset;
+    uint64_t length;
+    struct   ibv_mr **pmr;     /* MRs for chunk-level registration */
+    struct   ibv_mr *mr;       /* MR for non-chunk-level registration */
+    uint32_t *remote_keys;     /* rkeys for chunk-level registration */
+    uint32_t remote_rkey;      /* rkeys for non-chunk-level registration */
+    int      index;            /* which block are we */
+    bool     is_ram_block;
+    int      nb_chunks;
+    unsigned long *transit_bitmap;
+    unsigned long *unregister_bitmap;
+} RDMALocalBlock;
+
+/*
+ * Also represents a RAMblock, but only on the dest.
+ * This gets transmitted by the dest during connection-time
+ * to the source VM and then is used to populate the
+ * corresponding RDMALocalBlock with
+ * the information needed to perform the actual RDMA.
+ */
+typedef struct QEMU_PACKED RDMARemoteBlock {
+    uint64_t remote_host_addr;
+    uint64_t offset;
+    uint64_t length;
+    uint32_t remote_rkey;
+    uint32_t padding;
+} RDMARemoteBlock;
+
+static uint64_t htonll(uint64_t v)
+{
+    union { uint32_t lv[2]; uint64_t llv; } u;
+    u.lv[0] = htonl(v >> 32);
+    u.lv[1] = htonl(v & 0xFFFFFFFFULL);
+    return u.llv;
+}
+
+static uint64_t ntohll(uint64_t v) {
+    union { uint32_t lv[2]; uint64_t llv; } u;
+    u.llv = v;
+    return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
+}
+
+static void remote_block_to_network(RDMARemoteBlock *rb)
+{
+    rb->remote_host_addr = htonll(rb->remote_host_addr);
+    rb->offset = htonll(rb->offset);
+    rb->length = htonll(rb->length);
+    rb->remote_rkey = htonl(rb->remote_rkey);
+}
+
+static void network_to_remote_block(RDMARemoteBlock *rb)
+{
+    rb->remote_host_addr = ntohll(rb->remote_host_addr);
+    rb->offset = ntohll(rb->offset);
+    rb->length = ntohll(rb->length);
+    rb->remote_rkey = ntohl(rb->remote_rkey);
+}
+
+/*
+ * Virtual address of the above structures used for transmitting
+ * the RAMBlock descriptions at connection-time.
+ * This structure is *not* transmitted.
+ */
+typedef struct RDMALocalBlocks {
+    int nb_blocks;
+    bool     init;             /* main memory init complete */
+    RDMALocalBlock *block;
+} RDMALocalBlocks;
+
+/*
+ * Main data structure for RDMA state.
+ * While there is only one copy of this structure being allocated right now,
+ * this is the place where one would start if you wanted to consider
+ * having more than one RDMA connection open at the same time.
+ */
+typedef struct RDMAContext {
+    char *host;
+    int port;
+
+    RDMAWorkRequestData wr_data[RDMA_WRID_MAX + 1];
+
+    /*
+     * This is used by *_exchange_send() to figure out whether or not
+     * the initial "READY" message has already been received or not.
+     * This is because other functions may potentially poll() and detect
+     * the READY message before send() does, in which case we need to
+     * know if it completed.
+     */
+    int control_ready_expected;
+
+    /* number of outstanding writes */
+    int nb_sent;
+
+    /* store info about current buffer so that we can
+       merge it with future sends */
+    uint64_t current_addr;
+    uint64_t current_length;
+    /* index of ram block the current buffer belongs to */
+    int current_index;
+    /* index of the chunk in the current ram block */
+    int current_chunk;
+
+    bool pin_all;
+
+    /*
+     * infiniband-specific variables for opening the device
+     * and maintaining connection state and so forth.
+     *
+     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
+     * cm_id->verbs, cm_id->channel, and cm_id->qp.
+     */
+    struct rdma_cm_id *cm_id;               /* connection manager ID */
+    struct rdma_cm_id *listen_id;
+
+    struct ibv_context          *verbs;
+    struct rdma_event_channel   *channel;
+    struct ibv_qp *qp;                      /* queue pair */
+    struct ibv_comp_channel *comp_channel;  /* completion channel */
+    struct ibv_pd *pd;                      /* protection domain */
+    struct ibv_cq *cq;                      /* completion queue */
+
+    /*
+     * If a previous write failed (perhaps because of a failed
+     * memory registration, then do not attempt any future work
+     * and remember the error state.
+     */
+    int error_state;
+    int error_reported;
+
+    /*
+     * Description of ram blocks used throughout the code.
+     */
+    RDMALocalBlocks local_ram_blocks;
+    RDMARemoteBlock *block;
+
+    /*
+     * Migration on *destination* started.
+     * Then use coroutine yield function.
+     * Source runs in a thread, so we don't care.
+     */
+    int migration_started_on_destination;
+
+    int total_registrations;
+    int total_writes;
+
+    int unregister_current, unregister_next;
+    uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
+
+    GHashTable *blockmap;
+} RDMAContext;
+
+/*
+ * Interface to the rest of the migration call stack.
+ */
+typedef struct QEMUFileRDMA {
+    RDMAContext *rdma;
+    size_t len;
+    void *file;
+} QEMUFileRDMA;
+
+/*
+ * Main structure for IB Send/Recv control messages.
+ * This gets prepended at the beginning of every Send/Recv.
+ */
+typedef struct QEMU_PACKED {
+    uint32_t len;     /* Total length of data portion */
+    uint32_t type;    /* which control command to perform */
+    uint32_t repeat;  /* number of commands in data portion of same type */
+    uint32_t padding;
+} RDMAControlHeader;
+
+static void control_to_network(RDMAControlHeader *control)
+{
+    control->type = htonl(control->type);
+    control->len = htonl(control->len);
+    control->repeat = htonl(control->repeat);
+}
+
+static void network_to_control(RDMAControlHeader *control)
+{
+    control->type = ntohl(control->type);
+    control->len = ntohl(control->len);
+    control->repeat = ntohl(control->repeat);
+}
+
+/*
+ * Register a single Chunk.
+ * Information sent by the source VM to inform the dest
+ * to register an single chunk of memory before we can perform
+ * the actual RDMA operation.
+ */
+typedef struct QEMU_PACKED {
+    union QEMU_PACKED {
+        uint64_t current_addr;  /* offset into the ramblock of the chunk */
+        uint64_t chunk;         /* chunk to lookup if unregistering */
+    } key;
+    uint32_t current_index; /* which ramblock the chunk belongs to */
+    uint32_t padding;
+    uint64_t chunks;            /* how many sequential chunks to register */
+} RDMARegister;
+
+static void register_to_network(RDMARegister *reg)
+{
+    reg->key.current_addr = htonll(reg->key.current_addr);
+    reg->current_index = htonl(reg->current_index);
+    reg->chunks = htonll(reg->chunks);
+}
+
+static void network_to_register(RDMARegister *reg)
+{
+    reg->key.current_addr = ntohll(reg->key.current_addr);
+    reg->current_index = ntohl(reg->current_index);
+    reg->chunks = ntohll(reg->chunks);
+}
+
+typedef struct QEMU_PACKED {
+    uint32_t value;     /* if zero, we will madvise() */
+    uint32_t block_idx; /* which ram block index */
+    uint64_t offset;    /* where in the remote ramblock this chunk */
+    uint64_t length;    /* length of the chunk */
+} RDMACompress;
+
+static void compress_to_network(RDMACompress *comp)
+{
+    comp->value = htonl(comp->value);
+    comp->block_idx = htonl(comp->block_idx);
+    comp->offset = htonll(comp->offset);
+    comp->length = htonll(comp->length);
+}
+
+static void network_to_compress(RDMACompress *comp)
+{
+    comp->value = ntohl(comp->value);
+    comp->block_idx = ntohl(comp->block_idx);
+    comp->offset = ntohll(comp->offset);
+    comp->length = ntohll(comp->length);
+}
+
+/*
+ * The result of the dest's memory registration produces an "rkey"
+ * which the source VM must reference in order to perform
+ * the RDMA operation.
+ */
+typedef struct QEMU_PACKED {
+    uint32_t rkey;
+    uint32_t padding;
+    uint64_t host_addr;
+} RDMARegisterResult;
+
+static void result_to_network(RDMARegisterResult *result)
+{
+    result->rkey = htonl(result->rkey);
+    result->host_addr = htonll(result->host_addr);
+};
+
+static void network_to_result(RDMARegisterResult *result)
+{
+    result->rkey = ntohl(result->rkey);
+    result->host_addr = ntohll(result->host_addr);
+};
+
+const char *print_wrid(int wrid);
+static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
+                                   uint8_t *data, RDMAControlHeader *resp,
+                                   int *resp_idx,
+                                   int (*callback)(RDMAContext *rdma));
+
+static inline uint64_t ram_chunk_index(uint8_t *start, uint8_t *host)
+{
+    return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
+}
+
+static inline uint8_t *ram_chunk_start(RDMALocalBlock *rdma_ram_block,
+                                       uint64_t i)
+{
+    return (uint8_t *) (((uintptr_t) rdma_ram_block->local_host_addr)
+                                    + (i << RDMA_REG_CHUNK_SHIFT));
+}
+
+static inline uint8_t *ram_chunk_end(RDMALocalBlock *rdma_ram_block, uint64_t i)
+{
+    uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
+                                         (1UL << RDMA_REG_CHUNK_SHIFT);
+
+    if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
+        result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
+    }
+
+    return result;
+}
+
+static int __qemu_rdma_add_block(RDMAContext *rdma, void *host_addr,
+                         ram_addr_t block_offset, uint64_t length)
+{
+    RDMALocalBlocks *local = &rdma->local_ram_blocks;
+    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
+        (void *) block_offset);
+    RDMALocalBlock *old = local->block;
+
+    assert(block == NULL);
+
+    local->block = g_malloc0(sizeof(RDMALocalBlock) * (local->nb_blocks + 1));
+
+    if (local->nb_blocks) {
+        int x;
+
+        for (x = 0; x < local->nb_blocks; x++) {
+            g_hash_table_remove(rdma->blockmap, (void *)old[x].offset);
+            g_hash_table_insert(rdma->blockmap, (void *)old[x].offset,
+                                                &local->block[x]);
+        }
+        memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
+        g_free(old);
+    }
+
+    block = &local->block[local->nb_blocks];
+
+    block->local_host_addr = host_addr;
+    block->offset = block_offset;
+    block->length = length;
+    block->index = local->nb_blocks;
+    block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
+    block->transit_bitmap = bitmap_new(block->nb_chunks);
+    bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
+    block->unregister_bitmap = bitmap_new(block->nb_chunks);
+    bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
+    block->remote_keys = g_malloc0(block->nb_chunks * sizeof(uint32_t));
+
+    block->is_ram_block = local->init ? false : true;
+
+    g_hash_table_insert(rdma->blockmap, (void *) block_offset, block);
+
+    DDPRINTF("Added Block: %d, addr: %" PRIu64 ", offset: %" PRIu64
+           " length: %" PRIu64 " end: %" PRIu64 " bits %" PRIu64 " chunks %d\n",
+            local->nb_blocks, (uint64_t) block->local_host_addr, block->offset,
+            block->length, (uint64_t) (block->local_host_addr + block->length),
+                BITS_TO_LONGS(block->nb_chunks) *
+                    sizeof(unsigned long) * 8, block->nb_chunks);
+
+    local->nb_blocks++;
+
+    return 0;
+}
+
+/*
+ * Memory regions need to be registered with the device and queue pairs setup
+ * in advanced before the migration starts. This tells us where the RAM blocks
+ * are so that we can register them individually.
+ */
+static void qemu_rdma_init_one_block(void *host_addr,
+    ram_addr_t block_offset, ram_addr_t length, void *opaque)
+{
+    __qemu_rdma_add_block(opaque, host_addr, block_offset, length);
+}
+
+/*
+ * Identify the RAMBlocks and their quantity. They will be references to
+ * identify chunk boundaries inside each RAMBlock and also be referenced
+ * during dynamic page registration.
+ */
+static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
+{
+    RDMALocalBlocks *local = &rdma->local_ram_blocks;
+
+    assert(rdma->blockmap == NULL);
+    rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
+    memset(local, 0, sizeof *local);
+    qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma);
+    DPRINTF("Allocated %d local ram block structures\n", local->nb_blocks);
+    rdma->block = (RDMARemoteBlock *) g_malloc0(sizeof(RDMARemoteBlock) *
+                        rdma->local_ram_blocks.nb_blocks);
+    local->init = true;
+    return 0;
+}
+
+static int __qemu_rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
+{
+    RDMALocalBlocks *local = &rdma->local_ram_blocks;
+    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
+        (void *) block_offset);
+    RDMALocalBlock *old = local->block;
+    int x;
+
+    assert(block);
+
+    if (block->pmr) {
+        int j;
+
+        for (j = 0; j < block->nb_chunks; j++) {
+            if (!block->pmr[j]) {
+                continue;
+            }
+            ibv_dereg_mr(block->pmr[j]);
+            rdma->total_registrations--;
+        }
+        g_free(block->pmr);
+        block->pmr = NULL;
+    }
+
+    if (block->mr) {
+        ibv_dereg_mr(block->mr);
+        rdma->total_registrations--;
+        block->mr = NULL;
+    }
+
+    g_free(block->transit_bitmap);
+    block->transit_bitmap = NULL;
+
+    g_free(block->unregister_bitmap);
+    block->unregister_bitmap = NULL;
+
+    g_free(block->remote_keys);
+    block->remote_keys = NULL;
+
+    for (x = 0; x < local->nb_blocks; x++) {
+        g_hash_table_remove(rdma->blockmap, (void *)old[x].offset);
+    }
+
+    if (local->nb_blocks > 1) {
+
+        local->block = g_malloc0(sizeof(RDMALocalBlock) *
+                                    (local->nb_blocks - 1));
+
+        if (block->index) {
+            memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
+        }
+
+        if (block->index < (local->nb_blocks - 1)) {
+            memcpy(local->block + block->index, old + (block->index + 1),
+                sizeof(RDMALocalBlock) *
+                    (local->nb_blocks - (block->index + 1)));
+        }
+    } else {
+        assert(block == local->block);
+        local->block = NULL;
+    }
+
+    DDPRINTF("Deleted Block: %d, addr: %" PRIu64 ", offset: %" PRIu64
+           " length: %" PRIu64 " end: %" PRIu64 " bits %" PRIu64 " chunks %d\n",
+            local->nb_blocks, (uint64_t) block->local_host_addr, block->offset,
+            block->length, (uint64_t) (block->local_host_addr + block->length),
+                BITS_TO_LONGS(block->nb_chunks) *
+                    sizeof(unsigned long) * 8, block->nb_chunks);
+
+    g_free(old);
+
+    local->nb_blocks--;
+
+    if (local->nb_blocks) {
+        for (x = 0; x < local->nb_blocks; x++) {
+            g_hash_table_insert(rdma->blockmap, (void *)local->block[x].offset,
+                                                &local->block[x]);
+        }
+    }
+
+    return 0;
+}
+
+/*
+ * Put in the log file which RDMA device was opened and the details
+ * associated with that device.
+ */
+static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
+{
+    printf("%s RDMA Device opened: kernel name %s "
+           "uverbs device name %s, "
+           "infiniband_verbs class device path %s,"
+           " infiniband class device path %s\n",
+                who,
+                verbs->device->name,
+                verbs->device->dev_name,
+                verbs->device->dev_path,
+                verbs->device->ibdev_path);
+}
+
+/*
+ * Put in the log file the RDMA gid addressing information,
+ * useful for folks who have trouble understanding the
+ * RDMA device hierarchy in the kernel.
+ */
+static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
+{
+    char sgid[33];
+    char dgid[33];
+    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
+    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
+    DPRINTF("%s Source GID: %s, Dest GID: %s\n", who, sgid, dgid);
+}
+
+/*
+ * Figure out which RDMA device corresponds to the requested IP hostname
+ * Also create the initial connection manager identifiers for opening
+ * the connection.
+ */
+static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
+{
+    int ret;
+    struct addrinfo *res;
+    char port_str[16];
+    struct rdma_cm_event *cm_event;
+    char ip[40] = "unknown";
+
+    if (rdma->host == NULL || !strcmp(rdma->host, "")) {
+        ERROR(errp, "RDMA hostname has not been set\n");
+        return -1;
+    }
+
+    /* create CM channel */
+    rdma->channel = rdma_create_event_channel();
+    if (!rdma->channel) {
+        ERROR(errp, "could not create CM channel\n");
+        return -1;
+    }
+
+    /* create CM id */
+    ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
+    if (ret) {
+        ERROR(errp, "could not create channel id\n");
+        goto err_resolve_create_id;
+    }
+
+    snprintf(port_str, 16, "%d", rdma->port);
+    port_str[15] = '\0';
+
+    ret = getaddrinfo(rdma->host, port_str, NULL, &res);
+    if (ret < 0) {
+        ERROR(errp, "could not getaddrinfo address %s\n", rdma->host);
+        goto err_resolve_get_addr;
+    }
+
+    inet_ntop(AF_INET, &((struct sockaddr_in *) res->ai_addr)->sin_addr,
+                                ip, sizeof ip);
+    DPRINTF("%s => %s\n", rdma->host, ip);
+
+    /* resolve the first address */
+    ret = rdma_resolve_addr(rdma->cm_id, NULL, res->ai_addr,
+            RDMA_RESOLVE_TIMEOUT_MS);
+    if (ret) {
+        ERROR(errp, "could not resolve address %s\n", rdma->host);
+        goto err_resolve_get_addr;
+    }
+
+    qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
+
+    ret = rdma_get_cm_event(rdma->channel, &cm_event);
+    if (ret) {
+        ERROR(errp, "could not perform event_addr_resolved\n");
+        goto err_resolve_get_addr;
+    }
+
+    if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
+        ERROR(errp, "result not equal to event_addr_resolved %s\n",
+                rdma_event_str(cm_event->event));
+        perror("rdma_resolve_addr");
+        goto err_resolve_get_addr;
+    }
+    rdma_ack_cm_event(cm_event);
+
+    /* resolve route */
+    ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
+    if (ret) {
+        ERROR(errp, "could not resolve rdma route\n");
+        goto err_resolve_get_addr;
+    }
+
+    ret = rdma_get_cm_event(rdma->channel, &cm_event);
+    if (ret) {
+        ERROR(errp, "could not perform event_route_resolved\n");
+        goto err_resolve_get_addr;
+    }
+    if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
+        ERROR(errp, "result not equal to event_route_resolved: %s\n",
+                        rdma_event_str(cm_event->event));
+        rdma_ack_cm_event(cm_event);
+        goto err_resolve_get_addr;
+    }
+    rdma_ack_cm_event(cm_event);
+    rdma->verbs = rdma->cm_id->verbs;
+    qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
+    qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
+    return 0;
+
+err_resolve_get_addr:
+    rdma_destroy_id(rdma->cm_id);
+    rdma->cm_id = NULL;
+err_resolve_create_id:
+    rdma_destroy_event_channel(rdma->channel);
+    rdma->channel = NULL;
+
+    return -1;
+}
+
+/*
+ * Create protection domain and completion queues
+ */
+static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
+{
+    /* allocate pd */
+    rdma->pd = ibv_alloc_pd(rdma->verbs);
+    if (!rdma->pd) {
+        fprintf(stderr, "failed to allocate protection domain\n");
+        return -1;
+    }
+
+    /* create completion channel */
+    rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
+    if (!rdma->comp_channel) {
+        fprintf(stderr, "failed to allocate completion channel\n");
+        goto err_alloc_pd_cq;
+    }
+
+    /*
+     * Completion queue can be filled by both read and write work requests,
+     * so must reflect the sum of both possible queue sizes.
+     */
+    rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
+            NULL, rdma->comp_channel, 0);
+    if (!rdma->cq) {
+        fprintf(stderr, "failed to allocate completion queue\n");
+        goto err_alloc_pd_cq;
+    }
+
+    return 0;
+
+err_alloc_pd_cq:
+    if (rdma->pd) {
+        ibv_dealloc_pd(rdma->pd);
+    }
+    if (rdma->comp_channel) {
+        ibv_destroy_comp_channel(rdma->comp_channel);
+    }
+    rdma->pd = NULL;
+    rdma->comp_channel = NULL;
+    return -1;
+
+}
+
+/*
+ * Create queue pairs.
+ */
+static int qemu_rdma_alloc_qp(RDMAContext *rdma)
+{
+    struct ibv_qp_init_attr attr = { 0 };
+    int ret;
+
+    attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
+    attr.cap.max_recv_wr = 3;
+    attr.cap.max_send_sge = 1;
+    attr.cap.max_recv_sge = 1;
+    attr.send_cq = rdma->cq;
+    attr.recv_cq = rdma->cq;
+    attr.qp_type = IBV_QPT_RC;
+
+    ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
+    if (ret) {
+        return -1;
+    }
+
+    rdma->qp = rdma->cm_id->qp;
+    return 0;
+}
+
+static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
+{
+    int i;
+    RDMALocalBlocks *local = &rdma->local_ram_blocks;
+
+    for (i = 0; i < local->nb_blocks; i++) {
+        local->block[i].mr =
+            ibv_reg_mr(rdma->pd,
+                    local->block[i].local_host_addr,
+                    local->block[i].length,
+                    IBV_ACCESS_LOCAL_WRITE |
+                    IBV_ACCESS_REMOTE_WRITE
+                    );
+        if (!local->block[i].mr) {
+            perror("Failed to register local dest ram block!\n");
+            break;
+        }
+        rdma->total_registrations++;
+    }
+
+    if (i >= local->nb_blocks) {
+        return 0;
+    }
+
+    for (i--; i >= 0; i--) {
+        ibv_dereg_mr(local->block[i].mr);
+        rdma->total_registrations--;
+    }
+
+    return -1;
+
+}
+
+/*
+ * Find the ram block that corresponds to the page requested to be
+ * transmitted by QEMU.
+ *
+ * Once the block is found, also identify which 'chunk' within that
+ * block that the page belongs to.
+ *
+ * This search cannot fail or the migration will fail.
+ */
+static int qemu_rdma_search_ram_block(RDMAContext *rdma,
+                                      uint64_t block_offset,
+                                      uint64_t offset,
+                                      uint64_t length,
+                                      uint64_t *block_index,
+                                      uint64_t *chunk_index)
+{
+    uint64_t current_addr = block_offset + offset;
+    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
+                                                (void *) block_offset);
+    assert(block);
+    assert(current_addr >= block->offset);
+    assert((current_addr + length) <= (block->offset + block->length));
+
+    *block_index = block->index;
+    *chunk_index = ram_chunk_index(block->local_host_addr,
+                block->local_host_addr + (current_addr - block->offset));
+
+    return 0;
+}
+
+/*
+ * Register a chunk with IB. If the chunk was already registered
+ * previously, then skip.
+ *
+ * Also return the keys associated with the registration needed
+ * to perform the actual RDMA operation.
+ */
+static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
+        RDMALocalBlock *block, uint8_t *host_addr,
+        uint32_t *lkey, uint32_t *rkey, int chunk,
+        uint8_t *chunk_start, uint8_t *chunk_end)
+{
+    if (block->mr) {
+        if (lkey) {
+            *lkey = block->mr->lkey;
+        }
+        if (rkey) {
+            *rkey = block->mr->rkey;
+        }
+        return 0;
+    }
+
+    /* allocate memory to store chunk MRs */
+    if (!block->pmr) {
+        block->pmr = g_malloc0(block->nb_chunks * sizeof(struct ibv_mr *));
+        if (!block->pmr) {
+            return -1;
+        }
+    }
+
+    /*
+     * If 'rkey', then we're the destination, so grant access to the source.
+     *
+     * If 'lkey', then we're the source VM, so grant access only to ourselves.
+     */
+    if (!block->pmr[chunk]) {
+        uint64_t len = chunk_end - chunk_start;
+
+        DDPRINTF("Registering %" PRIu64 " bytes @ %p\n",
+                 len, chunk_start);
+
+        block->pmr[chunk] = ibv_reg_mr(rdma->pd,
+                chunk_start, len,
+                (rkey ? (IBV_ACCESS_LOCAL_WRITE |
+                        IBV_ACCESS_REMOTE_WRITE) : 0));
+
+        if (!block->pmr[chunk]) {
+            perror("Failed to register chunk!");
+            fprintf(stderr, "Chunk details: block: %d chunk index %d"
+                            " start %" PRIu64 " end %" PRIu64 " host %" PRIu64
+                            " local %" PRIu64 " registrations: %d\n",
+                            block->index, chunk, (uint64_t) chunk_start,
+                            (uint64_t) chunk_end, (uint64_t) host_addr,
+                            (uint64_t) block->local_host_addr,
+                            rdma->total_registrations);
+            return -1;
+        }
+        rdma->total_registrations++;
+    }
+
+    if (lkey) {
+        *lkey = block->pmr[chunk]->lkey;
+    }
+    if (rkey) {
+        *rkey = block->pmr[chunk]->rkey;
+    }
+    return 0;
+}
+
+/*
+ * Register (at connection time) the memory used for control
+ * channel messages.
+ */
+static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
+{
+    rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
+            rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
+            IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
+    if (rdma->wr_data[idx].control_mr) {
+        rdma->total_registrations++;
+        return 0;
+    }
+    fprintf(stderr, "qemu_rdma_reg_control failed!\n");
+    return -1;
+}
+
+const char *print_wrid(int wrid)
+{
+    if (wrid >= RDMA_WRID_RECV_CONTROL) {
+        return wrid_desc[RDMA_WRID_RECV_CONTROL];
+    }
+    return wrid_desc[wrid];
+}
+
+/*
+ * RDMA requires memory registration (mlock/pinning), but this is not good for
+ * overcommitment.
+ *
+ * In preparation for the future where LRU information or workload-specific
+ * writable writable working set memory access behavior is available to QEMU
+ * it would be nice to have in place the ability to UN-register/UN-pin
+ * particular memory regions from the RDMA hardware when it is determine that
+ * those regions of memory will likely not be accessed again in the near future.
+ *
+ * While we do not yet have such information right now, the following
+ * compile-time option allows us to perform a non-optimized version of this
+ * behavior.
+ *
+ * By uncommenting this option, you will cause *all* RDMA transfers to be
+ * unregistered immediately after the transfer completes on both sides of the
+ * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
+ *
+ * This will have a terrible impact on migration performance, so until future
+ * workload information or LRU information is available, do not attempt to use
+ * this feature except for basic testing.
+ */
+//#define RDMA_UNREGISTRATION_EXAMPLE
+
+/*
+ * Perform a non-optimized memory unregistration after every transfer
+ * for demonsration purposes, only if pin-all is not requested.
+ *
+ * Potential optimizations:
+ * 1. Start a new thread to run this function continuously
+        - for bit clearing
+        - and for receipt of unregister messages
+ * 2. Use an LRU.
+ * 3. Use workload hints.
+ */
+static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
+{
+    while (rdma->unregistrations[rdma->unregister_current]) {
+        int ret;
+        uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
+        uint64_t chunk =
+            (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
+        uint64_t index =
+            (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
+        RDMALocalBlock *block =
+            &(rdma->local_ram_blocks.block[index]);
+        RDMARegister reg = { .current_index = index };
+        RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
+                                 };
+        RDMAControlHeader head = { .len = sizeof(RDMARegister),
+                                   .type = RDMA_CONTROL_UNREGISTER_REQUEST,
+                                   .repeat = 1,
+                                 };
+
+        DDPRINTF("Processing unregister for chunk: %" PRIu64
+                 " at position %d\n", chunk, rdma->unregister_current);
+
+        rdma->unregistrations[rdma->unregister_current] = 0;
+        rdma->unregister_current++;
+
+        if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
+            rdma->unregister_current = 0;
+        }
+
+
+        /*
+         * Unregistration is speculative (because migration is single-threaded
+         * and we cannot break the protocol's inifinband message ordering).
+         * Thus, if the memory is currently being used for transmission,
+         * then abort the attempt to unregister and try again
+         * later the next time a completion is received for this memory.
+         */
+        clear_bit(chunk, block->unregister_bitmap);
+
+        if (test_bit(chunk, block->transit_bitmap)) {
+            DDPRINTF("Cannot unregister inflight chunk: %" PRIu64 "\n", chunk);
+            continue;
+        }
+
+        DDPRINTF("Sending unregister for chunk: %" PRIu64 "\n", chunk);
+
+        ret = ibv_dereg_mr(block->pmr[chunk]);
+        block->pmr[chunk] = NULL;
+        block->remote_keys[chunk] = 0;
+
+        if (ret != 0) {
+            perror("unregistration chunk failed");
+            return -ret;
+        }
+        rdma->total_registrations--;
+
+        reg.key.chunk = chunk;
+        register_to_network(&reg);
+        ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
+                                &resp, NULL, NULL);
+        if (ret < 0) {
+            return ret;
+        }
+
+        DDPRINTF("Unregister for chunk: %" PRIu64 " complete.\n", chunk);
+    }
+
+    return 0;
+}
+
+static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
+                                         uint64_t chunk)
+{
+    uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
+
+    result |= (index << RDMA_WRID_BLOCK_SHIFT);
+    result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
+
+    return result;
+}
+
+/*
+ * Set bit for unregistration in the next iteration.
+ * We cannot transmit right here, but will unpin later.
+ */
+static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
+                                        uint64_t chunk, uint64_t wr_id)
+{
+    if (rdma->unregistrations[rdma->unregister_next] != 0) {
+        fprintf(stderr, "rdma migration: queue is full!\n");
+    } else {
+        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
+
+        if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
+            DDPRINTF("Appending unregister chunk %" PRIu64
+                    " at position %d\n", chunk, rdma->unregister_next);
+
+            rdma->unregistrations[rdma->unregister_next++] =
+                    qemu_rdma_make_wrid(wr_id, index, chunk);
+
+            if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
+                rdma->unregister_next = 0;
+            }
+        } else {
+            DDPRINTF("Unregister chunk %" PRIu64 " already in queue.\n",
+                    chunk);
+        }
+    }
+}
+
+/*
+ * Consult the connection manager to see a work request
+ * (of any kind) has completed.
+ * Return the work request ID that completed.
+ */
+static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out)
+{
+    int ret;
+    struct ibv_wc wc;
+    uint64_t wr_id;
+
+    ret = ibv_poll_cq(rdma->cq, 1, &wc);
+
+    if (!ret) {
+        *wr_id_out = RDMA_WRID_NONE;
+        return 0;
+    }
+
+    if (ret < 0) {
+        fprintf(stderr, "ibv_poll_cq return %d!\n", ret);
+        return ret;
+    }
+
+    wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
+
+    if (wc.status != IBV_WC_SUCCESS) {
+        fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
+                        wc.status, ibv_wc_status_str(wc.status));
+        fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
+
+        return -1;
+    }
+
+    if (rdma->control_ready_expected &&
+        (wr_id >= RDMA_WRID_RECV_CONTROL)) {
+        DDDPRINTF("completion %s #%" PRId64 " received (%" PRId64 ")"
+                  " left %d\n", wrid_desc[RDMA_WRID_RECV_CONTROL],
+                  wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
+        rdma->control_ready_expected = 0;
+    }
+
+    if (wr_id == RDMA_WRID_RDMA_WRITE) {
+        uint64_t chunk =
+            (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
+        uint64_t index =
+            (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
+        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
+
+        DDDPRINTF("completions %s (%" PRId64 ") left %d, "
+                 "block %" PRIu64 ", chunk: %" PRIu64 " %p %p\n",
+                 print_wrid(wr_id), wr_id, rdma->nb_sent, index, chunk,
+                 block->local_host_addr, (void *)block->remote_host_addr);
+
+        clear_bit(chunk, block->transit_bitmap);
+
+        if (rdma->nb_sent > 0) {
+            rdma->nb_sent--;
+        }
+
+        if (!rdma->pin_all) {
+            /*
+             * FYI: If one wanted to signal a specific chunk to be unregistered
+             * using LRU or workload-specific information, this is the function
+             * you would call to do so. That chunk would then get asynchronously
+             * unregistered later.
+             */
+#ifdef RDMA_UNREGISTRATION_EXAMPLE
+            qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
+#endif
+        }
+    } else {
+        DDDPRINTF("other completion %s (%" PRId64 ") received left %d\n",
+            print_wrid(wr_id), wr_id, rdma->nb_sent);
+    }
+
+    *wr_id_out = wc.wr_id;
+
+    return  0;
+}
+
+/*
+ * Block until the next work request has completed.
+ *
+ * First poll to see if a work request has already completed,
+ * otherwise block.
+ *
+ * If we encounter completed work requests for IDs other than
+ * the one we're interested in, then that's generally an error.
+ *
+ * The only exception is actual RDMA Write completions. These
+ * completions only need to be recorded, but do not actually
+ * need further processing.
+ */
+static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested)
+{
+    int num_cq_events = 0, ret = 0;
+    struct ibv_cq *cq;
+    void *cq_ctx;
+    uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
+
+    if (ibv_req_notify_cq(rdma->cq, 0)) {
+        return -1;
+    }
+    /* poll cq first */
+    while (wr_id != wrid_requested) {
+        ret = qemu_rdma_poll(rdma, &wr_id_in);
+        if (ret < 0) {
+            return ret;
+        }
+
+        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
+
+        if (wr_id == RDMA_WRID_NONE) {
+            break;
+        }
+        if (wr_id != wrid_requested) {
+            DDDPRINTF("A Wanted wrid %s (%d) but got %s (%" PRIu64 ")\n",
+                print_wrid(wrid_requested),
+                wrid_requested, print_wrid(wr_id), wr_id);
+        }
+    }
+
+    if (wr_id == wrid_requested) {
+        return 0;
+    }
+
+    while (1) {
+        /*
+         * Coroutine doesn't start until process_incoming_migration()
+         * so don't yield unless we know we're running inside of a coroutine.
+         */
+        if (rdma->migration_started_on_destination) {
+            yield_until_fd_readable(rdma->comp_channel->fd);
+        }
+
+        if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
+            perror("ibv_get_cq_event");
+            goto err_block_for_wrid;
+        }
+
+        num_cq_events++;
+
+        if (ibv_req_notify_cq(cq, 0)) {
+            goto err_block_for_wrid;
+        }
+
+        while (wr_id != wrid_requested) {
+            ret = qemu_rdma_poll(rdma, &wr_id_in);
+            if (ret < 0) {
+                goto err_block_for_wrid;
+            }
+
+            wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
+
+            if (wr_id == RDMA_WRID_NONE) {
+                break;
+            }
+            if (wr_id != wrid_requested) {
+                DDDPRINTF("B Wanted wrid %s (%d) but got %s (%" PRIu64 ")\n",
+                    print_wrid(wrid_requested), wrid_requested,
+                    print_wrid(wr_id), wr_id);
+            }
+        }
+
+        if (wr_id == wrid_requested) {
+            goto success_block_for_wrid;
+        }
+    }
+
+success_block_for_wrid:
+    if (num_cq_events) {
+        ibv_ack_cq_events(cq, num_cq_events);
+    }
+    return 0;
+
+err_block_for_wrid:
+    if (num_cq_events) {
+        ibv_ack_cq_events(cq, num_cq_events);
+    }
+    return ret;
+}
+
+/*
+ * Post a SEND message work request for the control channel
+ * containing some data and block until the post completes.
+ */
+static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
+                                       RDMAControlHeader *head)
+{
+    int ret = 0;
+    RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_MAX];
+    struct ibv_send_wr *bad_wr;
+    struct ibv_sge sge = {
+                           .addr = (uint64_t)(wr->control),
+                           .length = head->len + sizeof(RDMAControlHeader),
+                           .lkey = wr->control_mr->lkey,
+                         };
+    struct ibv_send_wr send_wr = {
+                                   .wr_id = RDMA_WRID_SEND_CONTROL,
+                                   .opcode = IBV_WR_SEND,
+                                   .send_flags = IBV_SEND_SIGNALED,
+                                   .sg_list = &sge,
+                                   .num_sge = 1,
+                                };
+
+    DDDPRINTF("CONTROL: sending %s..\n", control_desc[head->type]);
+
+    /*
+     * We don't actually need to do a memcpy() in here if we used
+     * the "sge" properly, but since we're only sending control messages
+     * (not RAM in a performance-critical path), then its OK for now.
+     *
+     * The copy makes the RDMAControlHeader simpler to manipulate
+     * for the time being.
+     */
+    memcpy(wr->control, head, sizeof(RDMAControlHeader));
+    control_to_network((void *) wr->control);
+
+    if (buf) {
+        memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
+    }
+
+
+    if (ibv_post_send(rdma->qp, &send_wr, &bad_wr)) {
+        return -1;
+    }
+
+    if (ret < 0) {
+        fprintf(stderr, "Failed to use post IB SEND for control!\n");
+        return ret;
+    }
+
+    ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL);
+    if (ret < 0) {
+        fprintf(stderr, "rdma migration: send polling control error!\n");
+    }
+
+    return ret;
+}
+
+/*
+ * Post a RECV work request in anticipation of some future receipt
+ * of data on the control channel.
+ */
+static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
+{
+    struct ibv_recv_wr *bad_wr;
+    struct ibv_sge sge = {
+                            .addr = (uint64_t)(rdma->wr_data[idx].control),
+                            .length = RDMA_CONTROL_MAX_BUFFER,
+                            .lkey = rdma->wr_data[idx].control_mr->lkey,
+                         };
+
+    struct ibv_recv_wr recv_wr = {
+                                    .wr_id = RDMA_WRID_RECV_CONTROL + idx,
+                                    .sg_list = &sge,
+                                    .num_sge = 1,
+                                 };
+
+
+    if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Block and wait for a RECV control channel message to arrive.
+ */
+static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
+                RDMAControlHeader *head, int expecting, int idx)
+{
+    int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx);
+
+    if (ret < 0) {
+        fprintf(stderr, "rdma migration: recv polling control error!\n");
+        return ret;
+    }
+
+    network_to_control((void *) rdma->wr_data[idx].control);
+    memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
+
+    DDDPRINTF("CONTROL: %s receiving...\n", control_desc[expecting]);
+
+    if (expecting == RDMA_CONTROL_NONE) {
+        DDDPRINTF("Surprise: got %s (%d)\n",
+                  control_desc[head->type], head->type);
+    } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
+        fprintf(stderr, "Was expecting a %s (%d) control message"
+                ", but got: %s (%d), length: %d\n",
+                control_desc[expecting], expecting,
+                control_desc[head->type], head->type, head->len);
+        return -EIO;
+    }
+
+    return 0;
+}
+
+/*
+ * When a RECV work request has completed, the work request's
+ * buffer is pointed at the header.
+ *
+ * This will advance the pointer to the data portion
+ * of the control message of the work request's buffer that
+ * was populated after the work request finished.
+ */
+static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
+                                  RDMAControlHeader *head)
+{
+    rdma->wr_data[idx].control_len = head->len;
+    rdma->wr_data[idx].control_curr =
+        rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
+}
+
+/*
+ * This is an 'atomic' high-level operation to deliver a single, unified
+ * control-channel message.
+ *
+ * Additionally, if the user is expecting some kind of reply to this message,
+ * they can request a 'resp' response message be filled in by posting an
+ * additional work request on behalf of the user and waiting for an additional
+ * completion.
+ *
+ * The extra (optional) response is used during registration to us from having
+ * to perform an *additional* exchange of message just to provide a response by
+ * instead piggy-backing on the acknowledgement.
+ */
+static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
+                                   uint8_t *data, RDMAControlHeader *resp,
+                                   int *resp_idx,
+                                   int (*callback)(RDMAContext *rdma))
+{
+    int ret = 0;
+
+    /*
+     * Wait until the dest is ready before attempting to deliver the message
+     * by waiting for a READY message.
+     */
+    if (rdma->control_ready_expected) {
+        RDMAControlHeader resp;
+        ret = qemu_rdma_exchange_get_response(rdma,
+                                    &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
+        if (ret < 0) {
+            return ret;
+        }
+    }
+
+    /*
+     * If the user is expecting a response, post a WR in anticipation of it.
+     */
+    if (resp) {
+        ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
+        if (ret) {
+            fprintf(stderr, "rdma migration: error posting"
+                    " extra control recv for anticipated result!");
+            return ret;
+        }
+    }
+
+    /*
+     * Post a WR to replace the one we just consumed for the READY message.
+     */
+    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
+    if (ret) {
+        fprintf(stderr, "rdma migration: error posting first control recv!");
+        return ret;
+    }
+
+    /*
+     * Deliver the control message that was requested.
+     */
+    ret = qemu_rdma_post_send_control(rdma, data, head);
+
+    if (ret < 0) {
+        fprintf(stderr, "Failed to send control buffer!\n");
+        return ret;
+    }
+
+    /*
+     * If we're expecting a response, block and wait for it.
+     */
+    if (resp) {
+        if (callback) {
+            DDPRINTF("Issuing callback before receiving response...\n");
+            ret = callback(rdma);
+            if (ret < 0) {
+                return ret;
+            }
+        }
+
+        DDPRINTF("Waiting for response %s\n", control_desc[resp->type]);
+        ret = qemu_rdma_exchange_get_response(rdma, resp,
+                                              resp->type, RDMA_WRID_DATA);
+
+        if (ret < 0) {
+            return ret;
+        }
+
+        qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
+        if (resp_idx) {
+            *resp_idx = RDMA_WRID_DATA;
+        }
+        DDPRINTF("Response %s received.\n", control_desc[resp->type]);
+    }
+
+    rdma->control_ready_expected = 1;
+
+    return 0;
+}
+
+/*
+ * This is an 'atomic' high-level operation to receive a single, unified
+ * control-channel message.
+ */
+static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
+                                int expecting)
+{
+    RDMAControlHeader ready = {
+                                .len = 0,
+                                .type = RDMA_CONTROL_READY,
+                                .repeat = 1,
+                              };
+    int ret;
+
+    /*
+     * Inform the source that we're ready to receive a message.
+     */
+    ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
+
+    if (ret < 0) {
+        fprintf(stderr, "Failed to send control buffer!\n");
+        return ret;
+    }
+
+    /*
+     * Block and wait for the message.
+     */
+    ret = qemu_rdma_exchange_get_response(rdma, head,
+                                          expecting, RDMA_WRID_READY);
+
+    if (ret < 0) {
+        return ret;
+    }
+
+    qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
+
+    /*
+     * Post a new RECV work request to replace the one we just consumed.
+     */
+    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
+    if (ret) {
+        fprintf(stderr, "rdma migration: error posting second control recv!");
+        return ret;
+    }
+
+    return 0;
+}
+
+/*
+ * Write an actual chunk of memory using RDMA.
+ *
+ * If we're using dynamic registration on the dest-side, we have to
+ * send a registration command first.
+ */
+static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
+                               int current_index, uint64_t current_addr,
+                               uint64_t length)
+{
+    struct ibv_sge sge;
+    struct ibv_send_wr send_wr = { 0 };
+    struct ibv_send_wr *bad_wr;
+    int reg_result_idx, ret, count = 0;
+    uint64_t chunk, chunks;
+    uint8_t *chunk_start, *chunk_end;
+    RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
+    RDMARegister reg;
+    RDMARegisterResult *reg_result;
+    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
+    RDMAControlHeader head = { .len = sizeof(RDMARegister),
+                               .type = RDMA_CONTROL_REGISTER_REQUEST,
+                               .repeat = 1,
+                             };
+
+retry:
+    sge.addr = (uint64_t)(block->local_host_addr +
+                            (current_addr - block->offset));
+    sge.length = length;
+
+    chunk = ram_chunk_index(block->local_host_addr, (uint8_t *) sge.addr);
+    chunk_start = ram_chunk_start(block, chunk);
+
+    if (block->is_ram_block) {
+        chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
+
+        if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
+            chunks--;
+        }
+    } else {
+        chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
+
+        if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
+            chunks--;
+        }
+    }
+
+    DDPRINTF("Writing %" PRIu64 " chunks, (%" PRIu64 " MB)\n",
+        chunks + 1, (chunks + 1) * (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
+
+    chunk_end = ram_chunk_end(block, chunk + chunks);
+
+    if (!rdma->pin_all) {
+#ifdef RDMA_UNREGISTRATION_EXAMPLE
+        qemu_rdma_unregister_waiting(rdma);
+#endif
+    }
+
+    while (test_bit(chunk, block->transit_bitmap)) {
+        (void)count;
+        DDPRINTF("(%d) Not clobbering: block: %d chunk %" PRIu64
+                " current %" PRIu64 " len %" PRIu64 " %d %d\n",
+                count++, current_index, chunk,
+                sge.addr, length, rdma->nb_sent, block->nb_chunks);
+
+        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);
+
+        if (ret < 0) {
+            fprintf(stderr, "Failed to Wait for previous write to complete "
+                    "block %d chunk %" PRIu64
+                    " current %" PRIu64 " len %" PRIu64 " %d\n",
+                    current_index, chunk, sge.addr, length, rdma->nb_sent);
+            return ret;
+        }
+    }
+
+    if (!rdma->pin_all || !block->is_ram_block) {
+        if (!block->remote_keys[chunk]) {
+            /*
+             * This chunk has not yet been registered, so first check to see
+             * if the entire chunk is zero. If so, tell the other size to
+             * memset() + madvise() the entire chunk without RDMA.
+             */
+
+            if (can_use_buffer_find_nonzero_offset((void *)sge.addr, length)
+                   && buffer_find_nonzero_offset((void *)sge.addr,
+                                                    length) == length) {
+                RDMACompress comp = {
+                                        .offset = current_addr,
+                                        .value = 0,
+                                        .block_idx = current_index,
+                                        .length = length,
+                                    };
+
+                head.len = sizeof(comp);
+                head.type = RDMA_CONTROL_COMPRESS;
+
+                DDPRINTF("Entire chunk is zero, sending compress: %"
+                    PRIu64 " for %d "
+                    "bytes, index: %d, offset: %" PRId64 "...\n",
+                    chunk, sge.length, current_index, current_addr);
+
+                compress_to_network(&comp);
+                ret = qemu_rdma_exchange_send(rdma, &head,
+                                (uint8_t *) &comp, NULL, NULL, NULL);
+
+                if (ret < 0) {
+                    return -EIO;
+                }
+
+                acct_update_position(f, sge.length, true);
+
+                return 1;
+            }
+
+            /*
+             * Otherwise, tell other side to register.
+             */
+            reg.current_index = current_index;
+            if (block->is_ram_block) {
+                reg.key.current_addr = current_addr;
+            } else {
+                reg.key.chunk = chunk;
+            }
+            reg.chunks = chunks;
+
+            DDPRINTF("Sending registration request chunk %" PRIu64 " for %d "
+                    "bytes, index: %d, offset: %" PRId64 "...\n",
+                    chunk, sge.length, current_index, current_addr);
+
+            register_to_network(&reg);
+            ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
+                                    &resp, &reg_result_idx, NULL);
+            if (ret < 0) {
+                return ret;
+            }
+
+            /* try to overlap this single registration with the one we sent. */
+            if (qemu_rdma_register_and_get_keys(rdma, block,
+                                                (uint8_t *) sge.addr,
+                                                &sge.lkey, NULL, chunk,
+                                                chunk_start, chunk_end)) {
+                fprintf(stderr, "cannot get lkey!\n");
+                return -EINVAL;
+            }
+
+            reg_result = (RDMARegisterResult *)
+                    rdma->wr_data[reg_result_idx].control_curr;
+
+            network_to_result(reg_result);
+
+            DDPRINTF("Received registration result:"
+                    " my key: %x their key %x, chunk %" PRIu64 "\n",
+                    block->remote_keys[chunk], reg_result->rkey, chunk);
+
+            block->remote_keys[chunk] = reg_result->rkey;
+            block->remote_host_addr = reg_result->host_addr;
+        } else {
+            /* already registered before */
+            if (qemu_rdma_register_and_get_keys(rdma, block,
+                                                (uint8_t *)sge.addr,
+                                                &sge.lkey, NULL, chunk,
+                                                chunk_start, chunk_end)) {
+                fprintf(stderr, "cannot get lkey!\n");
+                return -EINVAL;
+            }
+        }
+
+        send_wr.wr.rdma.rkey = block->remote_keys[chunk];
+    } else {
+        send_wr.wr.rdma.rkey = block->remote_rkey;
+
+        if (qemu_rdma_register_and_get_keys(rdma, block, (uint8_t *)sge.addr,
+                                                     &sge.lkey, NULL, chunk,
+                                                     chunk_start, chunk_end)) {
+            fprintf(stderr, "cannot get lkey!\n");
+            return -EINVAL;
+        }
+    }
+
+    /*
+     * Encode the ram block index and chunk within this wrid.
+     * We will use this information at the time of completion
+     * to figure out which bitmap to check against and then which
+     * chunk in the bitmap to look for.
+     */
+    send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
+                                        current_index, chunk);
+
+    send_wr.opcode = IBV_WR_RDMA_WRITE;
+    send_wr.send_flags = IBV_SEND_SIGNALED;
+    send_wr.sg_list = &sge;
+    send_wr.num_sge = 1;
+    send_wr.wr.rdma.remote_addr = block->remote_host_addr +
+                                (current_addr - block->offset);
+
+    DDDPRINTF("Posting chunk: %" PRIu64 ", addr: %lx"
+              " remote: %lx, bytes %" PRIu32 "\n",
+              chunk, sge.addr, send_wr.wr.rdma.remote_addr,
+              sge.length);
+
+    /*
+     * ibv_post_send() does not return negative error numbers,
+     * per the specification they are positive - no idea why.
+     */
+    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
+
+    if (ret == ENOMEM) {
+        DDPRINTF("send queue is full. wait a little....\n");
+        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);
+        if (ret < 0) {
+            fprintf(stderr, "rdma migration: failed to make "
+                            "room in full send queue! %d\n", ret);
+            return ret;
+        }
+
+        goto retry;
+
+    } else if (ret > 0) {
+        perror("rdma migration: post rdma write failed");
+        return -ret;
+    }
+
+    set_bit(chunk, block->transit_bitmap);
+    acct_update_position(f, sge.length, false);
+    rdma->total_writes++;
+
+    return 0;
+}
+
+/*
+ * Push out any unwritten RDMA operations.
+ *
+ * We support sending out multiple chunks at the same time.
+ * Not all of them need to get signaled in the completion queue.
+ */
+static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
+{
+    int ret;
+
+    if (!rdma->current_length) {
+        return 0;
+    }
+
+    ret = qemu_rdma_write_one(f, rdma,
+            rdma->current_index, rdma->current_addr, rdma->current_length);
+
+    if (ret < 0) {
+        return ret;
+    }
+
+    if (ret == 0) {
+        rdma->nb_sent++;
+        DDDPRINTF("sent total: %d\n", rdma->nb_sent);
+    }
+
+    rdma->current_length = 0;
+    rdma->current_addr = 0;
+
+    return 0;
+}
+
+static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
+                    uint64_t offset, uint64_t len)
+{
+    RDMALocalBlock *block =
+        &(rdma->local_ram_blocks.block[rdma->current_index]);
+    uint8_t *host_addr = block->local_host_addr + (offset - block->offset);
+    uint8_t *chunk_end = ram_chunk_end(block, rdma->current_chunk);
+
+    if (rdma->current_length == 0) {
+        return 0;
+    }
+
+    /*
+     * Only merge into chunk sequentially.
+     */
+    if (offset != (rdma->current_addr + rdma->current_length)) {
+        return 0;
+    }
+
+    if (rdma->current_index < 0) {
+        return 0;
+    }
+
+    if (offset < block->offset) {
+        return 0;
+    }
+
+    if ((offset + len) > (block->offset + block->length)) {
+        return 0;
+    }
+
+    if (rdma->current_chunk < 0) {
+        return 0;
+    }
+
+    if ((host_addr + len) > chunk_end) {
+        return 0;
+    }
+
+    return 1;
+}
+
+/*
+ * We're not actually writing here, but doing three things:
+ *
+ * 1. Identify the chunk the buffer belongs to.
+ * 2. If the chunk is full or the buffer doesn't belong to the current
+ *    chunk, then start a new chunk and flush() the old chunk.
+ * 3. To keep the hardware busy, we also group chunks into batches
+ *    and only require that a batch gets acknowledged in the completion
+ *    qeueue instead of each individual chunk.
+ */
+static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
+                           uint64_t block_offset, uint64_t offset,
+                           uint64_t len)
+{
+    uint64_t current_addr = block_offset + offset;
+    uint64_t index = rdma->current_index;
+    uint64_t chunk = rdma->current_chunk;
+    int ret;
+
+    /* If we cannot merge it, we flush the current buffer first. */
+    if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
+        ret = qemu_rdma_write_flush(f, rdma);
+        if (ret) {
+            return ret;
+        }
+        rdma->current_length = 0;
+        rdma->current_addr = current_addr;
+
+        ret = qemu_rdma_search_ram_block(rdma, block_offset,
+                                         offset, len, &index, &chunk);
+        if (ret) {
+            fprintf(stderr, "ram block search failed\n");
+            return ret;
+        }
+        rdma->current_index = index;
+        rdma->current_chunk = chunk;
+    }
+
+    /* merge it */
+    rdma->current_length += len;
+
+    /* flush it if buffer is too large */
+    if (rdma->current_length >= RDMA_MERGE_MAX) {
+        return qemu_rdma_write_flush(f, rdma);
+    }
+
+    return 0;
+}
+
+static void qemu_rdma_cleanup(RDMAContext *rdma)
+{
+    struct rdma_cm_event *cm_event;
+    int ret, idx;
+
+    if (rdma->cm_id) {
+        if (rdma->error_state) {
+            RDMAControlHeader head = { .len = 0,
+                                       .type = RDMA_CONTROL_ERROR,
+                                       .repeat = 1,
+                                     };
+            fprintf(stderr, "Early error. Sending error.\n");
+            qemu_rdma_post_send_control(rdma, NULL, &head);
+        }
+
+        ret = rdma_disconnect(rdma->cm_id);
+        if (!ret) {
+            DDPRINTF("waiting for disconnect\n");
+            ret = rdma_get_cm_event(rdma->channel, &cm_event);
+            if (!ret) {
+                rdma_ack_cm_event(cm_event);
+            }
+        }
+        DDPRINTF("Disconnected.\n");
+        rdma->cm_id = NULL;
+    }
+
+    g_free(rdma->block);
+    rdma->block = NULL;
+
+    for (idx = 0; idx <= RDMA_WRID_MAX; idx++) {
+        if (rdma->wr_data[idx].control_mr) {
+            rdma->total_registrations--;
+            ibv_dereg_mr(rdma->wr_data[idx].control_mr);
+        }
+        rdma->wr_data[idx].control_mr = NULL;
+    }
+
+    if (rdma->local_ram_blocks.block) {
+        while (rdma->local_ram_blocks.nb_blocks) {
+            __qemu_rdma_delete_block(rdma,
+                    rdma->local_ram_blocks.block->offset);
+        }
+    }
+
+    if (rdma->qp) {
+        ibv_destroy_qp(rdma->qp);
+        rdma->qp = NULL;
+    }
+    if (rdma->cq) {
+        ibv_destroy_cq(rdma->cq);
+        rdma->cq = NULL;
+    }
+    if (rdma->comp_channel) {
+        ibv_destroy_comp_channel(rdma->comp_channel);
+        rdma->comp_channel = NULL;
+    }
+    if (rdma->pd) {
+        ibv_dealloc_pd(rdma->pd);
+        rdma->pd = NULL;
+    }
+    if (rdma->listen_id) {
+        rdma_destroy_id(rdma->listen_id);
+        rdma->listen_id = NULL;
+    }
+    if (rdma->cm_id) {
+        rdma_destroy_id(rdma->cm_id);
+        rdma->cm_id = NULL;
+    }
+    if (rdma->channel) {
+        rdma_destroy_event_channel(rdma->channel);
+        rdma->channel = NULL;
+    }
+}
+
+
+static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all)
+{
+    int ret, idx;
+    Error *local_err = NULL, **temp = &local_err;
+
+    /*
+     * Will be validated against destination's actual capabilities
+     * after the connect() completes.
+     */
+    rdma->pin_all = pin_all;
+
+    ret = qemu_rdma_resolve_host(rdma, temp);
+    if (ret) {
+        goto err_rdma_source_init;
+    }
+
+    ret = qemu_rdma_alloc_pd_cq(rdma);
+    if (ret) {
+        ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
+                    " limits may be too low. Please check $ ulimit -a # and "
+                    "search for 'ulimit -l' in the output\n");
+        goto err_rdma_source_init;
+    }
+
+    ret = qemu_rdma_alloc_qp(rdma);
+    if (ret) {
+        ERROR(temp, "rdma migration: error allocating qp!\n");
+        goto err_rdma_source_init;
+    }
+
+    ret = qemu_rdma_init_ram_blocks(rdma);
+    if (ret) {
+        ERROR(temp, "rdma migration: error initializing ram blocks!\n");
+        goto err_rdma_source_init;
+    }
+
+    for (idx = 0; idx <= RDMA_WRID_MAX; idx++) {
+        ret = qemu_rdma_reg_control(rdma, idx);
+        if (ret) {
+            ERROR(temp, "rdma migration: error registering %d control!\n",
+                                                            idx);
+            goto err_rdma_source_init;
+        }
+    }
+
+    return 0;
+
+err_rdma_source_init:
+    error_propagate(errp, local_err);
+    qemu_rdma_cleanup(rdma);
+    return -1;
+}
+
+static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
+{
+    RDMACapabilities cap = {
+                                .version = RDMA_CONTROL_VERSION_CURRENT,
+                                .flags = 0,
+                           };
+    struct rdma_conn_param conn_param = { .initiator_depth = 2,
+                                          .retry_count = 5,
+                                          .private_data = &cap,
+                                          .private_data_len = sizeof(cap),
+                                        };
+    struct rdma_cm_event *cm_event;
+    int ret;
+
+    /*
+     * Only negotiate the capability with destination if the user
+     * on the source first requested the capability.
+     */
+    if (rdma->pin_all) {
+        DPRINTF("Server pin-all memory requested.\n");
+        cap.flags |= RDMA_CAPABILITY_PIN_ALL;
+    }
+
+    caps_to_network(&cap);
+
+    ret = rdma_connect(rdma->cm_id, &conn_param);
+    if (ret) {
+        perror("rdma_connect");
+        ERROR(errp, "connecting to destination!\n");
+        rdma_destroy_id(rdma->cm_id);
+        rdma->cm_id = NULL;
+        goto err_rdma_source_connect;
+    }
+
+    ret = rdma_get_cm_event(rdma->channel, &cm_event);
+    if (ret) {
+        perror("rdma_get_cm_event after rdma_connect");
+        ERROR(errp, "connecting to destination!\n");
+        rdma_ack_cm_event(cm_event);
+        rdma_destroy_id(rdma->cm_id);
+        rdma->cm_id = NULL;
+        goto err_rdma_source_connect;
+    }
+
+    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
+        perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
+        ERROR(errp, "connecting to destination!\n");
+        rdma_ack_cm_event(cm_event);
+        rdma_destroy_id(rdma->cm_id);
+        rdma->cm_id = NULL;
+        goto err_rdma_source_connect;
+    }
+
+    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
+    network_to_caps(&cap);
+
+    /*
+     * Verify that the *requested* capabilities are supported by the destination
+     * and disable them otherwise.
+     */
+    if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
+        ERROR(errp, "Server cannot support pinning all memory. "
+                        "Will register memory dynamically.\n");
+        rdma->pin_all = false;
+    }
+
+    DPRINTF("Pin all memory: %s\n", rdma->pin_all ? "enabled" : "disabled");
+
+    rdma_ack_cm_event(cm_event);
+
+    ret = qemu_rdma_post_recv_control(rdma, 0);
+    if (ret) {
+        ERROR(errp, "posting second control recv!\n");
+        goto err_rdma_source_connect;
+    }
+
+    rdma->control_ready_expected = 1;
+    rdma->nb_sent = 0;
+    return 0;
+
+err_rdma_source_connect:
+    qemu_rdma_cleanup(rdma);
+    return -1;
+}
+
+static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
+{
+    int ret = -EINVAL, idx;
+    struct sockaddr_in sin;
+    struct rdma_cm_id *listen_id;
+    char ip[40] = "unknown";
+
+    for (idx = 0; idx <= RDMA_WRID_MAX; idx++) {
+        rdma->wr_data[idx].control_len = 0;
+        rdma->wr_data[idx].control_curr = NULL;
+    }
+
+    if (rdma->host == NULL) {
+        ERROR(errp, "RDMA host is not set!\n");
+        rdma->error_state = -EINVAL;
+        return -1;
+    }
+    /* create CM channel */
+    rdma->channel = rdma_create_event_channel();
+    if (!rdma->channel) {
+        ERROR(errp, "could not create rdma event channel\n");
+        rdma->error_state = -EINVAL;
+        return -1;
+    }
+
+    /* create CM id */
+    ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
+    if (ret) {
+        ERROR(errp, "could not create cm_id!\n");
+        goto err_dest_init_create_listen_id;
+    }
+
+    memset(&sin, 0, sizeof(sin));
+    sin.sin_family = AF_INET;
+    sin.sin_port = htons(rdma->port);
+
+    if (rdma->host && strcmp("", rdma->host)) {
+        struct hostent *dest_addr;
+        dest_addr = gethostbyname(rdma->host);
+        if (!dest_addr) {
+            ERROR(errp, "migration could not gethostbyname!\n");
+            ret = -EINVAL;
+            goto err_dest_init_bind_addr;
+        }
+        memcpy(&sin.sin_addr.s_addr, dest_addr->h_addr,
+                dest_addr->h_length);
+        inet_ntop(AF_INET, dest_addr->h_addr, ip, sizeof ip);
+    } else {
+        sin.sin_addr.s_addr = INADDR_ANY;
+    }
+
+    DPRINTF("%s => %s\n", rdma->host, ip);
+
+    ret = rdma_bind_addr(listen_id, (struct sockaddr *)&sin);
+    if (ret) {
+        ERROR(errp, "Error: could not rdma_bind_addr!\n");
+        goto err_dest_init_bind_addr;
+    }
+
+    rdma->listen_id = listen_id;
+    qemu_rdma_dump_gid("dest_init", listen_id);
+    return 0;
+
+err_dest_init_bind_addr:
+    rdma_destroy_id(listen_id);
+err_dest_init_create_listen_id:
+    rdma_destroy_event_channel(rdma->channel);
+    rdma->channel = NULL;
+    rdma->error_state = ret;
+    return ret;
+
+}
+
+static void *qemu_rdma_data_init(const char *host_port, Error **errp)
+{
+    RDMAContext *rdma = NULL;
+    InetSocketAddress *addr;
+
+    if (host_port) {
+        rdma = g_malloc0(sizeof(RDMAContext));
+        memset(rdma, 0, sizeof(RDMAContext));
+        rdma->current_index = -1;
+        rdma->current_chunk = -1;
+
+        addr = inet_parse(host_port, NULL);
+        if (addr != NULL) {
+            rdma->port = atoi(addr->port);
+            rdma->host = g_strdup(addr->host);
+        } else {
+            ERROR(errp, "bad RDMA migration address '%s'", host_port);
+            g_free(rdma);
+            return NULL;
+        }
+    }
+
+    return rdma;
+}
+
+/*
+ * QEMUFile interface to the control channel.
+ * SEND messages for control only.
+ * pc.ram is handled with regular RDMA messages.
+ */
+static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
+                                int64_t pos, int size)
+{
+    QEMUFileRDMA *r = opaque;
+    QEMUFile *f = r->file;
+    RDMAContext *rdma = r->rdma;
+    size_t remaining = size;
+    uint8_t * data = (void *) buf;
+    int ret;
+
+    CHECK_ERROR_STATE();
+
+    /*
+     * Push out any writes that
+     * we're queued up for pc.ram.
+     */
+    ret = qemu_rdma_write_flush(f, rdma);
+    if (ret < 0) {
+        rdma->error_state = ret;
+        return ret;
+    }
+
+    while (remaining) {
+        RDMAControlHeader head;
+
+        r->len = MIN(remaining, RDMA_SEND_INCREMENT);
+        remaining -= r->len;
+
+        head.len = r->len;
+        head.type = RDMA_CONTROL_QEMU_FILE;
+
+        ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
+
+        if (ret < 0) {
+            rdma->error_state = ret;
+            return ret;
+        }
+
+        data += r->len;
+    }
+
+    return size;
+}
+
+static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
+                             int size, int idx)
+{
+    size_t len = 0;
+
+    if (rdma->wr_data[idx].control_len) {
+        DDDPRINTF("RDMA %" PRId64 " of %d bytes already in buffer\n",
+                    rdma->wr_data[idx].control_len, size);
+
+        len = MIN(size, rdma->wr_data[idx].control_len);
+        memcpy(buf, rdma->wr_data[idx].control_curr, len);
+        rdma->wr_data[idx].control_curr += len;
+        rdma->wr_data[idx].control_len -= len;
+    }
+
+    return len;
+}
+
+/*
+ * QEMUFile interface to the control channel.
+ * RDMA links don't use bytestreams, so we have to
+ * return bytes to QEMUFile opportunistically.
+ */
+static int qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
+                                int64_t pos, int size)
+{
+    QEMUFileRDMA *r = opaque;
+    RDMAContext *rdma = r->rdma;
+    RDMAControlHeader head;
+    int ret = 0;
+
+    CHECK_ERROR_STATE();
+
+    /*
+     * First, we hold on to the last SEND message we
+     * were given and dish out the bytes until we run
+     * out of bytes.
+     */
+    r->len = qemu_rdma_fill(r->rdma, buf, size, 0);
+    if (r->len) {
+        return r->len;
+    }
+
+    /*
+     * Once we run out, we block and wait for another
+     * SEND message to arrive.
+     */
+    ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
+
+    if (ret < 0) {
+        rdma->error_state = ret;
+        return ret;
+    }
+
+    /*
+     * SEND was received with new bytes, now try again.
+     */
+    return qemu_rdma_fill(r->rdma, buf, size, 0);
+}
+
+/*
+ * Block until all the outstanding chunks have been delivered by the hardware.
+ */
+static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
+{
+    int ret;
+
+    if (qemu_rdma_write_flush(f, rdma) < 0) {
+        return -EIO;
+    }
+
+    while (rdma->nb_sent) {
+        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);
+        if (ret < 0) {
+            fprintf(stderr, "rdma migration: complete polling error!\n");
+            return -EIO;
+        }
+    }
+
+    qemu_rdma_unregister_waiting(rdma);
+
+    return 0;
+}
+
+static int qemu_rdma_close(void *opaque)
+{
+    DPRINTF("Shutting down connection.\n");
+    QEMUFileRDMA *r = opaque;
+    if (r->rdma) {
+        qemu_rdma_cleanup(r->rdma);
+        g_free(r->rdma);
+    }
+    g_free(r);
+    return 0;
+}
+
+/*
+ * Parameters:
+ *    @offset == 0 :
+ *        This means that 'block_offset' is a full virtual address that does not
+ *        belong to a RAMBlock of the virtual machine and instead
+ *        represents a private malloc'd memory area that the caller wishes to
+ *        transfer.
+ *
+ *    @offset != 0 :
+ *        Offset is an offset to be added to block_offset and used
+ *        to also lookup the corresponding RAMBlock.
+ *
+ *    @size > 0 :
+ *        Initiate an transfer this size.
+ *
+ *    @size == 0 :
+ *        A 'hint' or 'advice' that means that we wish to speculatively
+ *        and asynchronously unregister this memory. In this case, there is no
+ *        gaurantee that the unregister will actually happen, for example,
+ *        if the memory is being actively transmitted. Additionally, the memory
+ *        may be re-registered at any future time if a write within the same
+ *        chunk was requested again, even if you attempted to unregister it
+ *        here.
+ *
+ *    @size < 0 : TODO, not yet supported
+ *        Unregister the memory NOW. This means that the caller does not
+ *        expect there to be any future RDMA transfers and we just want to clean
+ *        things up. This is used in case the upper layer owns the memory and
+ *        cannot wait for qemu_fclose() to occur.
+ *
+ *    @bytes_sent : User-specificed pointer to indicate how many bytes were
+ *                  sent. Usually, this will not be more than a few bytes of
+ *                  the protocol because most transfers are sent asynchronously.
+ */
+static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
+                                  ram_addr_t block_offset, ram_addr_t offset,
+                                  size_t size, int *bytes_sent)
+{
+    QEMUFileRDMA *rfile = opaque;
+    RDMAContext *rdma = rfile->rdma;
+    int ret;
+
+    CHECK_ERROR_STATE();
+
+    qemu_fflush(f);
+
+    if (size > 0) {
+        /*
+         * Add this page to the current 'chunk'. If the chunk
+         * is full, or the page doen't belong to the current chunk,
+         * an actual RDMA write will occur and a new chunk will be formed.
+         */
+        ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
+        if (ret < 0) {
+            fprintf(stderr, "rdma migration: write error! %d\n", ret);
+            goto err;
+        }
+
+        /*
+         * We always return 1 bytes because the RDMA
+         * protocol is completely asynchronous. We do not yet know
+         * whether an  identified chunk is zero or not because we're
+         * waiting for other pages to potentially be merged with
+         * the current chunk. So, we have to call qemu_update_position()
+         * later on when the actual write occurs.
+         */
+        if (bytes_sent) {
+            *bytes_sent = 1;
+        }
+    } else {
+        uint64_t index, chunk;
+
+        /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
+        if (size < 0) {
+            ret = qemu_rdma_drain_cq(f, rdma);
+            if (ret < 0) {
+                fprintf(stderr, "rdma: failed to synchronously drain"
+                                " completion queue before unregistration.\n");
+                goto err;
+            }
+        }
+        */
+
+        ret = qemu_rdma_search_ram_block(rdma, block_offset,
+                                         offset, size, &index, &chunk);
+
+        if (ret) {
+            fprintf(stderr, "ram block search failed\n");
+            goto err;
+        }
+
+        qemu_rdma_signal_unregister(rdma, index, chunk, 0);
+
+        /*
+         * TODO: Synchronous, gauranteed unregistration (should not occur during
+         * fast-path). Otherwise, unregisters will process on the next call to
+         * qemu_rdma_drain_cq()
+        if (size < 0) {
+            qemu_rdma_unregister_waiting(rdma);
+        }
+        */
+    }
+
+    /*
+     * Drain the Completion Queue if possible, but do not block,
+     * just poll.
+     *
+     * If nothing to poll, the end of the iteration will do this
+     * again to make sure we don't overflow the request queue.
+     */
+    while (1) {
+        uint64_t wr_id, wr_id_in;
+        int ret = qemu_rdma_poll(rdma, &wr_id_in);
+        if (ret < 0) {
+            fprintf(stderr, "rdma migration: polling error! %d\n", ret);
+            goto err;
+        }
+
+        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
+
+        if (wr_id == RDMA_WRID_NONE) {
+            break;
+        }
+    }
+
+    return RAM_SAVE_CONTROL_DELAYED;
+err:
+    rdma->error_state = ret;
+    return ret;
+}
+
+static int qemu_rdma_accept(RDMAContext *rdma)
+{
+    RDMACapabilities cap;
+    struct rdma_conn_param conn_param = {
+                                            .responder_resources = 2,
+                                            .private_data = &cap,
+                                            .private_data_len = sizeof(cap),
+                                         };
+    struct rdma_cm_event *cm_event;
+    struct ibv_context *verbs;
+    int ret = -EINVAL;
+    int idx;
+
+    ret = rdma_get_cm_event(rdma->channel, &cm_event);
+    if (ret) {
+        goto err_rdma_dest_wait;
+    }
+
+    if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
+        rdma_ack_cm_event(cm_event);
+        goto err_rdma_dest_wait;
+    }
+
+    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
+
+    network_to_caps(&cap);
+
+    if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
+            fprintf(stderr, "Unknown source RDMA version: %d, bailing...\n",
+                            cap.version);
+            rdma_ack_cm_event(cm_event);
+            goto err_rdma_dest_wait;
+    }
+
+    /*
+     * Respond with only the capabilities this version of QEMU knows about.
+     */
+    cap.flags &= known_capabilities;
+
+    /*
+     * Enable the ones that we do know about.
+     * Add other checks here as new ones are introduced.
+     */
+    if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
+        rdma->pin_all = true;
+    }
+
+    rdma->cm_id = cm_event->id;
+    verbs = cm_event->id->verbs;
+
+    rdma_ack_cm_event(cm_event);
+
+    DPRINTF("Memory pin all: %s\n", rdma->pin_all ? "enabled" : "disabled");
+
+    caps_to_network(&cap);
+
+    DPRINTF("verbs context after listen: %p\n", verbs);
+
+    if (!rdma->verbs) {
+        rdma->verbs = verbs;
+    } else if (rdma->verbs != verbs) {
+            fprintf(stderr, "ibv context not matching %p, %p!\n",
+                    rdma->verbs, verbs);
+            goto err_rdma_dest_wait;
+    }
+
+    qemu_rdma_dump_id("dest_init", verbs);
+
+    ret = qemu_rdma_alloc_pd_cq(rdma);
+    if (ret) {
+        fprintf(stderr, "rdma migration: error allocating pd and cq!\n");
+        goto err_rdma_dest_wait;
+    }
+
+    ret = qemu_rdma_alloc_qp(rdma);
+    if (ret) {
+        fprintf(stderr, "rdma migration: error allocating qp!\n");
+        goto err_rdma_dest_wait;
+    }
+
+    ret = qemu_rdma_init_ram_blocks(rdma);
+    if (ret) {
+        fprintf(stderr, "rdma migration: error initializing ram blocks!\n");
+        goto err_rdma_dest_wait;
+    }
+
+    for (idx = 0; idx <= RDMA_WRID_MAX; idx++) {
+        ret = qemu_rdma_reg_control(rdma, idx);
+        if (ret) {
+            fprintf(stderr, "rdma: error registering %d control!\n", idx);
+            goto err_rdma_dest_wait;
+        }
+    }
+
+    qemu_set_fd_handler2(rdma->channel->fd, NULL, NULL, NULL, NULL);
+
+    ret = rdma_accept(rdma->cm_id, &conn_param);
+    if (ret) {
+        fprintf(stderr, "rdma_accept returns %d!\n", ret);
+        goto err_rdma_dest_wait;
+    }
+
+    ret = rdma_get_cm_event(rdma->channel, &cm_event);
+    if (ret) {
+        fprintf(stderr, "rdma_accept get_cm_event failed %d!\n", ret);
+        goto err_rdma_dest_wait;
+    }
+
+    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
+        fprintf(stderr, "rdma_accept not event established!\n");
+        rdma_ack_cm_event(cm_event);
+        goto err_rdma_dest_wait;
+    }
+
+    rdma_ack_cm_event(cm_event);
+
+    ret = qemu_rdma_post_recv_control(rdma, 0);
+    if (ret) {
+        fprintf(stderr, "rdma migration: error posting second control recv!\n");
+        goto err_rdma_dest_wait;
+    }
+
+    qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
+
+    return 0;
+
+err_rdma_dest_wait:
+    rdma->error_state = ret;
+    qemu_rdma_cleanup(rdma);
+    return ret;
+}
+
+/*
+ * During each iteration of the migration, we listen for instructions
+ * by the source VM to perform dynamic page registrations before they
+ * can perform RDMA operations.
+ *
+ * We respond with the 'rkey'.
+ *
+ * Keep doing this until the source tells us to stop.
+ */
+static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
+                                         uint64_t flags)
+{
+    RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
+                               .type = RDMA_CONTROL_REGISTER_RESULT,
+                               .repeat = 0,
+                             };
+    RDMAControlHeader unreg_resp = { .len = 0,
+                               .type = RDMA_CONTROL_UNREGISTER_FINISHED,
+                               .repeat = 0,
+                             };
+    RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
+                                 .repeat = 1 };
+    QEMUFileRDMA *rfile = opaque;
+    RDMAContext *rdma = rfile->rdma;
+    RDMALocalBlocks *local = &rdma->local_ram_blocks;
+    RDMAControlHeader head;
+    RDMARegister *reg, *registers;
+    RDMACompress *comp;
+    RDMARegisterResult *reg_result;
+    static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
+    RDMALocalBlock *block;
+    void *host_addr;
+    int ret = 0;
+    int idx = 0;
+    int count = 0;
+    int i = 0;
+
+    CHECK_ERROR_STATE();
+
+    do {
+        DDDPRINTF("Waiting for next request %" PRIu64 "...\n", flags);
+
+        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
+
+        if (ret < 0) {
+            break;
+        }
+
+        if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
+            fprintf(stderr, "rdma: Too many requests in this message (%d)."
+                            "Bailing.\n", head.repeat);
+            ret = -EIO;
+            break;
+        }
+
+        switch (head.type) {
+        case RDMA_CONTROL_COMPRESS:
+            comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
+            network_to_compress(comp);
+
+            DDPRINTF("Zapping zero chunk: %" PRId64
+                    " bytes, index %d, offset %" PRId64 "\n",
+                    comp->length, comp->block_idx, comp->offset);
+            block = &(rdma->local_ram_blocks.block[comp->block_idx]);
+
+            host_addr = block->local_host_addr +
+                            (comp->offset - block->offset);
+
+            ram_handle_compressed(host_addr, comp->value, comp->length);
+            break;
+
+        case RDMA_CONTROL_REGISTER_FINISHED:
+            DDDPRINTF("Current registrations complete.\n");
+            goto out;
+
+        case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
+            DPRINTF("Initial setup info requested.\n");
+
+            if (rdma->pin_all) {
+                ret = qemu_rdma_reg_whole_ram_blocks(rdma);
+                if (ret) {
+                    fprintf(stderr, "rdma migration: error dest "
+                                    "registering ram blocks!\n");
+                    goto out;
+                }
+            }
+
+            /*
+             * Dest uses this to prepare to transmit the RAMBlock descriptions
+             * to the source VM after connection setup.
+             * Both sides use the "remote" structure to communicate and update
+             * their "local" descriptions with what was sent.
+             */
+            for (i = 0; i < local->nb_blocks; i++) {
+                rdma->block[i].remote_host_addr =
+                    (uint64_t)(local->block[i].local_host_addr);
+
+                if (rdma->pin_all) {
+                    rdma->block[i].remote_rkey = local->block[i].mr->rkey;
+                }
+
+                rdma->block[i].offset = local->block[i].offset;
+                rdma->block[i].length = local->block[i].length;
+
+                remote_block_to_network(&rdma->block[i]);
+            }
+
+            blocks.len = rdma->local_ram_blocks.nb_blocks
+                                                * sizeof(RDMARemoteBlock);
+
+
+            ret = qemu_rdma_post_send_control(rdma,
+                                        (uint8_t *) rdma->block, &blocks);
+
+            if (ret < 0) {
+                fprintf(stderr, "rdma migration: error sending remote info!\n");
+                goto out;
+            }
+
+            break;
+        case RDMA_CONTROL_REGISTER_REQUEST:
+            DDPRINTF("There are %d registration requests\n", head.repeat);
+
+            reg_resp.repeat = head.repeat;
+            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
+
+            for (count = 0; count < head.repeat; count++) {
+                uint64_t chunk;
+                uint8_t *chunk_start, *chunk_end;
+
+                reg = &registers[count];
+                network_to_register(reg);
+
+                reg_result = &results[count];
+
+                DDPRINTF("Registration request (%d): index %d, current_addr %"
+                         PRIu64 " chunks: %" PRIu64 "\n", count,
+                         reg->current_index, reg->key.current_addr, reg->chunks);
+
+                block = &(rdma->local_ram_blocks.block[reg->current_index]);
+                if (block->is_ram_block) {
+                    host_addr = (block->local_host_addr +
+                                (reg->key.current_addr - block->offset));
+                    chunk = ram_chunk_index(block->local_host_addr,
+                                            (uint8_t *) host_addr);
+                } else {
+                    chunk = reg->key.chunk;
+                    host_addr = block->local_host_addr +
+                        (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
+                }
+                chunk_start = ram_chunk_start(block, chunk);
+                chunk_end = ram_chunk_end(block, chunk + reg->chunks);
+                if (qemu_rdma_register_and_get_keys(rdma, block,
+                            (uint8_t *)host_addr, NULL, &reg_result->rkey,
+                            chunk, chunk_start, chunk_end)) {
+                    fprintf(stderr, "cannot get rkey!\n");
+                    ret = -EINVAL;
+                    goto out;
+                }
+
+                reg_result->host_addr = (uint64_t) block->local_host_addr;
+
+                DDPRINTF("Registered rkey for this request: %x\n",
+                                reg_result->rkey);
+
+                result_to_network(reg_result);
+            }
+
+            ret = qemu_rdma_post_send_control(rdma,
+                            (uint8_t *) results, &reg_resp);
+
+            if (ret < 0) {
+                fprintf(stderr, "Failed to send control buffer!\n");
+                goto out;
+            }
+            break;
+        case RDMA_CONTROL_UNREGISTER_REQUEST:
+            DDPRINTF("There are %d unregistration requests\n", head.repeat);
+            unreg_resp.repeat = head.repeat;
+            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
+
+            for (count = 0; count < head.repeat; count++) {
+                reg = &registers[count];
+                network_to_register(reg);
+
+                DDPRINTF("Unregistration request (%d): "
+                         " index %d, chunk %" PRIu64 "\n",
+                         count, reg->current_index, reg->key.chunk);
+
+                block = &(rdma->local_ram_blocks.block[reg->current_index]);
+
+                ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
+                block->pmr[reg->key.chunk] = NULL;
+
+                if (ret != 0) {
+                    perror("rdma unregistration chunk failed");
+                    ret = -ret;
+                    goto out;
+                }
+
+                rdma->total_registrations--;
+
+                DDPRINTF("Unregistered chunk %" PRIu64 " successfully.\n",
+                            reg->key.chunk);
+            }
+
+            ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
+
+            if (ret < 0) {
+                fprintf(stderr, "Failed to send control buffer!\n");
+                goto out;
+            }
+            break;
+        case RDMA_CONTROL_REGISTER_RESULT:
+            fprintf(stderr, "Invalid RESULT message at dest.\n");
+            ret = -EIO;
+            goto out;
+        default:
+            fprintf(stderr, "Unknown control message %s\n",
+                                control_desc[head.type]);
+            ret = -EIO;
+            goto out;
+        }
+    } while (1);
+out:
+    if (ret < 0) {
+        rdma->error_state = ret;
+    }
+    return ret;
+}
+
+static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
+                                        uint64_t flags)
+{
+    QEMUFileRDMA *rfile = opaque;
+    RDMAContext *rdma = rfile->rdma;
+
+    CHECK_ERROR_STATE();
+
+    DDDPRINTF("start section: %" PRIu64 "\n", flags);
+    qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
+    qemu_fflush(f);
+
+    return 0;
+}
+
+/*
+ * Inform dest that dynamic registrations are done for now.
+ * First, flush writes, if any.
+ */
+static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
+                                       uint64_t flags)
+{
+    Error *local_err = NULL, **errp = &local_err;
+    QEMUFileRDMA *rfile = opaque;
+    RDMAContext *rdma = rfile->rdma;
+    RDMAControlHeader head = { .len = 0, .repeat = 1 };
+    int ret = 0;
+
+    CHECK_ERROR_STATE();
+
+    qemu_fflush(f);
+    ret = qemu_rdma_drain_cq(f, rdma);
+
+    if (ret < 0) {
+        goto err;
+    }
+
+    if (flags == RAM_CONTROL_SETUP) {
+        RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
+        RDMALocalBlocks *local = &rdma->local_ram_blocks;
+        int reg_result_idx, i, j, nb_remote_blocks;
+
+        head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
+        DPRINTF("Sending registration setup for ram blocks...\n");
+
+        /*
+         * Make sure that we parallelize the pinning on both sides.
+         * For very large guests, doing this serially takes a really
+         * long time, so we have to 'interleave' the pinning locally
+         * with the control messages by performing the pinning on this
+         * side before we receive the control response from the other
+         * side that the pinning has completed.
+         */
+        ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
+                    &reg_result_idx, rdma->pin_all ?
+                    qemu_rdma_reg_whole_ram_blocks : NULL);
+        if (ret < 0) {
+            ERROR(errp, "receiving remote info!\n");
+            return ret;
+        }
+
+        qemu_rdma_move_header(rdma, reg_result_idx, &resp);
+        memcpy(rdma->block,
+            rdma->wr_data[reg_result_idx].control_curr, resp.len);
+
+        nb_remote_blocks = resp.len / sizeof(RDMARemoteBlock);
+
+        /*
+         * The protocol uses two different sets of rkeys (mutually exclusive):
+         * 1. One key to represent the virtual address of the entire ram block.
+         *    (dynamic chunk registration disabled - pin everything with one rkey.)
+         * 2. One to represent individual chunks within a ram block.
+         *    (dynamic chunk registration enabled - pin individual chunks.)
+         *
+         * Once the capability is successfully negotiated, the destination transmits
+         * the keys to use (or sends them later) including the virtual addresses
+         * and then propagates the remote ram block descriptions to his local copy.
+         */
+
+        if (local->nb_blocks != nb_remote_blocks) {
+            ERROR(errp, "ram blocks mismatch #1! "
+                        "Your QEMU command line parameters are probably "
+                        "not identical on both the source and destination.\n");
+            return -EINVAL;
+        }
+
+        for (i = 0; i < nb_remote_blocks; i++) {
+            network_to_remote_block(&rdma->block[i]);
+
+            /* search local ram blocks */
+            for (j = 0; j < local->nb_blocks; j++) {
+                if (rdma->block[i].offset != local->block[j].offset) {
+                    continue;
+                }
+
+                if (rdma->block[i].length != local->block[j].length) {
+                    ERROR(errp, "ram blocks mismatch #2! "
+                        "Your QEMU command line parameters are probably "
+                        "not identical on both the source and destination.\n");
+                    return -EINVAL;
+                }
+                local->block[j].remote_host_addr =
+                        rdma->block[i].remote_host_addr;
+                local->block[j].remote_rkey = rdma->block[i].remote_rkey;
+                break;
+            }
+
+            if (j >= local->nb_blocks) {
+                ERROR(errp, "ram blocks mismatch #3! "
+                        "Your QEMU command line parameters are probably "
+                        "not identical on both the source and destination.\n");
+                return -EINVAL;
+            }
+        }
+    }
+
+    DDDPRINTF("Sending registration finish %" PRIu64 "...\n", flags);
+
+    head.type = RDMA_CONTROL_REGISTER_FINISHED;
+    ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
+
+    if (ret < 0) {
+        goto err;
+    }
+
+    return 0;
+err:
+    rdma->error_state = ret;
+    return ret;
+}
+
+static int qemu_rdma_get_fd(void *opaque)
+{
+    QEMUFileRDMA *rfile = opaque;
+    RDMAContext *rdma = rfile->rdma;
+
+    return rdma->comp_channel->fd;
+}
+
+const QEMUFileOps rdma_read_ops = {
+    .get_buffer    = qemu_rdma_get_buffer,
+    .get_fd        = qemu_rdma_get_fd,
+    .close         = qemu_rdma_close,
+    .hook_ram_load = qemu_rdma_registration_handle,
+};
+
+const QEMUFileOps rdma_write_ops = {
+    .put_buffer         = qemu_rdma_put_buffer,
+    .close              = qemu_rdma_close,
+    .before_ram_iterate = qemu_rdma_registration_start,
+    .after_ram_iterate  = qemu_rdma_registration_stop,
+    .save_page          = qemu_rdma_save_page,
+};
+
+static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
+{
+    QEMUFileRDMA *r = g_malloc0(sizeof(QEMUFileRDMA));
+
+    if (qemu_file_mode_is_not_valid(mode)) {
+        return NULL;
+    }
+
+    r->rdma = rdma;
+
+    if (mode[0] == 'w') {
+        r->file = qemu_fopen_ops(r, &rdma_write_ops);
+    } else {
+        r->file = qemu_fopen_ops(r, &rdma_read_ops);
+    }
+
+    return r->file;
+}
+
+static void rdma_accept_incoming_migration(void *opaque)
+{
+    RDMAContext *rdma = opaque;
+    int ret;
+    QEMUFile *f;
+    Error *local_err = NULL, **errp = &local_err;
+
+    DPRINTF("Accepting rdma connection...\n");
+    ret = qemu_rdma_accept(rdma);
+
+    if (ret) {
+        ERROR(errp, "RDMA Migration initialization failed!\n");
+        return;
+    }
+
+    DPRINTF("Accepted migration\n");
+
+    f = qemu_fopen_rdma(rdma, "rb");
+    if (f == NULL) {
+        ERROR(errp, "could not qemu_fopen_rdma!\n");
+        qemu_rdma_cleanup(rdma);
+        return;
+    }
+
+    rdma->migration_started_on_destination = 1;
+    process_incoming_migration(f);
+}
+
+void rdma_start_incoming_migration(const char *host_port, Error **errp)
+{
+    int ret;
+    RDMAContext *rdma;
+    Error *local_err = NULL;
+
+    DPRINTF("Starting RDMA-based incoming migration\n");
+    rdma = qemu_rdma_data_init(host_port, &local_err);
+
+    if (rdma == NULL) {
+        goto err;
+    }
+
+    ret = qemu_rdma_dest_init(rdma, &local_err);
+
+    if (ret) {
+        goto err;
+    }
+
+    DPRINTF("qemu_rdma_dest_init success\n");
+
+    ret = rdma_listen(rdma->listen_id, 5);
+
+    if (ret) {
+        ERROR(errp, "listening on socket!\n");
+        goto err;
+    }
+
+    DPRINTF("rdma_listen success\n");
+
+    qemu_set_fd_handler2(rdma->channel->fd, NULL,
+                         rdma_accept_incoming_migration, NULL,
+                            (void *)(intptr_t) rdma);
+    return;
+err:
+    error_propagate(errp, local_err);
+    g_free(rdma);
+}
+
+void rdma_start_outgoing_migration(void *opaque,
+                            const char *host_port, Error **errp)
+{
+    MigrationState *s = opaque;
+    Error *local_err = NULL, **temp = &local_err;
+    RDMAContext *rdma = qemu_rdma_data_init(host_port, &local_err);
+    int ret = 0;
+
+    if (rdma == NULL) {
+        ERROR(temp, "Failed to initialize RDMA data structures! %d\n", ret);
+        goto err;
+    }
+
+    ret = qemu_rdma_source_init(rdma, &local_err,
+        s->enabled_capabilities[MIGRATION_CAPABILITY_X_RDMA_PIN_ALL]);
+
+    if (ret) {
+        goto err;
+    }
+
+    DPRINTF("qemu_rdma_source_init success\n");
+    ret = qemu_rdma_connect(rdma, &local_err);
+
+    if (ret) {
+        goto err;
+    }
+
+    DPRINTF("qemu_rdma_source_connect success\n");
+
+    s->file = qemu_fopen_rdma(rdma, "wb");
+    migrate_fd_connect(s);
+    return;
+err:
+    error_propagate(errp, local_err);
+    g_free(rdma);
+    migrate_fd_error(s);
+}
diff --git a/migration.c b/migration.c
index 635a7e7..2bfe245 100644
--- a/migration.c
+++ b/migration.c
@@ -78,6 +78,10 @@ void qemu_start_incoming_migration(const char *uri, Error **errp)
 
     if (strstart(uri, "tcp:", &p))
         tcp_start_incoming_migration(p, errp);
+#ifdef CONFIG_RDMA
+    else if (strstart(uri, "x-rdma:", &p))
+        rdma_start_incoming_migration(p, errp);
+#endif
 #if !defined(WIN32)
     else if (strstart(uri, "exec:", &p))
         exec_start_incoming_migration(p, errp);
@@ -406,6 +410,10 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk,
 
     if (strstart(uri, "tcp:", &p)) {
         tcp_start_outgoing_migration(s, p, &local_err);
+#ifdef CONFIG_RDMA
+    } else if (strstart(uri, "x-rdma:", &p)) {
+        rdma_start_outgoing_migration(s, p, &local_err);
+#endif
 #if !defined(WIN32)
     } else if (strstart(uri, "exec:", &p)) {
         exec_start_outgoing_migration(s, p, &local_err);
-- 
1.7.10.4

^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [Qemu-devel] [PATCH v3 resend 5/8] rdma: send pc.ram
  2013-07-16 16:48 [Qemu-devel] [PATCH v3 resend 0/8] rdma: core logic mrhines
                   ` (3 preceding siblings ...)
  2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 4/8] rdma: core logic mrhines
@ 2013-07-16 16:48 ` mrhines
  2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 6/8] rdma: allow state transitions between other states besides ACTIVE mrhines
                   ` (2 subsequent siblings)
  7 siblings, 0 replies; 13+ messages in thread
From: mrhines @ 2013-07-16 16:48 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, knoel, owasserm, abali, mrhines, gokul,
	pbonzini, chegu_vinod

From: "Michael R. Hines" <mrhines@us.ibm.com>

This takes advantages of the previous patches:

1. use the new QEMUFileOps hook 'save_page'

2. call out to the right accessor methods to invoke
   the iteration hooks defined in QEMUFileOps

Reviewed-by: Paolo Bonzini <pbonzini@redhat.com>
Reviewed-by: Chegu Vinod <chegu_vinod@hp.com>
Tested-by: Chegu Vinod <chegu_vinod@hp.com>
Tested-by: Michael R. Hines <mrhines@us.ibm.com>
Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 arch_init.c |   33 ++++++++++++++++++++++++++++++++-
 1 file changed, 32 insertions(+), 1 deletion(-)

diff --git a/arch_init.c b/arch_init.c
index 6a96bbd..17b0c0f 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -115,6 +115,7 @@ const uint32_t arch_type = QEMU_ARCH;
 #define RAM_SAVE_FLAG_EOS      0x10
 #define RAM_SAVE_FLAG_CONTINUE 0x20
 #define RAM_SAVE_FLAG_XBZRLE   0x40
+/* 0x80 is reserved in migration.h start with 0x100 next */
 
 
 static struct defconfig_file {
@@ -447,6 +448,7 @@ static int ram_save_block(QEMUFile *f, bool last_stage)
                 ram_bulk_stage = false;
             }
         } else {
+            int ret;
             uint8_t *p;
             int cont = (block == last_sent_block) ?
                 RAM_SAVE_FLAG_CONTINUE : 0;
@@ -455,7 +457,18 @@ static int ram_save_block(QEMUFile *f, bool last_stage)
 
             /* In doubt sent page as normal */
             bytes_sent = -1;
-            if (is_zero_page(p)) {
+            ret = ram_control_save_page(f, block->offset,
+                               offset, TARGET_PAGE_SIZE, &bytes_sent);
+
+            if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
+                if (ret != RAM_SAVE_CONTROL_DELAYED) {
+                    if (bytes_sent > 0) {
+                        acct_info.norm_pages++;
+                    } else if (bytes_sent == 0) {
+                        acct_info.dup_pages++;
+                    }
+                }
+            } else if (is_zero_page(p)) {
                 acct_info.dup_pages++;
                 bytes_sent = save_block_hdr(f, block, offset, cont,
                                             RAM_SAVE_FLAG_COMPRESS);
@@ -605,6 +618,10 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
     }
 
     qemu_mutex_unlock_ramlist();
+
+    ram_control_before_iterate(f, RAM_CONTROL_SETUP);
+    ram_control_after_iterate(f, RAM_CONTROL_SETUP);
+
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
 
     return 0;
@@ -623,6 +640,8 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
         reset_ram_globals();
     }
 
+    ram_control_before_iterate(f, RAM_CONTROL_ROUND);
+
     t0 = qemu_get_clock_ns(rt_clock);
     i = 0;
     while ((ret = qemu_file_rate_limit(f)) == 0) {
@@ -653,6 +672,12 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
 
     qemu_mutex_unlock_ramlist();
 
+    /*
+     * Must occur before EOS (or any QEMUFile operation)
+     * because of RDMA protocol.
+     */
+    ram_control_after_iterate(f, RAM_CONTROL_ROUND);
+
     if (ret < 0) {
         bytes_transferred += total_sent;
         return ret;
@@ -670,6 +695,8 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
     qemu_mutex_lock_ramlist();
     migration_bitmap_sync();
 
+    ram_control_before_iterate(f, RAM_CONTROL_FINISH);
+
     /* try transferring iterative blocks of memory */
 
     /* flush all remaining blocks regardless of rate limiting */
@@ -683,6 +710,8 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
         }
         bytes_transferred += bytes_sent;
     }
+
+    ram_control_after_iterate(f, RAM_CONTROL_FINISH);
     migration_end();
 
     qemu_mutex_unlock_ramlist();
@@ -886,6 +915,8 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
                 ret = -EINVAL;
                 goto done;
             }
+        } else if (flags & RAM_SAVE_FLAG_HOOK) {
+            ram_control_load_hook(f, flags);
         }
         error = qemu_file_get_error(f);
         if (error) {
-- 
1.7.10.4

^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [Qemu-devel] [PATCH v3 resend 6/8] rdma: allow state transitions between other states besides ACTIVE
  2013-07-16 16:48 [Qemu-devel] [PATCH v3 resend 0/8] rdma: core logic mrhines
                   ` (4 preceding siblings ...)
  2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 5/8] rdma: send pc.ram mrhines
@ 2013-07-16 16:48 ` mrhines
  2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 7/8] rdma: introduce MIG_STATE_NONE and change MIG_STATE_SETUP state transition mrhines
  2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 8/8] rdma: account for the time spent in MIG_STATE_SETUP through QMP mrhines
  7 siblings, 0 replies; 13+ messages in thread
From: mrhines @ 2013-07-16 16:48 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, knoel, owasserm, abali, mrhines, gokul,
	pbonzini, chegu_vinod

From: "Michael R. Hines" <mrhines@us.ibm.com>

This patch is in preparation for the next ones: Until now the MIG_STATE_SETUP
state was not really a 'formal' state. It has been used as a 'zero' state
and QEMU has been unconditionally transitioning into this state when
the QMP migrate command was called. In preparation for timing this state,
we have to make this state a a 'real' state which actually gets transitioned
from later in the migration_thread() from SETUP => ACTIVE, rather than just
automatically dropping into this state at the beginninig of the migration.

This means that the state transition function (migration_finish_set_state())
needs to be capable of transitioning from valid states _other_ than just
MIG_STATE_ACTIVE.

The function is in fact already capable of doing that, but was not allowing the
old state to be a parameter specified as an input.

This patch fixes that and only makes the transition if the current state
matches the old state that the caller intended to transition from.

Reviewed-by: Juan Quintela <quintela@redhat.com>
Tested-by: Michael R. Hines <mrhines@us.ibm.com>
Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 migration.c |   10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/migration.c b/migration.c
index 2bfe245..d212871 100644
--- a/migration.c
+++ b/migration.c
@@ -295,9 +295,9 @@ static void migrate_fd_cleanup(void *opaque)
     notifier_list_notify(&migration_state_notifiers, s);
 }
 
-static void migrate_finish_set_state(MigrationState *s, int new_state)
+static void migrate_set_state(MigrationState *s, int old_state, int new_state)
 {
-    if (atomic_cmpxchg(&s->state, MIG_STATE_ACTIVE, new_state) == new_state) {
+    if (atomic_cmpxchg(&s->state, old_state, new_state) == new_state) {
         trace_migrate_set_state(new_state);
     }
 }
@@ -315,7 +315,7 @@ static void migrate_fd_cancel(MigrationState *s)
 {
     DPRINTF("cancelling migration\n");
 
-    migrate_finish_set_state(s, MIG_STATE_CANCELLED);
+    migrate_set_state(s, s->state, MIG_STATE_CANCELLED);
 }
 
 void add_migration_state_change_notifier(Notifier *notify)
@@ -545,14 +545,14 @@ static void *migration_thread(void *opaque)
                 qemu_savevm_state_complete(s->file);
                 qemu_mutex_unlock_iothread();
                 if (!qemu_file_get_error(s->file)) {
-                    migrate_finish_set_state(s, MIG_STATE_COMPLETED);
+                    migrate_set_state(s, MIG_STATE_ACTIVE, MIG_STATE_COMPLETED);
                     break;
                 }
             }
         }
 
         if (qemu_file_get_error(s->file)) {
-            migrate_finish_set_state(s, MIG_STATE_ERROR);
+            migrate_set_state(s, MIG_STATE_ACTIVE, MIG_STATE_ERROR);
             break;
         }
         current_time = qemu_get_clock_ms(rt_clock);
-- 
1.7.10.4

^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [Qemu-devel] [PATCH v3 resend 7/8] rdma: introduce MIG_STATE_NONE and change MIG_STATE_SETUP state transition
  2013-07-16 16:48 [Qemu-devel] [PATCH v3 resend 0/8] rdma: core logic mrhines
                   ` (5 preceding siblings ...)
  2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 6/8] rdma: allow state transitions between other states besides ACTIVE mrhines
@ 2013-07-16 16:48 ` mrhines
  2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 8/8] rdma: account for the time spent in MIG_STATE_SETUP through QMP mrhines
  7 siblings, 0 replies; 13+ messages in thread
From: mrhines @ 2013-07-16 16:48 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, knoel, owasserm, abali, mrhines, gokul,
	pbonzini, chegu_vinod

From: "Michael R. Hines" <mrhines@us.ibm.com>

As described in the previous patch, until now, the MIG_STATE_SETUP
state was not really a 'formal' state. It has been used as a 'zero' state
(what we're calling 'NONE' here) and QEMU has been unconditionally transitioning
into this state when the QMP migration command was called. Instead we want to
introduce MIG_STATE_NONE, which is our starting state in the state machine, and
then immediately transition into the MIG_STATE_SETUP state when the QMP migrate
command is issued.

In order to do this, we must delay the transition into MIG_STATE_ACTIVE until
later in the migration_thread(). This is done to be able to timestamp the amount of
time spent in the SETUP state for proper accounting to the user during
an RDMA migration.

Furthermore, the management software, until now, has never been aware of the
existence of the SETUP state whatsoever. This must change, because, timing of this
state implies that the state actually exists.

These two patches cannot be separated because the 'query_migrate' QMP
switch statement needs to know how to handle this new state transition.

Reviewed-by: Juan Quintela <quintela@redhat.com>
Tested-by: Michael R. Hines <mrhines@us.ibm.com>
Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 migration.c |   21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/migration.c b/migration.c
index d212871..c259d93 100644
--- a/migration.c
+++ b/migration.c
@@ -36,7 +36,8 @@
 #endif
 
 enum {
-    MIG_STATE_ERROR,
+    MIG_STATE_ERROR = -1,
+    MIG_STATE_NONE,
     MIG_STATE_SETUP,
     MIG_STATE_CANCELLED,
     MIG_STATE_ACTIVE,
@@ -63,7 +64,7 @@ static NotifierList migration_state_notifiers =
 MigrationState *migrate_get_current(void)
 {
     static MigrationState current_migration = {
-        .state = MIG_STATE_SETUP,
+        .state = MIG_STATE_NONE,
         .bandwidth_limit = MAX_THROTTLE,
         .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
         .mbps = -1,
@@ -184,9 +185,13 @@ MigrationInfo *qmp_query_migrate(Error **errp)
     MigrationState *s = migrate_get_current();
 
     switch (s->state) {
-    case MIG_STATE_SETUP:
+    case MIG_STATE_NONE:
         /* no migration has happened ever */
         break;
+    case MIG_STATE_SETUP:
+        info->has_status = true;
+        info->status = g_strdup("setup");
+        break;
     case MIG_STATE_ACTIVE:
         info->has_status = true;
         info->status = g_strdup("active");
@@ -257,7 +262,7 @@ void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
     MigrationState *s = migrate_get_current();
     MigrationCapabilityStatusList *cap;
 
-    if (s->state == MIG_STATE_ACTIVE) {
+    if (s->state == MIG_STATE_ACTIVE || s->state == MIG_STATE_SETUP) {
         error_set(errp, QERR_MIGRATION_ACTIVE);
         return;
     }
@@ -392,7 +397,7 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk,
     params.blk = blk;
     params.shared = inc;
 
-    if (s->state == MIG_STATE_ACTIVE) {
+    if (s->state == MIG_STATE_ACTIVE || s->state == MIG_STATE_SETUP) {
         error_set(errp, QERR_MIGRATION_ACTIVE);
         return;
     }
@@ -524,6 +529,8 @@ static void *migration_thread(void *opaque)
     DPRINTF("beginning savevm\n");
     qemu_savevm_state_begin(s->file, &s->params);
 
+    migrate_set_state(s, MIG_STATE_SETUP, MIG_STATE_ACTIVE);
+
     while (s->state == MIG_STATE_ACTIVE) {
         int64_t current_time;
         uint64_t pending_size;
@@ -603,8 +610,8 @@ static void *migration_thread(void *opaque)
 
 void migrate_fd_connect(MigrationState *s)
 {
-    s->state = MIG_STATE_ACTIVE;
-    trace_migrate_set_state(MIG_STATE_ACTIVE);
+    s->state = MIG_STATE_SETUP;
+    trace_migrate_set_state(MIG_STATE_SETUP);
 
     /* This is a best 1st approximation. ns to ms */
     s->expected_downtime = max_downtime/1000000;
-- 
1.7.10.4

^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [Qemu-devel] [PATCH v3 resend 8/8] rdma: account for the time spent in MIG_STATE_SETUP through QMP
  2013-07-16 16:48 [Qemu-devel] [PATCH v3 resend 0/8] rdma: core logic mrhines
                   ` (6 preceding siblings ...)
  2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 7/8] rdma: introduce MIG_STATE_NONE and change MIG_STATE_SETUP state transition mrhines
@ 2013-07-16 16:48 ` mrhines
  7 siblings, 0 replies; 13+ messages in thread
From: mrhines @ 2013-07-16 16:48 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, knoel, owasserm, abali, mrhines, gokul,
	pbonzini, chegu_vinod

From: "Michael R. Hines" <mrhines@us.ibm.com>

Using the previous patches, we're now able to timestamp the SETUP
state. Once we have this time, let the user know about it in the
schema.

Reviewed-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Eric Blake <eblake@redhat.com>
Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 hmp.c                         |    4 ++++
 include/migration/migration.h |    1 +
 migration.c                   |    9 +++++++++
 qapi-schema.json              |    9 ++++++++-
 4 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/hmp.c b/hmp.c
index 2daed43..2ac5429 100644
--- a/hmp.c
+++ b/hmp.c
@@ -164,6 +164,10 @@ void hmp_info_migrate(Monitor *mon, const QDict *qdict)
             monitor_printf(mon, "downtime: %" PRIu64 " milliseconds\n",
                            info->downtime);
         }
+        if (info->has_setup_time) {
+            monitor_printf(mon, "setup: %" PRIu64 " milliseconds\n",
+                           info->setup_time);
+        }
     }
 
     if (info->has_ram) {
diff --git a/include/migration/migration.h b/include/migration/migration.h
index b5e413a..71dbe54 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -49,6 +49,7 @@ struct MigrationState
     int64_t dirty_bytes_rate;
     bool enabled_capabilities[MIGRATION_CAPABILITY_MAX];
     int64_t xbzrle_cache_size;
+    int64_t setup_time;
 };
 
 void process_incoming_migration(QEMUFile *f);
diff --git a/migration.c b/migration.c
index c259d93..29b9864 100644
--- a/migration.c
+++ b/migration.c
@@ -191,6 +191,7 @@ MigrationInfo *qmp_query_migrate(Error **errp)
     case MIG_STATE_SETUP:
         info->has_status = true;
         info->status = g_strdup("setup");
+        info->has_total_time = false;
         break;
     case MIG_STATE_ACTIVE:
         info->has_status = true;
@@ -200,6 +201,8 @@ MigrationInfo *qmp_query_migrate(Error **errp)
             - s->total_time;
         info->has_expected_downtime = true;
         info->expected_downtime = s->expected_downtime;
+        info->has_setup_time = true;
+        info->setup_time = s->setup_time;
 
         info->has_ram = true;
         info->ram = g_malloc0(sizeof(*info->ram));
@@ -231,6 +234,8 @@ MigrationInfo *qmp_query_migrate(Error **errp)
         info->total_time = s->total_time;
         info->has_downtime = true;
         info->downtime = s->downtime;
+        info->has_setup_time = true;
+        info->setup_time = s->setup_time;
 
         info->has_ram = true;
         info->ram = g_malloc0(sizeof(*info->ram));
@@ -521,6 +526,7 @@ static void *migration_thread(void *opaque)
 {
     MigrationState *s = opaque;
     int64_t initial_time = qemu_get_clock_ms(rt_clock);
+    int64_t setup_start = qemu_get_clock_ms(host_clock);
     int64_t initial_bytes = 0;
     int64_t max_size = 0;
     int64_t start_time = initial_time;
@@ -529,8 +535,11 @@ static void *migration_thread(void *opaque)
     DPRINTF("beginning savevm\n");
     qemu_savevm_state_begin(s->file, &s->params);
 
+    s->setup_time = qemu_get_clock_ms(host_clock) - setup_start;
     migrate_set_state(s, MIG_STATE_SETUP, MIG_STATE_ACTIVE);
 
+    DPRINTF("setup complete\n");
+
     while (s->state == MIG_STATE_ACTIVE) {
         int64_t current_time;
         uint64_t pending_size;
diff --git a/qapi-schema.json b/qapi-schema.json
index b251d28..c60deb4 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -578,6 +578,12 @@
 #        expected downtime in milliseconds for the guest in last walk
 #        of the dirty bitmap. (since 1.3)
 #
+# @setup-time: #optional amount of setup time in milliseconds _before_ the
+#        iterations begin but _after_ the QMP command is issued. This is designed
+#        to provide an accounting of any activities (such as RDMA pinning) which
+#        may be expensive, but do not actually occur during the iterative
+#        migration rounds themselves. (since 1.6)
+#
 # Since: 0.14.0
 ##
 { 'type': 'MigrationInfo',
@@ -586,7 +592,8 @@
            '*xbzrle-cache': 'XBZRLECacheStats',
            '*total-time': 'int',
            '*expected-downtime': 'int',
-           '*downtime': 'int'} }
+           '*downtime': 'int',
+           '*setup-time': 'int'} }
 
 ##
 # @query-migrate
-- 
1.7.10.4

^ permalink raw reply related	[flat|nested] 13+ messages in thread

* Re: [Qemu-devel] [PATCH v3 resend 4/8] rdma: core logic
  2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 4/8] rdma: core logic mrhines
@ 2013-07-18  7:30   ` Marcel Apfelbaum
  2013-07-18 12:07     ` Michael R. Hines
  0 siblings, 1 reply; 13+ messages in thread
From: Marcel Apfelbaum @ 2013-07-18  7:30 UTC (permalink / raw)
  To: mrhines
  Cc: aliguori, quintela, qemu-devel, owasserm, abali, mrhines, gokul,
	pbonzini, chegu_vinod, knoel

On Tue, 2013-07-16 at 12:48 -0400, mrhines@linux.vnet.ibm.com wrote:
> From: "Michael R. Hines" <mrhines@us.ibm.com>
> 
> Code that does need to be visible is kept
> well contained inside this file and this is the only
> new additional file to the entire patch.
> 
> This file includes the entire protocol and interfaces
> required to perform RDMA migration.
> 
> Also, the configure and Makefile modifications to link
> this file are included.
> 
> Full documentation is in docs/rdma.txt
> 

This patch is too big (in my opinion).
I would split it into at least 3 patches:
1. Generic RDMA code (this part can be reused by everyone who will need RDMA in the future) 
2. RDMA transfer protocol (separating this will give us possibility for optimization without touching the rest of the code)
3. Migration related code



> + */
> +#define RDMA_WRID_TYPE_SHIFT  0UL
> +#define RDMA_WRID_BLOCK_SHIFT 16UL
> +#define RDMA_WRID_CHUNK_SHIFT 30UL

If I understand correctly each RDMA write is exactly 1MB.
I think that it is crucial from the performance point of view to make this configurable.
Sometimes the time to register the MR(and unregister) on both machines may be the same as transmission of 1MB.
Bottom line, this cannot be hard-coded because it depends on the connection speed.


> +/*
> + * RDMA requires memory registration (mlock/pinning), but this is not good for
> + * overcommitment.
> + *
> + * In preparation for the future where LRU information or workload-specific
> + * writable writable working set memory access behavior is available to QEMU
> + * it would be nice to have in place the ability to UN-register/UN-pin
> + * particular memory regions from the RDMA hardware when it is determine that
> + * those regions of memory will likely not be accessed again in the near future.
> + *
> + * While we do not yet have such information right now, the following
> + * compile-time option allows us to perform a non-optimized version of this
> + * behavior.
> + *
> + * By uncommenting this option, you will cause *all* RDMA transfers to be
> + * unregistered immediately after the transfer completes on both sides of the
> + * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
> + *
> + * This will have a terrible impact on migration performance, so until future
> + * workload information or LRU information is available, do not attempt to use
> + * this feature except for basic testing.
> + */
> +//#define RDMA_UNREGISTRATION_EXAMPLE
> +
> +/*
> + * Perform a non-optimized memory unregistration after every transfer
> + * for demonsration purposes, only if pin-all is not requested.
> + *
> + * Potential optimizations:
> + * 1. Start a new thread to run this function continuously
> +        - for bit clearing
> +        - and for receipt of unregister messages
> + * 2. Use an LRU.
> + * 3. Use workload hints.
> + */

I think that if we work with chunks large enough, in such way that the time to pass 
the pages between hosts is much bigger the MR registration/unregistration, it may solve
the issue as least partially (it will not be such an impact).
After that, optimization is a good idea.


Nice work!
Marcel

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [Qemu-devel] [PATCH v3 resend 4/8] rdma: core logic
  2013-07-18  7:30   ` Marcel Apfelbaum
@ 2013-07-18 12:07     ` Michael R. Hines
  2013-07-18 12:22       ` Paolo Bonzini
  0 siblings, 1 reply; 13+ messages in thread
From: Michael R. Hines @ 2013-07-18 12:07 UTC (permalink / raw)
  To: Marcel Apfelbaum
  Cc: aliguori, quintela, qemu-devel, owasserm, abali, mrhines, gokul,
	pbonzini, chegu_vinod, knoel

On 07/18/2013 03:30 AM, Marcel Apfelbaum wrote:
> On Tue, 2013-07-16 at 12:48 -0400, mrhines@linux.vnet.ibm.com wrote:
>> From: "Michael R. Hines" <mrhines@us.ibm.com>
>>
>> Code that does need to be visible is kept
>> well contained inside this file and this is the only
>> new additional file to the entire patch.
>>
>> This file includes the entire protocol and interfaces
>> required to perform RDMA migration.
>>
>> Also, the configure and Makefile modifications to link
>> this file are included.
>>
>> Full documentation is in docs/rdma.txt
>>
> This patch is too big (in my opinion).
> I would split it into at least 3 patches:
> 1. Generic RDMA code (this part can be reused by everyone who will need RDMA in the future)
> 2. RDMA transfer protocol (separating this will give us possibility for optimization without touching the rest of the code)
> 3. Migration related code
>

Don't let the "v3" mislead you =). The patch actually *used* to look 
just like what
you described (3 different ones), but after more than a dozen reviews 
since January
I was told to join all the code into a single file by the reviewers.

>
>> + */
>> +#define RDMA_WRID_TYPE_SHIFT  0UL
>> +#define RDMA_WRID_BLOCK_SHIFT 16UL
>> +#define RDMA_WRID_CHUNK_SHIFT 30UL
> If I understand correctly each RDMA write is exactly 1MB.
> I think that it is crucial from the performance point of view to make this configurable.
> Sometimes the time to register the MR(and unregister) on both machines may be the same as transmission of 1MB.
> Bottom line, this cannot be hard-coded because it depends on the connection speed.

Yes, that's correct, but you probably missed reading the whole file 
(it's big, I know). Search for "mergable" in the patch, and you'll see 
the function.

Chunks are only 1MB *contiguous*. That means that if ram_save_block() 
does not gives us contiguous
pages up to 1MB, then we do not send 1MB - we only send what we were 
given. For example: if we were
given 0.5 of contiguous transfers to perform and then the next page 
address is far far away, then we transmit
that "chunk" immediately without worrying about waiting for anything else.


>
>
>> +/*
>> + * RDMA requires memory registration (mlock/pinning), but this is not good for
>> + * overcommitment.
>> + *
>> + * In preparation for the future where LRU information or workload-specific
>> + * writable writable working set memory access behavior is available to QEMU
>> + * it would be nice to have in place the ability to UN-register/UN-pin
>> + * particular memory regions from the RDMA hardware when it is determine that
>> + * those regions of memory will likely not be accessed again in the near future.
>> + *
>> + * While we do not yet have such information right now, the following
>> + * compile-time option allows us to perform a non-optimized version of this
>> + * behavior.
>> + *
>> + * By uncommenting this option, you will cause *all* RDMA transfers to be
>> + * unregistered immediately after the transfer completes on both sides of the
>> + * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
>> + *
>> + * This will have a terrible impact on migration performance, so until future
>> + * workload information or LRU information is available, do not attempt to use
>> + * this feature except for basic testing.
>> + */
>> +//#define RDMA_UNREGISTRATION_EXAMPLE
>> +
>> +/*
>> + * Perform a non-optimized memory unregistration after every transfer
>> + * for demonsration purposes, only if pin-all is not requested.
>> + *
>> + * Potential optimizations:
>> + * 1. Start a new thread to run this function continuously
>> +        - for bit clearing
>> +        - and for receipt of unregister messages
>> + * 2. Use an LRU.
>> + * 3. Use workload hints.
>> + */
> I think that if we work with chunks large enough, in such way that the time to pass
> the pages between hosts is much bigger the MR registration/unregistration, it may solve
> the issue as least partially (it will not be such an impact).
> After that, optimization is a good idea.
>
>
> Nice work!
> Marcel
>

Agreed. Thank you.

- Michael

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [Qemu-devel] [PATCH v3 resend 4/8] rdma: core logic
  2013-07-18 12:07     ` Michael R. Hines
@ 2013-07-18 12:22       ` Paolo Bonzini
  0 siblings, 0 replies; 13+ messages in thread
From: Paolo Bonzini @ 2013-07-18 12:22 UTC (permalink / raw)
  To: Michael R. Hines
  Cc: aliguori, quintela, knoel, qemu-devel, owasserm, abali, mrhines,
	Marcel Apfelbaum, gokul, chegu_vinod

Il 18/07/2013 14:07, Michael R. Hines ha scritto:
> On 07/18/2013 03:30 AM, Marcel Apfelbaum wrote:
>> On Tue, 2013-07-16 at 12:48 -0400, mrhines@linux.vnet.ibm.com wrote:
>>> From: "Michael R. Hines" <mrhines@us.ibm.com>
>>>
>>> Code that does need to be visible is kept
>>> well contained inside this file and this is the only
>>> new additional file to the entire patch.
>>>
>>> This file includes the entire protocol and interfaces
>>> required to perform RDMA migration.
>>>
>>> Also, the configure and Makefile modifications to link
>>> this file are included.
>>>
>>> Full documentation is in docs/rdma.txt
>>>
>> This patch is too big (in my opinion).
>> I would split it into at least 3 patches:
>> 1. Generic RDMA code (this part can be reused by everyone who will
>> need RDMA in the future)
>> 2. RDMA transfer protocol (separating this will give us possibility
>> for optimization without touching the rest of the code)
>> 3. Migration related code
>>
> 
> Don't let the "v3" mislead you =). The patch actually *used* to look
> just like what
> you described (3 different ones), but after more than a dozen reviews
> since January
> I was told to join all the code into a single file by the reviewers.

Yeah, I guess I owe some explanation to Marcel (who is not RDMA-impaired
at all).  Because the reviewers (me especially) did not know much about
RDMA, we initially concentrated on having the right interfaces between
migration-rdma.c and the migration core.

Once we had something that core developers considered to be the "right"
interfaces, Michael's original split in 1/2/3 didn't make much sense
anymore, so in the end it was easier to just merge everything in a
single patch and treat it almost as a black box.

Having generic RDMA code in a separate file would be a nice thing, but I
don't think it's absolutely necessary in order to merge this code (which
I would like to have in 1.6).  We have already done "chainsaw" passes on
files in the past, at this point I prefer "release early, release often".

Paolo

^ permalink raw reply	[flat|nested] 13+ messages in thread

* [Qemu-devel] [PATCH v3 resend 4/8] rdma: core logic
  2013-07-22 14:01 [Qemu-devel] [PATCH v3 resend 0/8] rdma: core logic mrhines
@ 2013-07-22 14:01 ` mrhines
  0 siblings, 0 replies; 13+ messages in thread
From: mrhines @ 2013-07-22 14:01 UTC (permalink / raw)
  To: qemu-devel
  Cc: aliguori, quintela, knoel, owasserm, abali, mrhines, gokul,
	pbonzini, chegu_vinod

From: "Michael R. Hines" <mrhines@us.ibm.com>

Code that does need to be visible is kept
well contained inside this file and this is the only
new additional file to the entire patch.

This file includes the entire protocol and interfaces
required to perform RDMA migration.

Also, the configure and Makefile modifications to link
this file are included.

Full documentation is in docs/rdma.txt

Reviewed-by: Paolo Bonzini <pbonzini@redhat.com>
Reviewed-by: Chegu Vinod <chegu_vinod@hp.com>
Tested-by: Chegu Vinod <chegu_vinod@hp.com>
Tested-by: Michael R. Hines <mrhines@us.ibm.com>
Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 Makefile.objs                 |    1 +
 configure                     |   40 +
 include/migration/migration.h |    4 +
 migration-rdma.c              | 3249 +++++++++++++++++++++++++++++++++++++++++
 migration.c                   |    8 +
 5 files changed, 3302 insertions(+)
 create mode 100644 migration-rdma.c

diff --git a/Makefile.objs b/Makefile.objs
index 5b288ba..9928542 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -51,6 +51,7 @@ common-obj-$(CONFIG_POSIX) += os-posix.o
 common-obj-$(CONFIG_LINUX) += fsdev/
 
 common-obj-y += migration.o migration-tcp.o
+common-obj-$(CONFIG_RDMA) += migration-rdma.o
 common-obj-y += qemu-char.o #aio.o
 common-obj-y += block-migration.o
 common-obj-y += page_cache.o xbzrle.o
diff --git a/configure b/configure
index cb0f870..2e4b04a 100755
--- a/configure
+++ b/configure
@@ -180,6 +180,7 @@ xfs=""
 vhost_net="no"
 vhost_scsi="no"
 kvm="no"
+rdma=""
 gprof="no"
 debug_tcg="no"
 debug="no"
@@ -936,6 +937,10 @@ for opt do
   ;;
   --enable-gtk) gtk="yes"
   ;;
+  --enable-rdma) rdma="yes"
+  ;;
+  --disable-rdma) rdma="no"
+  ;;
   --with-gtkabi=*) gtkabi="$optarg"
   ;;
   --enable-tpm) tpm="yes"
@@ -1094,6 +1099,8 @@ echo "  --enable-bluez           enable bluez stack connectivity"
 echo "  --disable-slirp          disable SLIRP userspace network connectivity"
 echo "  --disable-kvm            disable KVM acceleration support"
 echo "  --enable-kvm             enable KVM acceleration support"
+echo "  --disable-rdma           disable RDMA-based migration support"
+echo "  --enable-rdma            enable RDMA-based migration support"
 echo "  --enable-tcg-interpreter enable TCG with bytecode interpreter (TCI)"
 echo "  --disable-nptl           disable usermode NPTL support"
 echo "  --enable-nptl            enable usermode NPTL support"
@@ -1797,6 +1804,30 @@ EOF
 fi
 
 ##########################################
+# RDMA needs OpenFabrics libraries
+if test "$rdma" != "no" ; then
+  cat > $TMPC <<EOF
+#include <rdma/rdma_cma.h>
+int main(void) { return 0; }
+EOF
+  rdma_libs="-lrdmacm -libverbs"
+  if compile_prog "" "$rdma_libs" ; then
+    rdma="yes"
+    libs_softmmu="$libs_softmmu $rdma_libs"
+  else
+    if test "$rdma" = "yes" ; then
+        error_exit \
+            " OpenFabrics librdmacm/libibverbs not present." \
+            " Your options:" \
+            "  (1) Fast: Install infiniband packages from your distro." \
+            "  (2) Cleanest: Install libraries from www.openfabrics.org" \
+            "  (3) Also: Install softiwarp if you don't have RDMA hardware"
+    fi
+    rdma="no"
+  fi
+fi
+
+##########################################
 # VNC TLS/WS detection
 if test "$vnc" = "yes" -a \( "$vnc_tls" != "no" -o "$vnc_ws" != "no" \) ; then
   cat > $TMPC <<EOF
@@ -3555,6 +3586,7 @@ echo "Linux AIO support $linux_aio"
 echo "ATTR/XATTR support $attr"
 echo "Install blobs     $blobs"
 echo "KVM support       $kvm"
+echo "RDMA support      $rdma"
 echo "TCG interpreter   $tcg_interpreter"
 echo "fdt support       $fdt"
 echo "preadv support    $preadv"
@@ -4039,6 +4071,10 @@ if test "$trace_default" = "yes"; then
   echo "CONFIG_TRACE_DEFAULT=y" >> $config_host_mak
 fi
 
+if test "$rdma" = "yes" ; then
+  echo "CONFIG_RDMA=y" >> $config_host_mak
+fi
+
 if test "$tcg_interpreter" = "yes"; then
   QEMU_INCLUDES="-I\$(SRC_PATH)/tcg/tci $QEMU_INCLUDES"
 elif test "$ARCH" = "sparc64" ; then
@@ -4478,6 +4514,10 @@ if [ "$pixman" = "internal" ]; then
   echo "config-host.h: subdir-pixman" >> $config_host_mak
 fi
 
+if test "$rdma" = "yes" ; then
+echo "CONFIG_RDMA=y" >> $config_host_mak
+fi
+
 if [ "$dtc_internal" = "yes" ]; then
   echo "config-host.h: subdir-dtc" >> $config_host_mak
 fi
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 9d3cc85..b5e413a 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -77,6 +77,10 @@ void fd_start_incoming_migration(const char *path, Error **errp);
 
 void fd_start_outgoing_migration(MigrationState *s, const char *fdname, Error **errp);
 
+void rdma_start_outgoing_migration(void *opaque, const char *host_port, Error **errp);
+
+void rdma_start_incoming_migration(const char *host_port, Error **errp);
+
 void migrate_fd_error(MigrationState *s);
 
 void migrate_fd_connect(MigrationState *s);
diff --git a/migration-rdma.c b/migration-rdma.c
new file mode 100644
index 0000000..d044830
--- /dev/null
+++ b/migration-rdma.c
@@ -0,0 +1,3249 @@
+/*
+ * RDMA protocol and interfaces
+ *
+ * Copyright IBM, Corp. 2010-2013
+ *
+ * Authors:
+ *  Michael R. Hines <mrhines@us.ibm.com>
+ *  Jiuxing Liu <jl@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later.  See the COPYING file in the top-level directory.
+ *
+ */
+#include "qemu-common.h"
+#include "migration/migration.h"
+#include "migration/qemu-file.h"
+#include "exec/cpu-common.h"
+#include "qemu/main-loop.h"
+#include "qemu/sockets.h"
+#include "qemu/bitmap.h"
+#include "block/coroutine.h"
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+#include <string.h>
+#include <rdma/rdma_cma.h>
+
+#define DEBUG_RDMA
+//#define DEBUG_RDMA_VERBOSE
+//#define DEBUG_RDMA_REALLY_VERBOSE
+
+#ifdef DEBUG_RDMA
+#define DPRINTF(fmt, ...) \
+    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
+#else
+#define DPRINTF(fmt, ...) \
+    do { } while (0)
+#endif
+
+#ifdef DEBUG_RDMA_VERBOSE
+#define DDPRINTF(fmt, ...) \
+    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
+#else
+#define DDPRINTF(fmt, ...) \
+    do { } while (0)
+#endif
+
+#ifdef DEBUG_RDMA_REALLY_VERBOSE
+#define DDDPRINTF(fmt, ...) \
+    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
+#else
+#define DDDPRINTF(fmt, ...) \
+    do { } while (0)
+#endif
+
+/*
+ * Print and error on both the Monitor and the Log file.
+ */
+#define ERROR(errp, fmt, ...) \
+    do { \
+        fprintf(stderr, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
+        if (errp && (*(errp) == NULL)) { \
+            error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
+        } \
+    } while (0)
+
+#define RDMA_RESOLVE_TIMEOUT_MS 10000
+
+/* Do not merge data if larger than this. */
+#define RDMA_MERGE_MAX (2 * 1024 * 1024)
+#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
+
+#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
+
+/*
+ * This is only for non-live state being migrated.
+ * Instead of RDMA_WRITE messages, we use RDMA_SEND
+ * messages for that state, which requires a different
+ * delivery design than main memory.
+ */
+#define RDMA_SEND_INCREMENT 32768
+
+/*
+ * Maximum size infiniband SEND message
+ */
+#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
+#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
+
+#define RDMA_CONTROL_VERSION_CURRENT 1
+/*
+ * Capabilities for negotiation.
+ */
+#define RDMA_CAPABILITY_PIN_ALL 0x01
+
+/*
+ * Add the other flags above to this list of known capabilities
+ * as they are introduced.
+ */
+static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
+
+#define CHECK_ERROR_STATE() \
+    do { \
+        if (rdma->error_state) { \
+            if (!rdma->error_reported) { \
+                fprintf(stderr, "RDMA is in an error state waiting migration" \
+                                " to abort!\n"); \
+                rdma->error_reported = 1; \
+            } \
+            return rdma->error_state; \
+        } \
+    } while (0);
+
+/*
+ * A work request ID is 64-bits and we split up these bits
+ * into 3 parts:
+ *
+ * bits 0-15 : type of control message, 2^16
+ * bits 16-29: ram block index, 2^14
+ * bits 30-63: ram block chunk number, 2^34
+ *
+ * The last two bit ranges are only used for RDMA writes,
+ * in order to track their completion and potentially
+ * also track unregistration status of the message.
+ */
+#define RDMA_WRID_TYPE_SHIFT  0UL
+#define RDMA_WRID_BLOCK_SHIFT 16UL
+#define RDMA_WRID_CHUNK_SHIFT 30UL
+
+#define RDMA_WRID_TYPE_MASK \
+    ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
+
+#define RDMA_WRID_BLOCK_MASK \
+    (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
+
+#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
+
+/*
+ * RDMA migration protocol:
+ * 1. RDMA Writes (data messages, i.e. RAM)
+ * 2. IB Send/Recv (control channel messages)
+ */
+enum {
+    RDMA_WRID_NONE = 0,
+    RDMA_WRID_RDMA_WRITE = 1,
+    RDMA_WRID_SEND_CONTROL = 2000,
+    RDMA_WRID_RECV_CONTROL = 4000,
+};
+
+const char *wrid_desc[] = {
+    [RDMA_WRID_NONE] = "NONE",
+    [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
+    [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
+    [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
+};
+
+/*
+ * Work request IDs for IB SEND messages only (not RDMA writes).
+ * This is used by the migration protocol to transmit
+ * control messages (such as device state and registration commands)
+ *
+ * We could use more WRs, but we have enough for now.
+ */
+enum {
+    RDMA_WRID_READY = 0,
+    RDMA_WRID_DATA,
+    RDMA_WRID_CONTROL,
+    RDMA_WRID_MAX,
+};
+
+/*
+ * SEND/RECV IB Control Messages.
+ */
+enum {
+    RDMA_CONTROL_NONE = 0,
+    RDMA_CONTROL_ERROR,
+    RDMA_CONTROL_READY,               /* ready to receive */
+    RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
+    RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
+    RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
+    RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
+    RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
+    RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
+    RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
+    RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
+    RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
+};
+
+const char *control_desc[] = {
+    [RDMA_CONTROL_NONE] = "NONE",
+    [RDMA_CONTROL_ERROR] = "ERROR",
+    [RDMA_CONTROL_READY] = "READY",
+    [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
+    [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
+    [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
+    [RDMA_CONTROL_COMPRESS] = "COMPRESS",
+    [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
+    [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
+    [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
+    [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
+    [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
+};
+
+/*
+ * Memory and MR structures used to represent an IB Send/Recv work request.
+ * This is *not* used for RDMA writes, only IB Send/Recv.
+ */
+typedef struct {
+    uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
+    struct   ibv_mr *control_mr;               /* registration metadata */
+    size_t   control_len;                      /* length of the message */
+    uint8_t *control_curr;                     /* start of unconsumed bytes */
+} RDMAWorkRequestData;
+
+/*
+ * Negotiate RDMA capabilities during connection-setup time.
+ */
+typedef struct {
+    uint32_t version;
+    uint32_t flags;
+} RDMACapabilities;
+
+static void caps_to_network(RDMACapabilities *cap)
+{
+    cap->version = htonl(cap->version);
+    cap->flags = htonl(cap->flags);
+}
+
+static void network_to_caps(RDMACapabilities *cap)
+{
+    cap->version = ntohl(cap->version);
+    cap->flags = ntohl(cap->flags);
+}
+
+/*
+ * Representation of a RAMBlock from an RDMA perspective.
+ * This is not transmitted, only local.
+ * This and subsequent structures cannot be linked lists
+ * because we're using a single IB message to transmit
+ * the information. It's small anyway, so a list is overkill.
+ */
+typedef struct RDMALocalBlock {
+    uint8_t  *local_host_addr; /* local virtual address */
+    uint64_t remote_host_addr; /* remote virtual address */
+    uint64_t offset;
+    uint64_t length;
+    struct   ibv_mr **pmr;     /* MRs for chunk-level registration */
+    struct   ibv_mr *mr;       /* MR for non-chunk-level registration */
+    uint32_t *remote_keys;     /* rkeys for chunk-level registration */
+    uint32_t remote_rkey;      /* rkeys for non-chunk-level registration */
+    int      index;            /* which block are we */
+    bool     is_ram_block;
+    int      nb_chunks;
+    unsigned long *transit_bitmap;
+    unsigned long *unregister_bitmap;
+} RDMALocalBlock;
+
+/*
+ * Also represents a RAMblock, but only on the dest.
+ * This gets transmitted by the dest during connection-time
+ * to the source VM and then is used to populate the
+ * corresponding RDMALocalBlock with
+ * the information needed to perform the actual RDMA.
+ */
+typedef struct QEMU_PACKED RDMARemoteBlock {
+    uint64_t remote_host_addr;
+    uint64_t offset;
+    uint64_t length;
+    uint32_t remote_rkey;
+    uint32_t padding;
+} RDMARemoteBlock;
+
+static uint64_t htonll(uint64_t v)
+{
+    union { uint32_t lv[2]; uint64_t llv; } u;
+    u.lv[0] = htonl(v >> 32);
+    u.lv[1] = htonl(v & 0xFFFFFFFFULL);
+    return u.llv;
+}
+
+static uint64_t ntohll(uint64_t v) {
+    union { uint32_t lv[2]; uint64_t llv; } u;
+    u.llv = v;
+    return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
+}
+
+static void remote_block_to_network(RDMARemoteBlock *rb)
+{
+    rb->remote_host_addr = htonll(rb->remote_host_addr);
+    rb->offset = htonll(rb->offset);
+    rb->length = htonll(rb->length);
+    rb->remote_rkey = htonl(rb->remote_rkey);
+}
+
+static void network_to_remote_block(RDMARemoteBlock *rb)
+{
+    rb->remote_host_addr = ntohll(rb->remote_host_addr);
+    rb->offset = ntohll(rb->offset);
+    rb->length = ntohll(rb->length);
+    rb->remote_rkey = ntohl(rb->remote_rkey);
+}
+
+/*
+ * Virtual address of the above structures used for transmitting
+ * the RAMBlock descriptions at connection-time.
+ * This structure is *not* transmitted.
+ */
+typedef struct RDMALocalBlocks {
+    int nb_blocks;
+    bool     init;             /* main memory init complete */
+    RDMALocalBlock *block;
+} RDMALocalBlocks;
+
+/*
+ * Main data structure for RDMA state.
+ * While there is only one copy of this structure being allocated right now,
+ * this is the place where one would start if you wanted to consider
+ * having more than one RDMA connection open at the same time.
+ */
+typedef struct RDMAContext {
+    char *host;
+    int port;
+
+    RDMAWorkRequestData wr_data[RDMA_WRID_MAX + 1];
+
+    /*
+     * This is used by *_exchange_send() to figure out whether or not
+     * the initial "READY" message has already been received or not.
+     * This is because other functions may potentially poll() and detect
+     * the READY message before send() does, in which case we need to
+     * know if it completed.
+     */
+    int control_ready_expected;
+
+    /* number of outstanding writes */
+    int nb_sent;
+
+    /* store info about current buffer so that we can
+       merge it with future sends */
+    uint64_t current_addr;
+    uint64_t current_length;
+    /* index of ram block the current buffer belongs to */
+    int current_index;
+    /* index of the chunk in the current ram block */
+    int current_chunk;
+
+    bool pin_all;
+
+    /*
+     * infiniband-specific variables for opening the device
+     * and maintaining connection state and so forth.
+     *
+     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
+     * cm_id->verbs, cm_id->channel, and cm_id->qp.
+     */
+    struct rdma_cm_id *cm_id;               /* connection manager ID */
+    struct rdma_cm_id *listen_id;
+
+    struct ibv_context          *verbs;
+    struct rdma_event_channel   *channel;
+    struct ibv_qp *qp;                      /* queue pair */
+    struct ibv_comp_channel *comp_channel;  /* completion channel */
+    struct ibv_pd *pd;                      /* protection domain */
+    struct ibv_cq *cq;                      /* completion queue */
+
+    /*
+     * If a previous write failed (perhaps because of a failed
+     * memory registration, then do not attempt any future work
+     * and remember the error state.
+     */
+    int error_state;
+    int error_reported;
+
+    /*
+     * Description of ram blocks used throughout the code.
+     */
+    RDMALocalBlocks local_ram_blocks;
+    RDMARemoteBlock *block;
+
+    /*
+     * Migration on *destination* started.
+     * Then use coroutine yield function.
+     * Source runs in a thread, so we don't care.
+     */
+    int migration_started_on_destination;
+
+    int total_registrations;
+    int total_writes;
+
+    int unregister_current, unregister_next;
+    uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
+
+    GHashTable *blockmap;
+} RDMAContext;
+
+/*
+ * Interface to the rest of the migration call stack.
+ */
+typedef struct QEMUFileRDMA {
+    RDMAContext *rdma;
+    size_t len;
+    void *file;
+} QEMUFileRDMA;
+
+/*
+ * Main structure for IB Send/Recv control messages.
+ * This gets prepended at the beginning of every Send/Recv.
+ */
+typedef struct QEMU_PACKED {
+    uint32_t len;     /* Total length of data portion */
+    uint32_t type;    /* which control command to perform */
+    uint32_t repeat;  /* number of commands in data portion of same type */
+    uint32_t padding;
+} RDMAControlHeader;
+
+static void control_to_network(RDMAControlHeader *control)
+{
+    control->type = htonl(control->type);
+    control->len = htonl(control->len);
+    control->repeat = htonl(control->repeat);
+}
+
+static void network_to_control(RDMAControlHeader *control)
+{
+    control->type = ntohl(control->type);
+    control->len = ntohl(control->len);
+    control->repeat = ntohl(control->repeat);
+}
+
+/*
+ * Register a single Chunk.
+ * Information sent by the source VM to inform the dest
+ * to register an single chunk of memory before we can perform
+ * the actual RDMA operation.
+ */
+typedef struct QEMU_PACKED {
+    union QEMU_PACKED {
+        uint64_t current_addr;  /* offset into the ramblock of the chunk */
+        uint64_t chunk;         /* chunk to lookup if unregistering */
+    } key;
+    uint32_t current_index; /* which ramblock the chunk belongs to */
+    uint32_t padding;
+    uint64_t chunks;            /* how many sequential chunks to register */
+} RDMARegister;
+
+static void register_to_network(RDMARegister *reg)
+{
+    reg->key.current_addr = htonll(reg->key.current_addr);
+    reg->current_index = htonl(reg->current_index);
+    reg->chunks = htonll(reg->chunks);
+}
+
+static void network_to_register(RDMARegister *reg)
+{
+    reg->key.current_addr = ntohll(reg->key.current_addr);
+    reg->current_index = ntohl(reg->current_index);
+    reg->chunks = ntohll(reg->chunks);
+}
+
+typedef struct QEMU_PACKED {
+    uint32_t value;     /* if zero, we will madvise() */
+    uint32_t block_idx; /* which ram block index */
+    uint64_t offset;    /* where in the remote ramblock this chunk */
+    uint64_t length;    /* length of the chunk */
+} RDMACompress;
+
+static void compress_to_network(RDMACompress *comp)
+{
+    comp->value = htonl(comp->value);
+    comp->block_idx = htonl(comp->block_idx);
+    comp->offset = htonll(comp->offset);
+    comp->length = htonll(comp->length);
+}
+
+static void network_to_compress(RDMACompress *comp)
+{
+    comp->value = ntohl(comp->value);
+    comp->block_idx = ntohl(comp->block_idx);
+    comp->offset = ntohll(comp->offset);
+    comp->length = ntohll(comp->length);
+}
+
+/*
+ * The result of the dest's memory registration produces an "rkey"
+ * which the source VM must reference in order to perform
+ * the RDMA operation.
+ */
+typedef struct QEMU_PACKED {
+    uint32_t rkey;
+    uint32_t padding;
+    uint64_t host_addr;
+} RDMARegisterResult;
+
+static void result_to_network(RDMARegisterResult *result)
+{
+    result->rkey = htonl(result->rkey);
+    result->host_addr = htonll(result->host_addr);
+};
+
+static void network_to_result(RDMARegisterResult *result)
+{
+    result->rkey = ntohl(result->rkey);
+    result->host_addr = ntohll(result->host_addr);
+};
+
+const char *print_wrid(int wrid);
+static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
+                                   uint8_t *data, RDMAControlHeader *resp,
+                                   int *resp_idx,
+                                   int (*callback)(RDMAContext *rdma));
+
+static inline uint64_t ram_chunk_index(uint8_t *start, uint8_t *host)
+{
+    return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
+}
+
+static inline uint8_t *ram_chunk_start(RDMALocalBlock *rdma_ram_block,
+                                       uint64_t i)
+{
+    return (uint8_t *) (((uintptr_t) rdma_ram_block->local_host_addr)
+                                    + (i << RDMA_REG_CHUNK_SHIFT));
+}
+
+static inline uint8_t *ram_chunk_end(RDMALocalBlock *rdma_ram_block, uint64_t i)
+{
+    uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
+                                         (1UL << RDMA_REG_CHUNK_SHIFT);
+
+    if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
+        result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
+    }
+
+    return result;
+}
+
+static int __qemu_rdma_add_block(RDMAContext *rdma, void *host_addr,
+                         ram_addr_t block_offset, uint64_t length)
+{
+    RDMALocalBlocks *local = &rdma->local_ram_blocks;
+    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
+        (void *) block_offset);
+    RDMALocalBlock *old = local->block;
+
+    assert(block == NULL);
+
+    local->block = g_malloc0(sizeof(RDMALocalBlock) * (local->nb_blocks + 1));
+
+    if (local->nb_blocks) {
+        int x;
+
+        for (x = 0; x < local->nb_blocks; x++) {
+            g_hash_table_remove(rdma->blockmap, (void *)old[x].offset);
+            g_hash_table_insert(rdma->blockmap, (void *)old[x].offset,
+                                                &local->block[x]);
+        }
+        memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
+        g_free(old);
+    }
+
+    block = &local->block[local->nb_blocks];
+
+    block->local_host_addr = host_addr;
+    block->offset = block_offset;
+    block->length = length;
+    block->index = local->nb_blocks;
+    block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
+    block->transit_bitmap = bitmap_new(block->nb_chunks);
+    bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
+    block->unregister_bitmap = bitmap_new(block->nb_chunks);
+    bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
+    block->remote_keys = g_malloc0(block->nb_chunks * sizeof(uint32_t));
+
+    block->is_ram_block = local->init ? false : true;
+
+    g_hash_table_insert(rdma->blockmap, (void *) block_offset, block);
+
+    DDPRINTF("Added Block: %d, addr: %" PRIu64 ", offset: %" PRIu64
+           " length: %" PRIu64 " end: %" PRIu64 " bits %" PRIu64 " chunks %d\n",
+            local->nb_blocks, (uint64_t) block->local_host_addr, block->offset,
+            block->length, (uint64_t) (block->local_host_addr + block->length),
+                BITS_TO_LONGS(block->nb_chunks) *
+                    sizeof(unsigned long) * 8, block->nb_chunks);
+
+    local->nb_blocks++;
+
+    return 0;
+}
+
+/*
+ * Memory regions need to be registered with the device and queue pairs setup
+ * in advanced before the migration starts. This tells us where the RAM blocks
+ * are so that we can register them individually.
+ */
+static void qemu_rdma_init_one_block(void *host_addr,
+    ram_addr_t block_offset, ram_addr_t length, void *opaque)
+{
+    __qemu_rdma_add_block(opaque, host_addr, block_offset, length);
+}
+
+/*
+ * Identify the RAMBlocks and their quantity. They will be references to
+ * identify chunk boundaries inside each RAMBlock and also be referenced
+ * during dynamic page registration.
+ */
+static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
+{
+    RDMALocalBlocks *local = &rdma->local_ram_blocks;
+
+    assert(rdma->blockmap == NULL);
+    rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
+    memset(local, 0, sizeof *local);
+    qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma);
+    DPRINTF("Allocated %d local ram block structures\n", local->nb_blocks);
+    rdma->block = (RDMARemoteBlock *) g_malloc0(sizeof(RDMARemoteBlock) *
+                        rdma->local_ram_blocks.nb_blocks);
+    local->init = true;
+    return 0;
+}
+
+static int __qemu_rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
+{
+    RDMALocalBlocks *local = &rdma->local_ram_blocks;
+    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
+        (void *) block_offset);
+    RDMALocalBlock *old = local->block;
+    int x;
+
+    assert(block);
+
+    if (block->pmr) {
+        int j;
+
+        for (j = 0; j < block->nb_chunks; j++) {
+            if (!block->pmr[j]) {
+                continue;
+            }
+            ibv_dereg_mr(block->pmr[j]);
+            rdma->total_registrations--;
+        }
+        g_free(block->pmr);
+        block->pmr = NULL;
+    }
+
+    if (block->mr) {
+        ibv_dereg_mr(block->mr);
+        rdma->total_registrations--;
+        block->mr = NULL;
+    }
+
+    g_free(block->transit_bitmap);
+    block->transit_bitmap = NULL;
+
+    g_free(block->unregister_bitmap);
+    block->unregister_bitmap = NULL;
+
+    g_free(block->remote_keys);
+    block->remote_keys = NULL;
+
+    for (x = 0; x < local->nb_blocks; x++) {
+        g_hash_table_remove(rdma->blockmap, (void *)old[x].offset);
+    }
+
+    if (local->nb_blocks > 1) {
+
+        local->block = g_malloc0(sizeof(RDMALocalBlock) *
+                                    (local->nb_blocks - 1));
+
+        if (block->index) {
+            memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
+        }
+
+        if (block->index < (local->nb_blocks - 1)) {
+            memcpy(local->block + block->index, old + (block->index + 1),
+                sizeof(RDMALocalBlock) *
+                    (local->nb_blocks - (block->index + 1)));
+        }
+    } else {
+        assert(block == local->block);
+        local->block = NULL;
+    }
+
+    DDPRINTF("Deleted Block: %d, addr: %" PRIu64 ", offset: %" PRIu64
+           " length: %" PRIu64 " end: %" PRIu64 " bits %" PRIu64 " chunks %d\n",
+            local->nb_blocks, (uint64_t) block->local_host_addr, block->offset,
+            block->length, (uint64_t) (block->local_host_addr + block->length),
+                BITS_TO_LONGS(block->nb_chunks) *
+                    sizeof(unsigned long) * 8, block->nb_chunks);
+
+    g_free(old);
+
+    local->nb_blocks--;
+
+    if (local->nb_blocks) {
+        for (x = 0; x < local->nb_blocks; x++) {
+            g_hash_table_insert(rdma->blockmap, (void *)local->block[x].offset,
+                                                &local->block[x]);
+        }
+    }
+
+    return 0;
+}
+
+/*
+ * Put in the log file which RDMA device was opened and the details
+ * associated with that device.
+ */
+static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
+{
+    printf("%s RDMA Device opened: kernel name %s "
+           "uverbs device name %s, "
+           "infiniband_verbs class device path %s,"
+           " infiniband class device path %s\n",
+                who,
+                verbs->device->name,
+                verbs->device->dev_name,
+                verbs->device->dev_path,
+                verbs->device->ibdev_path);
+}
+
+/*
+ * Put in the log file the RDMA gid addressing information,
+ * useful for folks who have trouble understanding the
+ * RDMA device hierarchy in the kernel.
+ */
+static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
+{
+    char sgid[33];
+    char dgid[33];
+    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
+    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
+    DPRINTF("%s Source GID: %s, Dest GID: %s\n", who, sgid, dgid);
+}
+
+/*
+ * Figure out which RDMA device corresponds to the requested IP hostname
+ * Also create the initial connection manager identifiers for opening
+ * the connection.
+ */
+static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
+{
+    int ret;
+    struct addrinfo *res;
+    char port_str[16];
+    struct rdma_cm_event *cm_event;
+    char ip[40] = "unknown";
+
+    if (rdma->host == NULL || !strcmp(rdma->host, "")) {
+        ERROR(errp, "RDMA hostname has not been set\n");
+        return -1;
+    }
+
+    /* create CM channel */
+    rdma->channel = rdma_create_event_channel();
+    if (!rdma->channel) {
+        ERROR(errp, "could not create CM channel\n");
+        return -1;
+    }
+
+    /* create CM id */
+    ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
+    if (ret) {
+        ERROR(errp, "could not create channel id\n");
+        goto err_resolve_create_id;
+    }
+
+    snprintf(port_str, 16, "%d", rdma->port);
+    port_str[15] = '\0';
+
+    ret = getaddrinfo(rdma->host, port_str, NULL, &res);
+    if (ret < 0) {
+        ERROR(errp, "could not getaddrinfo address %s\n", rdma->host);
+        goto err_resolve_get_addr;
+    }
+
+    inet_ntop(AF_INET, &((struct sockaddr_in *) res->ai_addr)->sin_addr,
+                                ip, sizeof ip);
+    DPRINTF("%s => %s\n", rdma->host, ip);
+
+    /* resolve the first address */
+    ret = rdma_resolve_addr(rdma->cm_id, NULL, res->ai_addr,
+            RDMA_RESOLVE_TIMEOUT_MS);
+    if (ret) {
+        ERROR(errp, "could not resolve address %s\n", rdma->host);
+        goto err_resolve_get_addr;
+    }
+
+    qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
+
+    ret = rdma_get_cm_event(rdma->channel, &cm_event);
+    if (ret) {
+        ERROR(errp, "could not perform event_addr_resolved\n");
+        goto err_resolve_get_addr;
+    }
+
+    if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
+        ERROR(errp, "result not equal to event_addr_resolved %s\n",
+                rdma_event_str(cm_event->event));
+        perror("rdma_resolve_addr");
+        goto err_resolve_get_addr;
+    }
+    rdma_ack_cm_event(cm_event);
+
+    /* resolve route */
+    ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
+    if (ret) {
+        ERROR(errp, "could not resolve rdma route\n");
+        goto err_resolve_get_addr;
+    }
+
+    ret = rdma_get_cm_event(rdma->channel, &cm_event);
+    if (ret) {
+        ERROR(errp, "could not perform event_route_resolved\n");
+        goto err_resolve_get_addr;
+    }
+    if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
+        ERROR(errp, "result not equal to event_route_resolved: %s\n",
+                        rdma_event_str(cm_event->event));
+        rdma_ack_cm_event(cm_event);
+        goto err_resolve_get_addr;
+    }
+    rdma_ack_cm_event(cm_event);
+    rdma->verbs = rdma->cm_id->verbs;
+    qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
+    qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
+    return 0;
+
+err_resolve_get_addr:
+    rdma_destroy_id(rdma->cm_id);
+    rdma->cm_id = NULL;
+err_resolve_create_id:
+    rdma_destroy_event_channel(rdma->channel);
+    rdma->channel = NULL;
+
+    return -1;
+}
+
+/*
+ * Create protection domain and completion queues
+ */
+static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
+{
+    /* allocate pd */
+    rdma->pd = ibv_alloc_pd(rdma->verbs);
+    if (!rdma->pd) {
+        fprintf(stderr, "failed to allocate protection domain\n");
+        return -1;
+    }
+
+    /* create completion channel */
+    rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
+    if (!rdma->comp_channel) {
+        fprintf(stderr, "failed to allocate completion channel\n");
+        goto err_alloc_pd_cq;
+    }
+
+    /*
+     * Completion queue can be filled by both read and write work requests,
+     * so must reflect the sum of both possible queue sizes.
+     */
+    rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
+            NULL, rdma->comp_channel, 0);
+    if (!rdma->cq) {
+        fprintf(stderr, "failed to allocate completion queue\n");
+        goto err_alloc_pd_cq;
+    }
+
+    return 0;
+
+err_alloc_pd_cq:
+    if (rdma->pd) {
+        ibv_dealloc_pd(rdma->pd);
+    }
+    if (rdma->comp_channel) {
+        ibv_destroy_comp_channel(rdma->comp_channel);
+    }
+    rdma->pd = NULL;
+    rdma->comp_channel = NULL;
+    return -1;
+
+}
+
+/*
+ * Create queue pairs.
+ */
+static int qemu_rdma_alloc_qp(RDMAContext *rdma)
+{
+    struct ibv_qp_init_attr attr = { 0 };
+    int ret;
+
+    attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
+    attr.cap.max_recv_wr = 3;
+    attr.cap.max_send_sge = 1;
+    attr.cap.max_recv_sge = 1;
+    attr.send_cq = rdma->cq;
+    attr.recv_cq = rdma->cq;
+    attr.qp_type = IBV_QPT_RC;
+
+    ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
+    if (ret) {
+        return -1;
+    }
+
+    rdma->qp = rdma->cm_id->qp;
+    return 0;
+}
+
+static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
+{
+    int i;
+    RDMALocalBlocks *local = &rdma->local_ram_blocks;
+
+    for (i = 0; i < local->nb_blocks; i++) {
+        local->block[i].mr =
+            ibv_reg_mr(rdma->pd,
+                    local->block[i].local_host_addr,
+                    local->block[i].length,
+                    IBV_ACCESS_LOCAL_WRITE |
+                    IBV_ACCESS_REMOTE_WRITE
+                    );
+        if (!local->block[i].mr) {
+            perror("Failed to register local dest ram block!\n");
+            break;
+        }
+        rdma->total_registrations++;
+    }
+
+    if (i >= local->nb_blocks) {
+        return 0;
+    }
+
+    for (i--; i >= 0; i--) {
+        ibv_dereg_mr(local->block[i].mr);
+        rdma->total_registrations--;
+    }
+
+    return -1;
+
+}
+
+/*
+ * Find the ram block that corresponds to the page requested to be
+ * transmitted by QEMU.
+ *
+ * Once the block is found, also identify which 'chunk' within that
+ * block that the page belongs to.
+ *
+ * This search cannot fail or the migration will fail.
+ */
+static int qemu_rdma_search_ram_block(RDMAContext *rdma,
+                                      uint64_t block_offset,
+                                      uint64_t offset,
+                                      uint64_t length,
+                                      uint64_t *block_index,
+                                      uint64_t *chunk_index)
+{
+    uint64_t current_addr = block_offset + offset;
+    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
+                                                (void *) block_offset);
+    assert(block);
+    assert(current_addr >= block->offset);
+    assert((current_addr + length) <= (block->offset + block->length));
+
+    *block_index = block->index;
+    *chunk_index = ram_chunk_index(block->local_host_addr,
+                block->local_host_addr + (current_addr - block->offset));
+
+    return 0;
+}
+
+/*
+ * Register a chunk with IB. If the chunk was already registered
+ * previously, then skip.
+ *
+ * Also return the keys associated with the registration needed
+ * to perform the actual RDMA operation.
+ */
+static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
+        RDMALocalBlock *block, uint8_t *host_addr,
+        uint32_t *lkey, uint32_t *rkey, int chunk,
+        uint8_t *chunk_start, uint8_t *chunk_end)
+{
+    if (block->mr) {
+        if (lkey) {
+            *lkey = block->mr->lkey;
+        }
+        if (rkey) {
+            *rkey = block->mr->rkey;
+        }
+        return 0;
+    }
+
+    /* allocate memory to store chunk MRs */
+    if (!block->pmr) {
+        block->pmr = g_malloc0(block->nb_chunks * sizeof(struct ibv_mr *));
+        if (!block->pmr) {
+            return -1;
+        }
+    }
+
+    /*
+     * If 'rkey', then we're the destination, so grant access to the source.
+     *
+     * If 'lkey', then we're the source VM, so grant access only to ourselves.
+     */
+    if (!block->pmr[chunk]) {
+        uint64_t len = chunk_end - chunk_start;
+
+        DDPRINTF("Registering %" PRIu64 " bytes @ %p\n",
+                 len, chunk_start);
+
+        block->pmr[chunk] = ibv_reg_mr(rdma->pd,
+                chunk_start, len,
+                (rkey ? (IBV_ACCESS_LOCAL_WRITE |
+                        IBV_ACCESS_REMOTE_WRITE) : 0));
+
+        if (!block->pmr[chunk]) {
+            perror("Failed to register chunk!");
+            fprintf(stderr, "Chunk details: block: %d chunk index %d"
+                            " start %" PRIu64 " end %" PRIu64 " host %" PRIu64
+                            " local %" PRIu64 " registrations: %d\n",
+                            block->index, chunk, (uint64_t) chunk_start,
+                            (uint64_t) chunk_end, (uint64_t) host_addr,
+                            (uint64_t) block->local_host_addr,
+                            rdma->total_registrations);
+            return -1;
+        }
+        rdma->total_registrations++;
+    }
+
+    if (lkey) {
+        *lkey = block->pmr[chunk]->lkey;
+    }
+    if (rkey) {
+        *rkey = block->pmr[chunk]->rkey;
+    }
+    return 0;
+}
+
+/*
+ * Register (at connection time) the memory used for control
+ * channel messages.
+ */
+static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
+{
+    rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
+            rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
+            IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
+    if (rdma->wr_data[idx].control_mr) {
+        rdma->total_registrations++;
+        return 0;
+    }
+    fprintf(stderr, "qemu_rdma_reg_control failed!\n");
+    return -1;
+}
+
+const char *print_wrid(int wrid)
+{
+    if (wrid >= RDMA_WRID_RECV_CONTROL) {
+        return wrid_desc[RDMA_WRID_RECV_CONTROL];
+    }
+    return wrid_desc[wrid];
+}
+
+/*
+ * RDMA requires memory registration (mlock/pinning), but this is not good for
+ * overcommitment.
+ *
+ * In preparation for the future where LRU information or workload-specific
+ * writable writable working set memory access behavior is available to QEMU
+ * it would be nice to have in place the ability to UN-register/UN-pin
+ * particular memory regions from the RDMA hardware when it is determine that
+ * those regions of memory will likely not be accessed again in the near future.
+ *
+ * While we do not yet have such information right now, the following
+ * compile-time option allows us to perform a non-optimized version of this
+ * behavior.
+ *
+ * By uncommenting this option, you will cause *all* RDMA transfers to be
+ * unregistered immediately after the transfer completes on both sides of the
+ * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
+ *
+ * This will have a terrible impact on migration performance, so until future
+ * workload information or LRU information is available, do not attempt to use
+ * this feature except for basic testing.
+ */
+//#define RDMA_UNREGISTRATION_EXAMPLE
+
+/*
+ * Perform a non-optimized memory unregistration after every transfer
+ * for demonsration purposes, only if pin-all is not requested.
+ *
+ * Potential optimizations:
+ * 1. Start a new thread to run this function continuously
+        - for bit clearing
+        - and for receipt of unregister messages
+ * 2. Use an LRU.
+ * 3. Use workload hints.
+ */
+static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
+{
+    while (rdma->unregistrations[rdma->unregister_current]) {
+        int ret;
+        uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
+        uint64_t chunk =
+            (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
+        uint64_t index =
+            (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
+        RDMALocalBlock *block =
+            &(rdma->local_ram_blocks.block[index]);
+        RDMARegister reg = { .current_index = index };
+        RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
+                                 };
+        RDMAControlHeader head = { .len = sizeof(RDMARegister),
+                                   .type = RDMA_CONTROL_UNREGISTER_REQUEST,
+                                   .repeat = 1,
+                                 };
+
+        DDPRINTF("Processing unregister for chunk: %" PRIu64
+                 " at position %d\n", chunk, rdma->unregister_current);
+
+        rdma->unregistrations[rdma->unregister_current] = 0;
+        rdma->unregister_current++;
+
+        if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
+            rdma->unregister_current = 0;
+        }
+
+
+        /*
+         * Unregistration is speculative (because migration is single-threaded
+         * and we cannot break the protocol's inifinband message ordering).
+         * Thus, if the memory is currently being used for transmission,
+         * then abort the attempt to unregister and try again
+         * later the next time a completion is received for this memory.
+         */
+        clear_bit(chunk, block->unregister_bitmap);
+
+        if (test_bit(chunk, block->transit_bitmap)) {
+            DDPRINTF("Cannot unregister inflight chunk: %" PRIu64 "\n", chunk);
+            continue;
+        }
+
+        DDPRINTF("Sending unregister for chunk: %" PRIu64 "\n", chunk);
+
+        ret = ibv_dereg_mr(block->pmr[chunk]);
+        block->pmr[chunk] = NULL;
+        block->remote_keys[chunk] = 0;
+
+        if (ret != 0) {
+            perror("unregistration chunk failed");
+            return -ret;
+        }
+        rdma->total_registrations--;
+
+        reg.key.chunk = chunk;
+        register_to_network(&reg);
+        ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
+                                &resp, NULL, NULL);
+        if (ret < 0) {
+            return ret;
+        }
+
+        DDPRINTF("Unregister for chunk: %" PRIu64 " complete.\n", chunk);
+    }
+
+    return 0;
+}
+
+static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
+                                         uint64_t chunk)
+{
+    uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
+
+    result |= (index << RDMA_WRID_BLOCK_SHIFT);
+    result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
+
+    return result;
+}
+
+/*
+ * Set bit for unregistration in the next iteration.
+ * We cannot transmit right here, but will unpin later.
+ */
+static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
+                                        uint64_t chunk, uint64_t wr_id)
+{
+    if (rdma->unregistrations[rdma->unregister_next] != 0) {
+        fprintf(stderr, "rdma migration: queue is full!\n");
+    } else {
+        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
+
+        if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
+            DDPRINTF("Appending unregister chunk %" PRIu64
+                    " at position %d\n", chunk, rdma->unregister_next);
+
+            rdma->unregistrations[rdma->unregister_next++] =
+                    qemu_rdma_make_wrid(wr_id, index, chunk);
+
+            if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
+                rdma->unregister_next = 0;
+            }
+        } else {
+            DDPRINTF("Unregister chunk %" PRIu64 " already in queue.\n",
+                    chunk);
+        }
+    }
+}
+
+/*
+ * Consult the connection manager to see a work request
+ * (of any kind) has completed.
+ * Return the work request ID that completed.
+ */
+static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out)
+{
+    int ret;
+    struct ibv_wc wc;
+    uint64_t wr_id;
+
+    ret = ibv_poll_cq(rdma->cq, 1, &wc);
+
+    if (!ret) {
+        *wr_id_out = RDMA_WRID_NONE;
+        return 0;
+    }
+
+    if (ret < 0) {
+        fprintf(stderr, "ibv_poll_cq return %d!\n", ret);
+        return ret;
+    }
+
+    wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
+
+    if (wc.status != IBV_WC_SUCCESS) {
+        fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
+                        wc.status, ibv_wc_status_str(wc.status));
+        fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
+
+        return -1;
+    }
+
+    if (rdma->control_ready_expected &&
+        (wr_id >= RDMA_WRID_RECV_CONTROL)) {
+        DDDPRINTF("completion %s #%" PRId64 " received (%" PRId64 ")"
+                  " left %d\n", wrid_desc[RDMA_WRID_RECV_CONTROL],
+                  wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
+        rdma->control_ready_expected = 0;
+    }
+
+    if (wr_id == RDMA_WRID_RDMA_WRITE) {
+        uint64_t chunk =
+            (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
+        uint64_t index =
+            (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
+        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
+
+        DDDPRINTF("completions %s (%" PRId64 ") left %d, "
+                 "block %" PRIu64 ", chunk: %" PRIu64 " %p %p\n",
+                 print_wrid(wr_id), wr_id, rdma->nb_sent, index, chunk,
+                 block->local_host_addr, (void *)block->remote_host_addr);
+
+        clear_bit(chunk, block->transit_bitmap);
+
+        if (rdma->nb_sent > 0) {
+            rdma->nb_sent--;
+        }
+
+        if (!rdma->pin_all) {
+            /*
+             * FYI: If one wanted to signal a specific chunk to be unregistered
+             * using LRU or workload-specific information, this is the function
+             * you would call to do so. That chunk would then get asynchronously
+             * unregistered later.
+             */
+#ifdef RDMA_UNREGISTRATION_EXAMPLE
+            qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
+#endif
+        }
+    } else {
+        DDDPRINTF("other completion %s (%" PRId64 ") received left %d\n",
+            print_wrid(wr_id), wr_id, rdma->nb_sent);
+    }
+
+    *wr_id_out = wc.wr_id;
+
+    return  0;
+}
+
+/*
+ * Block until the next work request has completed.
+ *
+ * First poll to see if a work request has already completed,
+ * otherwise block.
+ *
+ * If we encounter completed work requests for IDs other than
+ * the one we're interested in, then that's generally an error.
+ *
+ * The only exception is actual RDMA Write completions. These
+ * completions only need to be recorded, but do not actually
+ * need further processing.
+ */
+static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested)
+{
+    int num_cq_events = 0, ret = 0;
+    struct ibv_cq *cq;
+    void *cq_ctx;
+    uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
+
+    if (ibv_req_notify_cq(rdma->cq, 0)) {
+        return -1;
+    }
+    /* poll cq first */
+    while (wr_id != wrid_requested) {
+        ret = qemu_rdma_poll(rdma, &wr_id_in);
+        if (ret < 0) {
+            return ret;
+        }
+
+        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
+
+        if (wr_id == RDMA_WRID_NONE) {
+            break;
+        }
+        if (wr_id != wrid_requested) {
+            DDDPRINTF("A Wanted wrid %s (%d) but got %s (%" PRIu64 ")\n",
+                print_wrid(wrid_requested),
+                wrid_requested, print_wrid(wr_id), wr_id);
+        }
+    }
+
+    if (wr_id == wrid_requested) {
+        return 0;
+    }
+
+    while (1) {
+        /*
+         * Coroutine doesn't start until process_incoming_migration()
+         * so don't yield unless we know we're running inside of a coroutine.
+         */
+        if (rdma->migration_started_on_destination) {
+            yield_until_fd_readable(rdma->comp_channel->fd);
+        }
+
+        if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
+            perror("ibv_get_cq_event");
+            goto err_block_for_wrid;
+        }
+
+        num_cq_events++;
+
+        if (ibv_req_notify_cq(cq, 0)) {
+            goto err_block_for_wrid;
+        }
+
+        while (wr_id != wrid_requested) {
+            ret = qemu_rdma_poll(rdma, &wr_id_in);
+            if (ret < 0) {
+                goto err_block_for_wrid;
+            }
+
+            wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
+
+            if (wr_id == RDMA_WRID_NONE) {
+                break;
+            }
+            if (wr_id != wrid_requested) {
+                DDDPRINTF("B Wanted wrid %s (%d) but got %s (%" PRIu64 ")\n",
+                    print_wrid(wrid_requested), wrid_requested,
+                    print_wrid(wr_id), wr_id);
+            }
+        }
+
+        if (wr_id == wrid_requested) {
+            goto success_block_for_wrid;
+        }
+    }
+
+success_block_for_wrid:
+    if (num_cq_events) {
+        ibv_ack_cq_events(cq, num_cq_events);
+    }
+    return 0;
+
+err_block_for_wrid:
+    if (num_cq_events) {
+        ibv_ack_cq_events(cq, num_cq_events);
+    }
+    return ret;
+}
+
+/*
+ * Post a SEND message work request for the control channel
+ * containing some data and block until the post completes.
+ */
+static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
+                                       RDMAControlHeader *head)
+{
+    int ret = 0;
+    RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_MAX];
+    struct ibv_send_wr *bad_wr;
+    struct ibv_sge sge = {
+                           .addr = (uint64_t)(wr->control),
+                           .length = head->len + sizeof(RDMAControlHeader),
+                           .lkey = wr->control_mr->lkey,
+                         };
+    struct ibv_send_wr send_wr = {
+                                   .wr_id = RDMA_WRID_SEND_CONTROL,
+                                   .opcode = IBV_WR_SEND,
+                                   .send_flags = IBV_SEND_SIGNALED,
+                                   .sg_list = &sge,
+                                   .num_sge = 1,
+                                };
+
+    DDDPRINTF("CONTROL: sending %s..\n", control_desc[head->type]);
+
+    /*
+     * We don't actually need to do a memcpy() in here if we used
+     * the "sge" properly, but since we're only sending control messages
+     * (not RAM in a performance-critical path), then its OK for now.
+     *
+     * The copy makes the RDMAControlHeader simpler to manipulate
+     * for the time being.
+     */
+    memcpy(wr->control, head, sizeof(RDMAControlHeader));
+    control_to_network((void *) wr->control);
+
+    if (buf) {
+        memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
+    }
+
+
+    if (ibv_post_send(rdma->qp, &send_wr, &bad_wr)) {
+        return -1;
+    }
+
+    if (ret < 0) {
+        fprintf(stderr, "Failed to use post IB SEND for control!\n");
+        return ret;
+    }
+
+    ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL);
+    if (ret < 0) {
+        fprintf(stderr, "rdma migration: send polling control error!\n");
+    }
+
+    return ret;
+}
+
+/*
+ * Post a RECV work request in anticipation of some future receipt
+ * of data on the control channel.
+ */
+static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
+{
+    struct ibv_recv_wr *bad_wr;
+    struct ibv_sge sge = {
+                            .addr = (uint64_t)(rdma->wr_data[idx].control),
+                            .length = RDMA_CONTROL_MAX_BUFFER,
+                            .lkey = rdma->wr_data[idx].control_mr->lkey,
+                         };
+
+    struct ibv_recv_wr recv_wr = {
+                                    .wr_id = RDMA_WRID_RECV_CONTROL + idx,
+                                    .sg_list = &sge,
+                                    .num_sge = 1,
+                                 };
+
+
+    if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Block and wait for a RECV control channel message to arrive.
+ */
+static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
+                RDMAControlHeader *head, int expecting, int idx)
+{
+    int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx);
+
+    if (ret < 0) {
+        fprintf(stderr, "rdma migration: recv polling control error!\n");
+        return ret;
+    }
+
+    network_to_control((void *) rdma->wr_data[idx].control);
+    memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
+
+    DDDPRINTF("CONTROL: %s receiving...\n", control_desc[expecting]);
+
+    if (expecting == RDMA_CONTROL_NONE) {
+        DDDPRINTF("Surprise: got %s (%d)\n",
+                  control_desc[head->type], head->type);
+    } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
+        fprintf(stderr, "Was expecting a %s (%d) control message"
+                ", but got: %s (%d), length: %d\n",
+                control_desc[expecting], expecting,
+                control_desc[head->type], head->type, head->len);
+        return -EIO;
+    }
+
+    return 0;
+}
+
+/*
+ * When a RECV work request has completed, the work request's
+ * buffer is pointed at the header.
+ *
+ * This will advance the pointer to the data portion
+ * of the control message of the work request's buffer that
+ * was populated after the work request finished.
+ */
+static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
+                                  RDMAControlHeader *head)
+{
+    rdma->wr_data[idx].control_len = head->len;
+    rdma->wr_data[idx].control_curr =
+        rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
+}
+
+/*
+ * This is an 'atomic' high-level operation to deliver a single, unified
+ * control-channel message.
+ *
+ * Additionally, if the user is expecting some kind of reply to this message,
+ * they can request a 'resp' response message be filled in by posting an
+ * additional work request on behalf of the user and waiting for an additional
+ * completion.
+ *
+ * The extra (optional) response is used during registration to us from having
+ * to perform an *additional* exchange of message just to provide a response by
+ * instead piggy-backing on the acknowledgement.
+ */
+static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
+                                   uint8_t *data, RDMAControlHeader *resp,
+                                   int *resp_idx,
+                                   int (*callback)(RDMAContext *rdma))
+{
+    int ret = 0;
+
+    /*
+     * Wait until the dest is ready before attempting to deliver the message
+     * by waiting for a READY message.
+     */
+    if (rdma->control_ready_expected) {
+        RDMAControlHeader resp;
+        ret = qemu_rdma_exchange_get_response(rdma,
+                                    &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
+        if (ret < 0) {
+            return ret;
+        }
+    }
+
+    /*
+     * If the user is expecting a response, post a WR in anticipation of it.
+     */
+    if (resp) {
+        ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
+        if (ret) {
+            fprintf(stderr, "rdma migration: error posting"
+                    " extra control recv for anticipated result!");
+            return ret;
+        }
+    }
+
+    /*
+     * Post a WR to replace the one we just consumed for the READY message.
+     */
+    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
+    if (ret) {
+        fprintf(stderr, "rdma migration: error posting first control recv!");
+        return ret;
+    }
+
+    /*
+     * Deliver the control message that was requested.
+     */
+    ret = qemu_rdma_post_send_control(rdma, data, head);
+
+    if (ret < 0) {
+        fprintf(stderr, "Failed to send control buffer!\n");
+        return ret;
+    }
+
+    /*
+     * If we're expecting a response, block and wait for it.
+     */
+    if (resp) {
+        if (callback) {
+            DDPRINTF("Issuing callback before receiving response...\n");
+            ret = callback(rdma);
+            if (ret < 0) {
+                return ret;
+            }
+        }
+
+        DDPRINTF("Waiting for response %s\n", control_desc[resp->type]);
+        ret = qemu_rdma_exchange_get_response(rdma, resp,
+                                              resp->type, RDMA_WRID_DATA);
+
+        if (ret < 0) {
+            return ret;
+        }
+
+        qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
+        if (resp_idx) {
+            *resp_idx = RDMA_WRID_DATA;
+        }
+        DDPRINTF("Response %s received.\n", control_desc[resp->type]);
+    }
+
+    rdma->control_ready_expected = 1;
+
+    return 0;
+}
+
+/*
+ * This is an 'atomic' high-level operation to receive a single, unified
+ * control-channel message.
+ */
+static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
+                                int expecting)
+{
+    RDMAControlHeader ready = {
+                                .len = 0,
+                                .type = RDMA_CONTROL_READY,
+                                .repeat = 1,
+                              };
+    int ret;
+
+    /*
+     * Inform the source that we're ready to receive a message.
+     */
+    ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
+
+    if (ret < 0) {
+        fprintf(stderr, "Failed to send control buffer!\n");
+        return ret;
+    }
+
+    /*
+     * Block and wait for the message.
+     */
+    ret = qemu_rdma_exchange_get_response(rdma, head,
+                                          expecting, RDMA_WRID_READY);
+
+    if (ret < 0) {
+        return ret;
+    }
+
+    qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
+
+    /*
+     * Post a new RECV work request to replace the one we just consumed.
+     */
+    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
+    if (ret) {
+        fprintf(stderr, "rdma migration: error posting second control recv!");
+        return ret;
+    }
+
+    return 0;
+}
+
+/*
+ * Write an actual chunk of memory using RDMA.
+ *
+ * If we're using dynamic registration on the dest-side, we have to
+ * send a registration command first.
+ */
+static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
+                               int current_index, uint64_t current_addr,
+                               uint64_t length)
+{
+    struct ibv_sge sge;
+    struct ibv_send_wr send_wr = { 0 };
+    struct ibv_send_wr *bad_wr;
+    int reg_result_idx, ret, count = 0;
+    uint64_t chunk, chunks;
+    uint8_t *chunk_start, *chunk_end;
+    RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
+    RDMARegister reg;
+    RDMARegisterResult *reg_result;
+    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
+    RDMAControlHeader head = { .len = sizeof(RDMARegister),
+                               .type = RDMA_CONTROL_REGISTER_REQUEST,
+                               .repeat = 1,
+                             };
+
+retry:
+    sge.addr = (uint64_t)(block->local_host_addr +
+                            (current_addr - block->offset));
+    sge.length = length;
+
+    chunk = ram_chunk_index(block->local_host_addr, (uint8_t *) sge.addr);
+    chunk_start = ram_chunk_start(block, chunk);
+
+    if (block->is_ram_block) {
+        chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
+
+        if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
+            chunks--;
+        }
+    } else {
+        chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
+
+        if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
+            chunks--;
+        }
+    }
+
+    DDPRINTF("Writing %" PRIu64 " chunks, (%" PRIu64 " MB)\n",
+        chunks + 1, (chunks + 1) * (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
+
+    chunk_end = ram_chunk_end(block, chunk + chunks);
+
+    if (!rdma->pin_all) {
+#ifdef RDMA_UNREGISTRATION_EXAMPLE
+        qemu_rdma_unregister_waiting(rdma);
+#endif
+    }
+
+    while (test_bit(chunk, block->transit_bitmap)) {
+        (void)count;
+        DDPRINTF("(%d) Not clobbering: block: %d chunk %" PRIu64
+                " current %" PRIu64 " len %" PRIu64 " %d %d\n",
+                count++, current_index, chunk,
+                sge.addr, length, rdma->nb_sent, block->nb_chunks);
+
+        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);
+
+        if (ret < 0) {
+            fprintf(stderr, "Failed to Wait for previous write to complete "
+                    "block %d chunk %" PRIu64
+                    " current %" PRIu64 " len %" PRIu64 " %d\n",
+                    current_index, chunk, sge.addr, length, rdma->nb_sent);
+            return ret;
+        }
+    }
+
+    if (!rdma->pin_all || !block->is_ram_block) {
+        if (!block->remote_keys[chunk]) {
+            /*
+             * This chunk has not yet been registered, so first check to see
+             * if the entire chunk is zero. If so, tell the other size to
+             * memset() + madvise() the entire chunk without RDMA.
+             */
+
+            if (can_use_buffer_find_nonzero_offset((void *)sge.addr, length)
+                   && buffer_find_nonzero_offset((void *)sge.addr,
+                                                    length) == length) {
+                RDMACompress comp = {
+                                        .offset = current_addr,
+                                        .value = 0,
+                                        .block_idx = current_index,
+                                        .length = length,
+                                    };
+
+                head.len = sizeof(comp);
+                head.type = RDMA_CONTROL_COMPRESS;
+
+                DDPRINTF("Entire chunk is zero, sending compress: %"
+                    PRIu64 " for %d "
+                    "bytes, index: %d, offset: %" PRId64 "...\n",
+                    chunk, sge.length, current_index, current_addr);
+
+                compress_to_network(&comp);
+                ret = qemu_rdma_exchange_send(rdma, &head,
+                                (uint8_t *) &comp, NULL, NULL, NULL);
+
+                if (ret < 0) {
+                    return -EIO;
+                }
+
+                acct_update_position(f, sge.length, true);
+
+                return 1;
+            }
+
+            /*
+             * Otherwise, tell other side to register.
+             */
+            reg.current_index = current_index;
+            if (block->is_ram_block) {
+                reg.key.current_addr = current_addr;
+            } else {
+                reg.key.chunk = chunk;
+            }
+            reg.chunks = chunks;
+
+            DDPRINTF("Sending registration request chunk %" PRIu64 " for %d "
+                    "bytes, index: %d, offset: %" PRId64 "...\n",
+                    chunk, sge.length, current_index, current_addr);
+
+            register_to_network(&reg);
+            ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
+                                    &resp, &reg_result_idx, NULL);
+            if (ret < 0) {
+                return ret;
+            }
+
+            /* try to overlap this single registration with the one we sent. */
+            if (qemu_rdma_register_and_get_keys(rdma, block,
+                                                (uint8_t *) sge.addr,
+                                                &sge.lkey, NULL, chunk,
+                                                chunk_start, chunk_end)) {
+                fprintf(stderr, "cannot get lkey!\n");
+                return -EINVAL;
+            }
+
+            reg_result = (RDMARegisterResult *)
+                    rdma->wr_data[reg_result_idx].control_curr;
+
+            network_to_result(reg_result);
+
+            DDPRINTF("Received registration result:"
+                    " my key: %x their key %x, chunk %" PRIu64 "\n",
+                    block->remote_keys[chunk], reg_result->rkey, chunk);
+
+            block->remote_keys[chunk] = reg_result->rkey;
+            block->remote_host_addr = reg_result->host_addr;
+        } else {
+            /* already registered before */
+            if (qemu_rdma_register_and_get_keys(rdma, block,
+                                                (uint8_t *)sge.addr,
+                                                &sge.lkey, NULL, chunk,
+                                                chunk_start, chunk_end)) {
+                fprintf(stderr, "cannot get lkey!\n");
+                return -EINVAL;
+            }
+        }
+
+        send_wr.wr.rdma.rkey = block->remote_keys[chunk];
+    } else {
+        send_wr.wr.rdma.rkey = block->remote_rkey;
+
+        if (qemu_rdma_register_and_get_keys(rdma, block, (uint8_t *)sge.addr,
+                                                     &sge.lkey, NULL, chunk,
+                                                     chunk_start, chunk_end)) {
+            fprintf(stderr, "cannot get lkey!\n");
+            return -EINVAL;
+        }
+    }
+
+    /*
+     * Encode the ram block index and chunk within this wrid.
+     * We will use this information at the time of completion
+     * to figure out which bitmap to check against and then which
+     * chunk in the bitmap to look for.
+     */
+    send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
+                                        current_index, chunk);
+
+    send_wr.opcode = IBV_WR_RDMA_WRITE;
+    send_wr.send_flags = IBV_SEND_SIGNALED;
+    send_wr.sg_list = &sge;
+    send_wr.num_sge = 1;
+    send_wr.wr.rdma.remote_addr = block->remote_host_addr +
+                                (current_addr - block->offset);
+
+    DDDPRINTF("Posting chunk: %" PRIu64 ", addr: %lx"
+              " remote: %lx, bytes %" PRIu32 "\n",
+              chunk, sge.addr, send_wr.wr.rdma.remote_addr,
+              sge.length);
+
+    /*
+     * ibv_post_send() does not return negative error numbers,
+     * per the specification they are positive - no idea why.
+     */
+    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
+
+    if (ret == ENOMEM) {
+        DDPRINTF("send queue is full. wait a little....\n");
+        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);
+        if (ret < 0) {
+            fprintf(stderr, "rdma migration: failed to make "
+                            "room in full send queue! %d\n", ret);
+            return ret;
+        }
+
+        goto retry;
+
+    } else if (ret > 0) {
+        perror("rdma migration: post rdma write failed");
+        return -ret;
+    }
+
+    set_bit(chunk, block->transit_bitmap);
+    acct_update_position(f, sge.length, false);
+    rdma->total_writes++;
+
+    return 0;
+}
+
+/*
+ * Push out any unwritten RDMA operations.
+ *
+ * We support sending out multiple chunks at the same time.
+ * Not all of them need to get signaled in the completion queue.
+ */
+static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
+{
+    int ret;
+
+    if (!rdma->current_length) {
+        return 0;
+    }
+
+    ret = qemu_rdma_write_one(f, rdma,
+            rdma->current_index, rdma->current_addr, rdma->current_length);
+
+    if (ret < 0) {
+        return ret;
+    }
+
+    if (ret == 0) {
+        rdma->nb_sent++;
+        DDDPRINTF("sent total: %d\n", rdma->nb_sent);
+    }
+
+    rdma->current_length = 0;
+    rdma->current_addr = 0;
+
+    return 0;
+}
+
+static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
+                    uint64_t offset, uint64_t len)
+{
+    RDMALocalBlock *block =
+        &(rdma->local_ram_blocks.block[rdma->current_index]);
+    uint8_t *host_addr = block->local_host_addr + (offset - block->offset);
+    uint8_t *chunk_end = ram_chunk_end(block, rdma->current_chunk);
+
+    if (rdma->current_length == 0) {
+        return 0;
+    }
+
+    /*
+     * Only merge into chunk sequentially.
+     */
+    if (offset != (rdma->current_addr + rdma->current_length)) {
+        return 0;
+    }
+
+    if (rdma->current_index < 0) {
+        return 0;
+    }
+
+    if (offset < block->offset) {
+        return 0;
+    }
+
+    if ((offset + len) > (block->offset + block->length)) {
+        return 0;
+    }
+
+    if (rdma->current_chunk < 0) {
+        return 0;
+    }
+
+    if ((host_addr + len) > chunk_end) {
+        return 0;
+    }
+
+    return 1;
+}
+
+/*
+ * We're not actually writing here, but doing three things:
+ *
+ * 1. Identify the chunk the buffer belongs to.
+ * 2. If the chunk is full or the buffer doesn't belong to the current
+ *    chunk, then start a new chunk and flush() the old chunk.
+ * 3. To keep the hardware busy, we also group chunks into batches
+ *    and only require that a batch gets acknowledged in the completion
+ *    qeueue instead of each individual chunk.
+ */
+static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
+                           uint64_t block_offset, uint64_t offset,
+                           uint64_t len)
+{
+    uint64_t current_addr = block_offset + offset;
+    uint64_t index = rdma->current_index;
+    uint64_t chunk = rdma->current_chunk;
+    int ret;
+
+    /* If we cannot merge it, we flush the current buffer first. */
+    if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
+        ret = qemu_rdma_write_flush(f, rdma);
+        if (ret) {
+            return ret;
+        }
+        rdma->current_length = 0;
+        rdma->current_addr = current_addr;
+
+        ret = qemu_rdma_search_ram_block(rdma, block_offset,
+                                         offset, len, &index, &chunk);
+        if (ret) {
+            fprintf(stderr, "ram block search failed\n");
+            return ret;
+        }
+        rdma->current_index = index;
+        rdma->current_chunk = chunk;
+    }
+
+    /* merge it */
+    rdma->current_length += len;
+
+    /* flush it if buffer is too large */
+    if (rdma->current_length >= RDMA_MERGE_MAX) {
+        return qemu_rdma_write_flush(f, rdma);
+    }
+
+    return 0;
+}
+
+static void qemu_rdma_cleanup(RDMAContext *rdma)
+{
+    struct rdma_cm_event *cm_event;
+    int ret, idx;
+
+    if (rdma->cm_id) {
+        if (rdma->error_state) {
+            RDMAControlHeader head = { .len = 0,
+                                       .type = RDMA_CONTROL_ERROR,
+                                       .repeat = 1,
+                                     };
+            fprintf(stderr, "Early error. Sending error.\n");
+            qemu_rdma_post_send_control(rdma, NULL, &head);
+        }
+
+        ret = rdma_disconnect(rdma->cm_id);
+        if (!ret) {
+            DDPRINTF("waiting for disconnect\n");
+            ret = rdma_get_cm_event(rdma->channel, &cm_event);
+            if (!ret) {
+                rdma_ack_cm_event(cm_event);
+            }
+        }
+        DDPRINTF("Disconnected.\n");
+        rdma->cm_id = NULL;
+    }
+
+    g_free(rdma->block);
+    rdma->block = NULL;
+
+    for (idx = 0; idx <= RDMA_WRID_MAX; idx++) {
+        if (rdma->wr_data[idx].control_mr) {
+            rdma->total_registrations--;
+            ibv_dereg_mr(rdma->wr_data[idx].control_mr);
+        }
+        rdma->wr_data[idx].control_mr = NULL;
+    }
+
+    if (rdma->local_ram_blocks.block) {
+        while (rdma->local_ram_blocks.nb_blocks) {
+            __qemu_rdma_delete_block(rdma,
+                    rdma->local_ram_blocks.block->offset);
+        }
+    }
+
+    if (rdma->qp) {
+        ibv_destroy_qp(rdma->qp);
+        rdma->qp = NULL;
+    }
+    if (rdma->cq) {
+        ibv_destroy_cq(rdma->cq);
+        rdma->cq = NULL;
+    }
+    if (rdma->comp_channel) {
+        ibv_destroy_comp_channel(rdma->comp_channel);
+        rdma->comp_channel = NULL;
+    }
+    if (rdma->pd) {
+        ibv_dealloc_pd(rdma->pd);
+        rdma->pd = NULL;
+    }
+    if (rdma->listen_id) {
+        rdma_destroy_id(rdma->listen_id);
+        rdma->listen_id = NULL;
+    }
+    if (rdma->cm_id) {
+        rdma_destroy_id(rdma->cm_id);
+        rdma->cm_id = NULL;
+    }
+    if (rdma->channel) {
+        rdma_destroy_event_channel(rdma->channel);
+        rdma->channel = NULL;
+    }
+}
+
+
+static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all)
+{
+    int ret, idx;
+    Error *local_err = NULL, **temp = &local_err;
+
+    /*
+     * Will be validated against destination's actual capabilities
+     * after the connect() completes.
+     */
+    rdma->pin_all = pin_all;
+
+    ret = qemu_rdma_resolve_host(rdma, temp);
+    if (ret) {
+        goto err_rdma_source_init;
+    }
+
+    ret = qemu_rdma_alloc_pd_cq(rdma);
+    if (ret) {
+        ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
+                    " limits may be too low. Please check $ ulimit -a # and "
+                    "search for 'ulimit -l' in the output\n");
+        goto err_rdma_source_init;
+    }
+
+    ret = qemu_rdma_alloc_qp(rdma);
+    if (ret) {
+        ERROR(temp, "rdma migration: error allocating qp!\n");
+        goto err_rdma_source_init;
+    }
+
+    ret = qemu_rdma_init_ram_blocks(rdma);
+    if (ret) {
+        ERROR(temp, "rdma migration: error initializing ram blocks!\n");
+        goto err_rdma_source_init;
+    }
+
+    for (idx = 0; idx <= RDMA_WRID_MAX; idx++) {
+        ret = qemu_rdma_reg_control(rdma, idx);
+        if (ret) {
+            ERROR(temp, "rdma migration: error registering %d control!\n",
+                                                            idx);
+            goto err_rdma_source_init;
+        }
+    }
+
+    return 0;
+
+err_rdma_source_init:
+    error_propagate(errp, local_err);
+    qemu_rdma_cleanup(rdma);
+    return -1;
+}
+
+static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
+{
+    RDMACapabilities cap = {
+                                .version = RDMA_CONTROL_VERSION_CURRENT,
+                                .flags = 0,
+                           };
+    struct rdma_conn_param conn_param = { .initiator_depth = 2,
+                                          .retry_count = 5,
+                                          .private_data = &cap,
+                                          .private_data_len = sizeof(cap),
+                                        };
+    struct rdma_cm_event *cm_event;
+    int ret;
+
+    /*
+     * Only negotiate the capability with destination if the user
+     * on the source first requested the capability.
+     */
+    if (rdma->pin_all) {
+        DPRINTF("Server pin-all memory requested.\n");
+        cap.flags |= RDMA_CAPABILITY_PIN_ALL;
+    }
+
+    caps_to_network(&cap);
+
+    ret = rdma_connect(rdma->cm_id, &conn_param);
+    if (ret) {
+        perror("rdma_connect");
+        ERROR(errp, "connecting to destination!\n");
+        rdma_destroy_id(rdma->cm_id);
+        rdma->cm_id = NULL;
+        goto err_rdma_source_connect;
+    }
+
+    ret = rdma_get_cm_event(rdma->channel, &cm_event);
+    if (ret) {
+        perror("rdma_get_cm_event after rdma_connect");
+        ERROR(errp, "connecting to destination!\n");
+        rdma_ack_cm_event(cm_event);
+        rdma_destroy_id(rdma->cm_id);
+        rdma->cm_id = NULL;
+        goto err_rdma_source_connect;
+    }
+
+    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
+        perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
+        ERROR(errp, "connecting to destination!\n");
+        rdma_ack_cm_event(cm_event);
+        rdma_destroy_id(rdma->cm_id);
+        rdma->cm_id = NULL;
+        goto err_rdma_source_connect;
+    }
+
+    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
+    network_to_caps(&cap);
+
+    /*
+     * Verify that the *requested* capabilities are supported by the destination
+     * and disable them otherwise.
+     */
+    if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
+        ERROR(errp, "Server cannot support pinning all memory. "
+                        "Will register memory dynamically.\n");
+        rdma->pin_all = false;
+    }
+
+    DPRINTF("Pin all memory: %s\n", rdma->pin_all ? "enabled" : "disabled");
+
+    rdma_ack_cm_event(cm_event);
+
+    ret = qemu_rdma_post_recv_control(rdma, 0);
+    if (ret) {
+        ERROR(errp, "posting second control recv!\n");
+        goto err_rdma_source_connect;
+    }
+
+    rdma->control_ready_expected = 1;
+    rdma->nb_sent = 0;
+    return 0;
+
+err_rdma_source_connect:
+    qemu_rdma_cleanup(rdma);
+    return -1;
+}
+
+static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
+{
+    int ret = -EINVAL, idx;
+    struct sockaddr_in sin;
+    struct rdma_cm_id *listen_id;
+    char ip[40] = "unknown";
+
+    for (idx = 0; idx <= RDMA_WRID_MAX; idx++) {
+        rdma->wr_data[idx].control_len = 0;
+        rdma->wr_data[idx].control_curr = NULL;
+    }
+
+    if (rdma->host == NULL) {
+        ERROR(errp, "RDMA host is not set!\n");
+        rdma->error_state = -EINVAL;
+        return -1;
+    }
+    /* create CM channel */
+    rdma->channel = rdma_create_event_channel();
+    if (!rdma->channel) {
+        ERROR(errp, "could not create rdma event channel\n");
+        rdma->error_state = -EINVAL;
+        return -1;
+    }
+
+    /* create CM id */
+    ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
+    if (ret) {
+        ERROR(errp, "could not create cm_id!\n");
+        goto err_dest_init_create_listen_id;
+    }
+
+    memset(&sin, 0, sizeof(sin));
+    sin.sin_family = AF_INET;
+    sin.sin_port = htons(rdma->port);
+
+    if (rdma->host && strcmp("", rdma->host)) {
+        struct hostent *dest_addr;
+        dest_addr = gethostbyname(rdma->host);
+        if (!dest_addr) {
+            ERROR(errp, "migration could not gethostbyname!\n");
+            ret = -EINVAL;
+            goto err_dest_init_bind_addr;
+        }
+        memcpy(&sin.sin_addr.s_addr, dest_addr->h_addr,
+                dest_addr->h_length);
+        inet_ntop(AF_INET, dest_addr->h_addr, ip, sizeof ip);
+    } else {
+        sin.sin_addr.s_addr = INADDR_ANY;
+    }
+
+    DPRINTF("%s => %s\n", rdma->host, ip);
+
+    ret = rdma_bind_addr(listen_id, (struct sockaddr *)&sin);
+    if (ret) {
+        ERROR(errp, "Error: could not rdma_bind_addr!\n");
+        goto err_dest_init_bind_addr;
+    }
+
+    rdma->listen_id = listen_id;
+    qemu_rdma_dump_gid("dest_init", listen_id);
+    return 0;
+
+err_dest_init_bind_addr:
+    rdma_destroy_id(listen_id);
+err_dest_init_create_listen_id:
+    rdma_destroy_event_channel(rdma->channel);
+    rdma->channel = NULL;
+    rdma->error_state = ret;
+    return ret;
+
+}
+
+static void *qemu_rdma_data_init(const char *host_port, Error **errp)
+{
+    RDMAContext *rdma = NULL;
+    InetSocketAddress *addr;
+
+    if (host_port) {
+        rdma = g_malloc0(sizeof(RDMAContext));
+        memset(rdma, 0, sizeof(RDMAContext));
+        rdma->current_index = -1;
+        rdma->current_chunk = -1;
+
+        addr = inet_parse(host_port, NULL);
+        if (addr != NULL) {
+            rdma->port = atoi(addr->port);
+            rdma->host = g_strdup(addr->host);
+        } else {
+            ERROR(errp, "bad RDMA migration address '%s'", host_port);
+            g_free(rdma);
+            return NULL;
+        }
+    }
+
+    return rdma;
+}
+
+/*
+ * QEMUFile interface to the control channel.
+ * SEND messages for control only.
+ * pc.ram is handled with regular RDMA messages.
+ */
+static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
+                                int64_t pos, int size)
+{
+    QEMUFileRDMA *r = opaque;
+    QEMUFile *f = r->file;
+    RDMAContext *rdma = r->rdma;
+    size_t remaining = size;
+    uint8_t * data = (void *) buf;
+    int ret;
+
+    CHECK_ERROR_STATE();
+
+    /*
+     * Push out any writes that
+     * we're queued up for pc.ram.
+     */
+    ret = qemu_rdma_write_flush(f, rdma);
+    if (ret < 0) {
+        rdma->error_state = ret;
+        return ret;
+    }
+
+    while (remaining) {
+        RDMAControlHeader head;
+
+        r->len = MIN(remaining, RDMA_SEND_INCREMENT);
+        remaining -= r->len;
+
+        head.len = r->len;
+        head.type = RDMA_CONTROL_QEMU_FILE;
+
+        ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
+
+        if (ret < 0) {
+            rdma->error_state = ret;
+            return ret;
+        }
+
+        data += r->len;
+    }
+
+    return size;
+}
+
+static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
+                             int size, int idx)
+{
+    size_t len = 0;
+
+    if (rdma->wr_data[idx].control_len) {
+        DDDPRINTF("RDMA %" PRId64 " of %d bytes already in buffer\n",
+                    rdma->wr_data[idx].control_len, size);
+
+        len = MIN(size, rdma->wr_data[idx].control_len);
+        memcpy(buf, rdma->wr_data[idx].control_curr, len);
+        rdma->wr_data[idx].control_curr += len;
+        rdma->wr_data[idx].control_len -= len;
+    }
+
+    return len;
+}
+
+/*
+ * QEMUFile interface to the control channel.
+ * RDMA links don't use bytestreams, so we have to
+ * return bytes to QEMUFile opportunistically.
+ */
+static int qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
+                                int64_t pos, int size)
+{
+    QEMUFileRDMA *r = opaque;
+    RDMAContext *rdma = r->rdma;
+    RDMAControlHeader head;
+    int ret = 0;
+
+    CHECK_ERROR_STATE();
+
+    /*
+     * First, we hold on to the last SEND message we
+     * were given and dish out the bytes until we run
+     * out of bytes.
+     */
+    r->len = qemu_rdma_fill(r->rdma, buf, size, 0);
+    if (r->len) {
+        return r->len;
+    }
+
+    /*
+     * Once we run out, we block and wait for another
+     * SEND message to arrive.
+     */
+    ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
+
+    if (ret < 0) {
+        rdma->error_state = ret;
+        return ret;
+    }
+
+    /*
+     * SEND was received with new bytes, now try again.
+     */
+    return qemu_rdma_fill(r->rdma, buf, size, 0);
+}
+
+/*
+ * Block until all the outstanding chunks have been delivered by the hardware.
+ */
+static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
+{
+    int ret;
+
+    if (qemu_rdma_write_flush(f, rdma) < 0) {
+        return -EIO;
+    }
+
+    while (rdma->nb_sent) {
+        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);
+        if (ret < 0) {
+            fprintf(stderr, "rdma migration: complete polling error!\n");
+            return -EIO;
+        }
+    }
+
+    qemu_rdma_unregister_waiting(rdma);
+
+    return 0;
+}
+
+static int qemu_rdma_close(void *opaque)
+{
+    DPRINTF("Shutting down connection.\n");
+    QEMUFileRDMA *r = opaque;
+    if (r->rdma) {
+        qemu_rdma_cleanup(r->rdma);
+        g_free(r->rdma);
+    }
+    g_free(r);
+    return 0;
+}
+
+/*
+ * Parameters:
+ *    @offset == 0 :
+ *        This means that 'block_offset' is a full virtual address that does not
+ *        belong to a RAMBlock of the virtual machine and instead
+ *        represents a private malloc'd memory area that the caller wishes to
+ *        transfer.
+ *
+ *    @offset != 0 :
+ *        Offset is an offset to be added to block_offset and used
+ *        to also lookup the corresponding RAMBlock.
+ *
+ *    @size > 0 :
+ *        Initiate an transfer this size.
+ *
+ *    @size == 0 :
+ *        A 'hint' or 'advice' that means that we wish to speculatively
+ *        and asynchronously unregister this memory. In this case, there is no
+ *        gaurantee that the unregister will actually happen, for example,
+ *        if the memory is being actively transmitted. Additionally, the memory
+ *        may be re-registered at any future time if a write within the same
+ *        chunk was requested again, even if you attempted to unregister it
+ *        here.
+ *
+ *    @size < 0 : TODO, not yet supported
+ *        Unregister the memory NOW. This means that the caller does not
+ *        expect there to be any future RDMA transfers and we just want to clean
+ *        things up. This is used in case the upper layer owns the memory and
+ *        cannot wait for qemu_fclose() to occur.
+ *
+ *    @bytes_sent : User-specificed pointer to indicate how many bytes were
+ *                  sent. Usually, this will not be more than a few bytes of
+ *                  the protocol because most transfers are sent asynchronously.
+ */
+static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
+                                  ram_addr_t block_offset, ram_addr_t offset,
+                                  size_t size, int *bytes_sent)
+{
+    QEMUFileRDMA *rfile = opaque;
+    RDMAContext *rdma = rfile->rdma;
+    int ret;
+
+    CHECK_ERROR_STATE();
+
+    qemu_fflush(f);
+
+    if (size > 0) {
+        /*
+         * Add this page to the current 'chunk'. If the chunk
+         * is full, or the page doen't belong to the current chunk,
+         * an actual RDMA write will occur and a new chunk will be formed.
+         */
+        ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
+        if (ret < 0) {
+            fprintf(stderr, "rdma migration: write error! %d\n", ret);
+            goto err;
+        }
+
+        /*
+         * We always return 1 bytes because the RDMA
+         * protocol is completely asynchronous. We do not yet know
+         * whether an  identified chunk is zero or not because we're
+         * waiting for other pages to potentially be merged with
+         * the current chunk. So, we have to call qemu_update_position()
+         * later on when the actual write occurs.
+         */
+        if (bytes_sent) {
+            *bytes_sent = 1;
+        }
+    } else {
+        uint64_t index, chunk;
+
+        /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
+        if (size < 0) {
+            ret = qemu_rdma_drain_cq(f, rdma);
+            if (ret < 0) {
+                fprintf(stderr, "rdma: failed to synchronously drain"
+                                " completion queue before unregistration.\n");
+                goto err;
+            }
+        }
+        */
+
+        ret = qemu_rdma_search_ram_block(rdma, block_offset,
+                                         offset, size, &index, &chunk);
+
+        if (ret) {
+            fprintf(stderr, "ram block search failed\n");
+            goto err;
+        }
+
+        qemu_rdma_signal_unregister(rdma, index, chunk, 0);
+
+        /*
+         * TODO: Synchronous, gauranteed unregistration (should not occur during
+         * fast-path). Otherwise, unregisters will process on the next call to
+         * qemu_rdma_drain_cq()
+        if (size < 0) {
+            qemu_rdma_unregister_waiting(rdma);
+        }
+        */
+    }
+
+    /*
+     * Drain the Completion Queue if possible, but do not block,
+     * just poll.
+     *
+     * If nothing to poll, the end of the iteration will do this
+     * again to make sure we don't overflow the request queue.
+     */
+    while (1) {
+        uint64_t wr_id, wr_id_in;
+        int ret = qemu_rdma_poll(rdma, &wr_id_in);
+        if (ret < 0) {
+            fprintf(stderr, "rdma migration: polling error! %d\n", ret);
+            goto err;
+        }
+
+        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
+
+        if (wr_id == RDMA_WRID_NONE) {
+            break;
+        }
+    }
+
+    return RAM_SAVE_CONTROL_DELAYED;
+err:
+    rdma->error_state = ret;
+    return ret;
+}
+
+static int qemu_rdma_accept(RDMAContext *rdma)
+{
+    RDMACapabilities cap;
+    struct rdma_conn_param conn_param = {
+                                            .responder_resources = 2,
+                                            .private_data = &cap,
+                                            .private_data_len = sizeof(cap),
+                                         };
+    struct rdma_cm_event *cm_event;
+    struct ibv_context *verbs;
+    int ret = -EINVAL;
+    int idx;
+
+    ret = rdma_get_cm_event(rdma->channel, &cm_event);
+    if (ret) {
+        goto err_rdma_dest_wait;
+    }
+
+    if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
+        rdma_ack_cm_event(cm_event);
+        goto err_rdma_dest_wait;
+    }
+
+    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
+
+    network_to_caps(&cap);
+
+    if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
+            fprintf(stderr, "Unknown source RDMA version: %d, bailing...\n",
+                            cap.version);
+            rdma_ack_cm_event(cm_event);
+            goto err_rdma_dest_wait;
+    }
+
+    /*
+     * Respond with only the capabilities this version of QEMU knows about.
+     */
+    cap.flags &= known_capabilities;
+
+    /*
+     * Enable the ones that we do know about.
+     * Add other checks here as new ones are introduced.
+     */
+    if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
+        rdma->pin_all = true;
+    }
+
+    rdma->cm_id = cm_event->id;
+    verbs = cm_event->id->verbs;
+
+    rdma_ack_cm_event(cm_event);
+
+    DPRINTF("Memory pin all: %s\n", rdma->pin_all ? "enabled" : "disabled");
+
+    caps_to_network(&cap);
+
+    DPRINTF("verbs context after listen: %p\n", verbs);
+
+    if (!rdma->verbs) {
+        rdma->verbs = verbs;
+    } else if (rdma->verbs != verbs) {
+            fprintf(stderr, "ibv context not matching %p, %p!\n",
+                    rdma->verbs, verbs);
+            goto err_rdma_dest_wait;
+    }
+
+    qemu_rdma_dump_id("dest_init", verbs);
+
+    ret = qemu_rdma_alloc_pd_cq(rdma);
+    if (ret) {
+        fprintf(stderr, "rdma migration: error allocating pd and cq!\n");
+        goto err_rdma_dest_wait;
+    }
+
+    ret = qemu_rdma_alloc_qp(rdma);
+    if (ret) {
+        fprintf(stderr, "rdma migration: error allocating qp!\n");
+        goto err_rdma_dest_wait;
+    }
+
+    ret = qemu_rdma_init_ram_blocks(rdma);
+    if (ret) {
+        fprintf(stderr, "rdma migration: error initializing ram blocks!\n");
+        goto err_rdma_dest_wait;
+    }
+
+    for (idx = 0; idx <= RDMA_WRID_MAX; idx++) {
+        ret = qemu_rdma_reg_control(rdma, idx);
+        if (ret) {
+            fprintf(stderr, "rdma: error registering %d control!\n", idx);
+            goto err_rdma_dest_wait;
+        }
+    }
+
+    qemu_set_fd_handler2(rdma->channel->fd, NULL, NULL, NULL, NULL);
+
+    ret = rdma_accept(rdma->cm_id, &conn_param);
+    if (ret) {
+        fprintf(stderr, "rdma_accept returns %d!\n", ret);
+        goto err_rdma_dest_wait;
+    }
+
+    ret = rdma_get_cm_event(rdma->channel, &cm_event);
+    if (ret) {
+        fprintf(stderr, "rdma_accept get_cm_event failed %d!\n", ret);
+        goto err_rdma_dest_wait;
+    }
+
+    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
+        fprintf(stderr, "rdma_accept not event established!\n");
+        rdma_ack_cm_event(cm_event);
+        goto err_rdma_dest_wait;
+    }
+
+    rdma_ack_cm_event(cm_event);
+
+    ret = qemu_rdma_post_recv_control(rdma, 0);
+    if (ret) {
+        fprintf(stderr, "rdma migration: error posting second control recv!\n");
+        goto err_rdma_dest_wait;
+    }
+
+    qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
+
+    return 0;
+
+err_rdma_dest_wait:
+    rdma->error_state = ret;
+    qemu_rdma_cleanup(rdma);
+    return ret;
+}
+
+/*
+ * During each iteration of the migration, we listen for instructions
+ * by the source VM to perform dynamic page registrations before they
+ * can perform RDMA operations.
+ *
+ * We respond with the 'rkey'.
+ *
+ * Keep doing this until the source tells us to stop.
+ */
+static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
+                                         uint64_t flags)
+{
+    RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
+                               .type = RDMA_CONTROL_REGISTER_RESULT,
+                               .repeat = 0,
+                             };
+    RDMAControlHeader unreg_resp = { .len = 0,
+                               .type = RDMA_CONTROL_UNREGISTER_FINISHED,
+                               .repeat = 0,
+                             };
+    RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
+                                 .repeat = 1 };
+    QEMUFileRDMA *rfile = opaque;
+    RDMAContext *rdma = rfile->rdma;
+    RDMALocalBlocks *local = &rdma->local_ram_blocks;
+    RDMAControlHeader head;
+    RDMARegister *reg, *registers;
+    RDMACompress *comp;
+    RDMARegisterResult *reg_result;
+    static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
+    RDMALocalBlock *block;
+    void *host_addr;
+    int ret = 0;
+    int idx = 0;
+    int count = 0;
+    int i = 0;
+
+    CHECK_ERROR_STATE();
+
+    do {
+        DDDPRINTF("Waiting for next request %" PRIu64 "...\n", flags);
+
+        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
+
+        if (ret < 0) {
+            break;
+        }
+
+        if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
+            fprintf(stderr, "rdma: Too many requests in this message (%d)."
+                            "Bailing.\n", head.repeat);
+            ret = -EIO;
+            break;
+        }
+
+        switch (head.type) {
+        case RDMA_CONTROL_COMPRESS:
+            comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
+            network_to_compress(comp);
+
+            DDPRINTF("Zapping zero chunk: %" PRId64
+                    " bytes, index %d, offset %" PRId64 "\n",
+                    comp->length, comp->block_idx, comp->offset);
+            block = &(rdma->local_ram_blocks.block[comp->block_idx]);
+
+            host_addr = block->local_host_addr +
+                            (comp->offset - block->offset);
+
+            ram_handle_compressed(host_addr, comp->value, comp->length);
+            break;
+
+        case RDMA_CONTROL_REGISTER_FINISHED:
+            DDDPRINTF("Current registrations complete.\n");
+            goto out;
+
+        case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
+            DPRINTF("Initial setup info requested.\n");
+
+            if (rdma->pin_all) {
+                ret = qemu_rdma_reg_whole_ram_blocks(rdma);
+                if (ret) {
+                    fprintf(stderr, "rdma migration: error dest "
+                                    "registering ram blocks!\n");
+                    goto out;
+                }
+            }
+
+            /*
+             * Dest uses this to prepare to transmit the RAMBlock descriptions
+             * to the source VM after connection setup.
+             * Both sides use the "remote" structure to communicate and update
+             * their "local" descriptions with what was sent.
+             */
+            for (i = 0; i < local->nb_blocks; i++) {
+                rdma->block[i].remote_host_addr =
+                    (uint64_t)(local->block[i].local_host_addr);
+
+                if (rdma->pin_all) {
+                    rdma->block[i].remote_rkey = local->block[i].mr->rkey;
+                }
+
+                rdma->block[i].offset = local->block[i].offset;
+                rdma->block[i].length = local->block[i].length;
+
+                remote_block_to_network(&rdma->block[i]);
+            }
+
+            blocks.len = rdma->local_ram_blocks.nb_blocks
+                                                * sizeof(RDMARemoteBlock);
+
+
+            ret = qemu_rdma_post_send_control(rdma,
+                                        (uint8_t *) rdma->block, &blocks);
+
+            if (ret < 0) {
+                fprintf(stderr, "rdma migration: error sending remote info!\n");
+                goto out;
+            }
+
+            break;
+        case RDMA_CONTROL_REGISTER_REQUEST:
+            DDPRINTF("There are %d registration requests\n", head.repeat);
+
+            reg_resp.repeat = head.repeat;
+            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
+
+            for (count = 0; count < head.repeat; count++) {
+                uint64_t chunk;
+                uint8_t *chunk_start, *chunk_end;
+
+                reg = &registers[count];
+                network_to_register(reg);
+
+                reg_result = &results[count];
+
+                DDPRINTF("Registration request (%d): index %d, current_addr %"
+                         PRIu64 " chunks: %" PRIu64 "\n", count,
+                         reg->current_index, reg->key.current_addr, reg->chunks);
+
+                block = &(rdma->local_ram_blocks.block[reg->current_index]);
+                if (block->is_ram_block) {
+                    host_addr = (block->local_host_addr +
+                                (reg->key.current_addr - block->offset));
+                    chunk = ram_chunk_index(block->local_host_addr,
+                                            (uint8_t *) host_addr);
+                } else {
+                    chunk = reg->key.chunk;
+                    host_addr = block->local_host_addr +
+                        (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
+                }
+                chunk_start = ram_chunk_start(block, chunk);
+                chunk_end = ram_chunk_end(block, chunk + reg->chunks);
+                if (qemu_rdma_register_and_get_keys(rdma, block,
+                            (uint8_t *)host_addr, NULL, &reg_result->rkey,
+                            chunk, chunk_start, chunk_end)) {
+                    fprintf(stderr, "cannot get rkey!\n");
+                    ret = -EINVAL;
+                    goto out;
+                }
+
+                reg_result->host_addr = (uint64_t) block->local_host_addr;
+
+                DDPRINTF("Registered rkey for this request: %x\n",
+                                reg_result->rkey);
+
+                result_to_network(reg_result);
+            }
+
+            ret = qemu_rdma_post_send_control(rdma,
+                            (uint8_t *) results, &reg_resp);
+
+            if (ret < 0) {
+                fprintf(stderr, "Failed to send control buffer!\n");
+                goto out;
+            }
+            break;
+        case RDMA_CONTROL_UNREGISTER_REQUEST:
+            DDPRINTF("There are %d unregistration requests\n", head.repeat);
+            unreg_resp.repeat = head.repeat;
+            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
+
+            for (count = 0; count < head.repeat; count++) {
+                reg = &registers[count];
+                network_to_register(reg);
+
+                DDPRINTF("Unregistration request (%d): "
+                         " index %d, chunk %" PRIu64 "\n",
+                         count, reg->current_index, reg->key.chunk);
+
+                block = &(rdma->local_ram_blocks.block[reg->current_index]);
+
+                ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
+                block->pmr[reg->key.chunk] = NULL;
+
+                if (ret != 0) {
+                    perror("rdma unregistration chunk failed");
+                    ret = -ret;
+                    goto out;
+                }
+
+                rdma->total_registrations--;
+
+                DDPRINTF("Unregistered chunk %" PRIu64 " successfully.\n",
+                            reg->key.chunk);
+            }
+
+            ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
+
+            if (ret < 0) {
+                fprintf(stderr, "Failed to send control buffer!\n");
+                goto out;
+            }
+            break;
+        case RDMA_CONTROL_REGISTER_RESULT:
+            fprintf(stderr, "Invalid RESULT message at dest.\n");
+            ret = -EIO;
+            goto out;
+        default:
+            fprintf(stderr, "Unknown control message %s\n",
+                                control_desc[head.type]);
+            ret = -EIO;
+            goto out;
+        }
+    } while (1);
+out:
+    if (ret < 0) {
+        rdma->error_state = ret;
+    }
+    return ret;
+}
+
+static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
+                                        uint64_t flags)
+{
+    QEMUFileRDMA *rfile = opaque;
+    RDMAContext *rdma = rfile->rdma;
+
+    CHECK_ERROR_STATE();
+
+    DDDPRINTF("start section: %" PRIu64 "\n", flags);
+    qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
+    qemu_fflush(f);
+
+    return 0;
+}
+
+/*
+ * Inform dest that dynamic registrations are done for now.
+ * First, flush writes, if any.
+ */
+static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
+                                       uint64_t flags)
+{
+    Error *local_err = NULL, **errp = &local_err;
+    QEMUFileRDMA *rfile = opaque;
+    RDMAContext *rdma = rfile->rdma;
+    RDMAControlHeader head = { .len = 0, .repeat = 1 };
+    int ret = 0;
+
+    CHECK_ERROR_STATE();
+
+    qemu_fflush(f);
+    ret = qemu_rdma_drain_cq(f, rdma);
+
+    if (ret < 0) {
+        goto err;
+    }
+
+    if (flags == RAM_CONTROL_SETUP) {
+        RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
+        RDMALocalBlocks *local = &rdma->local_ram_blocks;
+        int reg_result_idx, i, j, nb_remote_blocks;
+
+        head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
+        DPRINTF("Sending registration setup for ram blocks...\n");
+
+        /*
+         * Make sure that we parallelize the pinning on both sides.
+         * For very large guests, doing this serially takes a really
+         * long time, so we have to 'interleave' the pinning locally
+         * with the control messages by performing the pinning on this
+         * side before we receive the control response from the other
+         * side that the pinning has completed.
+         */
+        ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
+                    &reg_result_idx, rdma->pin_all ?
+                    qemu_rdma_reg_whole_ram_blocks : NULL);
+        if (ret < 0) {
+            ERROR(errp, "receiving remote info!\n");
+            return ret;
+        }
+
+        qemu_rdma_move_header(rdma, reg_result_idx, &resp);
+        memcpy(rdma->block,
+            rdma->wr_data[reg_result_idx].control_curr, resp.len);
+
+        nb_remote_blocks = resp.len / sizeof(RDMARemoteBlock);
+
+        /*
+         * The protocol uses two different sets of rkeys (mutually exclusive):
+         * 1. One key to represent the virtual address of the entire ram block.
+         *    (dynamic chunk registration disabled - pin everything with one rkey.)
+         * 2. One to represent individual chunks within a ram block.
+         *    (dynamic chunk registration enabled - pin individual chunks.)
+         *
+         * Once the capability is successfully negotiated, the destination transmits
+         * the keys to use (or sends them later) including the virtual addresses
+         * and then propagates the remote ram block descriptions to his local copy.
+         */
+
+        if (local->nb_blocks != nb_remote_blocks) {
+            ERROR(errp, "ram blocks mismatch #1! "
+                        "Your QEMU command line parameters are probably "
+                        "not identical on both the source and destination.\n");
+            return -EINVAL;
+        }
+
+        for (i = 0; i < nb_remote_blocks; i++) {
+            network_to_remote_block(&rdma->block[i]);
+
+            /* search local ram blocks */
+            for (j = 0; j < local->nb_blocks; j++) {
+                if (rdma->block[i].offset != local->block[j].offset) {
+                    continue;
+                }
+
+                if (rdma->block[i].length != local->block[j].length) {
+                    ERROR(errp, "ram blocks mismatch #2! "
+                        "Your QEMU command line parameters are probably "
+                        "not identical on both the source and destination.\n");
+                    return -EINVAL;
+                }
+                local->block[j].remote_host_addr =
+                        rdma->block[i].remote_host_addr;
+                local->block[j].remote_rkey = rdma->block[i].remote_rkey;
+                break;
+            }
+
+            if (j >= local->nb_blocks) {
+                ERROR(errp, "ram blocks mismatch #3! "
+                        "Your QEMU command line parameters are probably "
+                        "not identical on both the source and destination.\n");
+                return -EINVAL;
+            }
+        }
+    }
+
+    DDDPRINTF("Sending registration finish %" PRIu64 "...\n", flags);
+
+    head.type = RDMA_CONTROL_REGISTER_FINISHED;
+    ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
+
+    if (ret < 0) {
+        goto err;
+    }
+
+    return 0;
+err:
+    rdma->error_state = ret;
+    return ret;
+}
+
+static int qemu_rdma_get_fd(void *opaque)
+{
+    QEMUFileRDMA *rfile = opaque;
+    RDMAContext *rdma = rfile->rdma;
+
+    return rdma->comp_channel->fd;
+}
+
+const QEMUFileOps rdma_read_ops = {
+    .get_buffer    = qemu_rdma_get_buffer,
+    .get_fd        = qemu_rdma_get_fd,
+    .close         = qemu_rdma_close,
+    .hook_ram_load = qemu_rdma_registration_handle,
+};
+
+const QEMUFileOps rdma_write_ops = {
+    .put_buffer         = qemu_rdma_put_buffer,
+    .close              = qemu_rdma_close,
+    .before_ram_iterate = qemu_rdma_registration_start,
+    .after_ram_iterate  = qemu_rdma_registration_stop,
+    .save_page          = qemu_rdma_save_page,
+};
+
+static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
+{
+    QEMUFileRDMA *r = g_malloc0(sizeof(QEMUFileRDMA));
+
+    if (qemu_file_mode_is_not_valid(mode)) {
+        return NULL;
+    }
+
+    r->rdma = rdma;
+
+    if (mode[0] == 'w') {
+        r->file = qemu_fopen_ops(r, &rdma_write_ops);
+    } else {
+        r->file = qemu_fopen_ops(r, &rdma_read_ops);
+    }
+
+    return r->file;
+}
+
+static void rdma_accept_incoming_migration(void *opaque)
+{
+    RDMAContext *rdma = opaque;
+    int ret;
+    QEMUFile *f;
+    Error *local_err = NULL, **errp = &local_err;
+
+    DPRINTF("Accepting rdma connection...\n");
+    ret = qemu_rdma_accept(rdma);
+
+    if (ret) {
+        ERROR(errp, "RDMA Migration initialization failed!\n");
+        return;
+    }
+
+    DPRINTF("Accepted migration\n");
+
+    f = qemu_fopen_rdma(rdma, "rb");
+    if (f == NULL) {
+        ERROR(errp, "could not qemu_fopen_rdma!\n");
+        qemu_rdma_cleanup(rdma);
+        return;
+    }
+
+    rdma->migration_started_on_destination = 1;
+    process_incoming_migration(f);
+}
+
+void rdma_start_incoming_migration(const char *host_port, Error **errp)
+{
+    int ret;
+    RDMAContext *rdma;
+    Error *local_err = NULL;
+
+    DPRINTF("Starting RDMA-based incoming migration\n");
+    rdma = qemu_rdma_data_init(host_port, &local_err);
+
+    if (rdma == NULL) {
+        goto err;
+    }
+
+    ret = qemu_rdma_dest_init(rdma, &local_err);
+
+    if (ret) {
+        goto err;
+    }
+
+    DPRINTF("qemu_rdma_dest_init success\n");
+
+    ret = rdma_listen(rdma->listen_id, 5);
+
+    if (ret) {
+        ERROR(errp, "listening on socket!\n");
+        goto err;
+    }
+
+    DPRINTF("rdma_listen success\n");
+
+    qemu_set_fd_handler2(rdma->channel->fd, NULL,
+                         rdma_accept_incoming_migration, NULL,
+                            (void *)(intptr_t) rdma);
+    return;
+err:
+    error_propagate(errp, local_err);
+    g_free(rdma);
+}
+
+void rdma_start_outgoing_migration(void *opaque,
+                            const char *host_port, Error **errp)
+{
+    MigrationState *s = opaque;
+    Error *local_err = NULL, **temp = &local_err;
+    RDMAContext *rdma = qemu_rdma_data_init(host_port, &local_err);
+    int ret = 0;
+
+    if (rdma == NULL) {
+        ERROR(temp, "Failed to initialize RDMA data structures! %d\n", ret);
+        goto err;
+    }
+
+    ret = qemu_rdma_source_init(rdma, &local_err,
+        s->enabled_capabilities[MIGRATION_CAPABILITY_X_RDMA_PIN_ALL]);
+
+    if (ret) {
+        goto err;
+    }
+
+    DPRINTF("qemu_rdma_source_init success\n");
+    ret = qemu_rdma_connect(rdma, &local_err);
+
+    if (ret) {
+        goto err;
+    }
+
+    DPRINTF("qemu_rdma_source_connect success\n");
+
+    s->file = qemu_fopen_rdma(rdma, "wb");
+    migrate_fd_connect(s);
+    return;
+err:
+    error_propagate(errp, local_err);
+    g_free(rdma);
+    migrate_fd_error(s);
+}
diff --git a/migration.c b/migration.c
index 635a7e7..2bfe245 100644
--- a/migration.c
+++ b/migration.c
@@ -78,6 +78,10 @@ void qemu_start_incoming_migration(const char *uri, Error **errp)
 
     if (strstart(uri, "tcp:", &p))
         tcp_start_incoming_migration(p, errp);
+#ifdef CONFIG_RDMA
+    else if (strstart(uri, "x-rdma:", &p))
+        rdma_start_incoming_migration(p, errp);
+#endif
 #if !defined(WIN32)
     else if (strstart(uri, "exec:", &p))
         exec_start_incoming_migration(p, errp);
@@ -406,6 +410,10 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk,
 
     if (strstart(uri, "tcp:", &p)) {
         tcp_start_outgoing_migration(s, p, &local_err);
+#ifdef CONFIG_RDMA
+    } else if (strstart(uri, "x-rdma:", &p)) {
+        rdma_start_outgoing_migration(s, p, &local_err);
+#endif
 #if !defined(WIN32)
     } else if (strstart(uri, "exec:", &p)) {
         exec_start_outgoing_migration(s, p, &local_err);
-- 
1.7.10.4

^ permalink raw reply related	[flat|nested] 13+ messages in thread

end of thread, other threads:[~2013-07-22 14:03 UTC | newest]

Thread overview: 13+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2013-07-16 16:48 [Qemu-devel] [PATCH v3 resend 0/8] rdma: core logic mrhines
2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 1/8] rdma: update documentation to reflect new unpin support mrhines
2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 2/8] rdma: bugfix: ram_control_save_page() mrhines
2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 3/8] rdma: introduce ram_handle_compressed() mrhines
2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 4/8] rdma: core logic mrhines
2013-07-18  7:30   ` Marcel Apfelbaum
2013-07-18 12:07     ` Michael R. Hines
2013-07-18 12:22       ` Paolo Bonzini
2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 5/8] rdma: send pc.ram mrhines
2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 6/8] rdma: allow state transitions between other states besides ACTIVE mrhines
2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 7/8] rdma: introduce MIG_STATE_NONE and change MIG_STATE_SETUP state transition mrhines
2013-07-16 16:48 ` [Qemu-devel] [PATCH v3 resend 8/8] rdma: account for the time spent in MIG_STATE_SETUP through QMP mrhines
2013-07-22 14:01 [Qemu-devel] [PATCH v3 resend 0/8] rdma: core logic mrhines
2013-07-22 14:01 ` [Qemu-devel] [PATCH v3 resend 4/8] " mrhines

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.