All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH v4 0/6] colo: Introduce resource agent and test suite/CI
@ 2021-02-07 15:54 Lukas Straub
  2021-02-07 15:55 ` [PATCH v4 1/6] avocado_qemu: Introduce pick_qemu_util to pick qemu utility binaries Lukas Straub
                   ` (5 more replies)
  0 siblings, 6 replies; 7+ messages in thread
From: Lukas Straub @ 2021-02-07 15:54 UTC (permalink / raw)
  To: qemu-devel
  Cc: Philippe Mathieu-Daudé, Wainer dos Santos Moschetta, Cleber Rosa

[-- Attachment #1: Type: text/plain, Size: 3041 bytes --]

Hello Everyone,
So here is v4.

Regards,
Lukas Straub

Changes:

v4:
 -use new yank api that finally has been merged
 -cleanup the test a bit by using numbers instead of "hosta" and "hostb"
 -resource-agent: Don't set master-score to 0 on invalid configuration

v3:
 -resource-agent: Don't determine local qemu state by remote master-score, query
  directly via qmp instead
 -resource-agent: Add max_queue_size parameter for colo-compare
 -resource-agent: Fix monitor action on secondary returning error during
  clean shutdown
 -resource-agent: Fix stop action setting master-score to 0 on primary on
  clean shutdown

v2:
 -use new yank api
 -drop disk_size parameter
 -introduce pick_qemu_util function and use it

Overview:

Hello Everyone,
These patches introduce a resource agent for fully automatic management of colo
and a test suite building upon the resource agent to extensively test colo.

Test suite features:
-Tests failover with peer crashing and hanging and failover during checkpoint
-Tests network using ssh and iperf3
-Quick test requires no special configuration
-Network test for testing colo-compare
-Stress test: failover all the time with network load

Resource agent features:
-Fully automatic management of colo
-Handles many failures: hanging/crashing qemu, replication error, disk error, ...
-Recovers from hanging qemu by using the "yank" oob command
-Tracks which node has up-to-date data
-Works well in clusters with more than 2 nodes

Run times on my laptop:
Quick test: 200s
Network test: 800s (tagged as slow)
Stress test: 1300s (tagged as slow)

For the last two tests, the test suite needs access to a network bridge to
properly test the network, so some parameters need to be given to the test
run. See tests/acceptance/colo.py for more information.

Regards,
Lukas Straub

Lukas Straub (6):
  avocado_qemu: Introduce pick_qemu_util to pick qemu utility binaries
  boot_linux.py: Use pick_qemu_util
  colo: Introduce resource agent
  colo: Introduce high-level test suite
  configure,Makefile: Install colo resource-agent
  MAINTAINERS: Add myself as maintainer for COLO resource agent

 MAINTAINERS                               |    6 +
 configure                                 |    7 +
 meson.build                               |    5 +
 meson_options.txt                         |    2 +
 scripts/colo-resource-agent/colo          | 1527 +++++++++++++++++++++
 scripts/colo-resource-agent/crm_master    |   44 +
 scripts/colo-resource-agent/crm_resource  |   12 +
 tests/acceptance/avocado_qemu/__init__.py |   15 +
 tests/acceptance/boot_linux.py            |   11 +-
 tests/acceptance/colo.py                  |  654 +++++++++
 10 files changed, 2274 insertions(+), 9 deletions(-)
 create mode 100755 scripts/colo-resource-agent/colo
 create mode 100755 scripts/colo-resource-agent/crm_master
 create mode 100755 scripts/colo-resource-agent/crm_resource
 create mode 100644 tests/acceptance/colo.py

--
2.30.0

[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 833 bytes --]

^ permalink raw reply	[flat|nested] 7+ messages in thread

* [PATCH v4 1/6] avocado_qemu: Introduce pick_qemu_util to pick qemu utility binaries
  2021-02-07 15:54 [PATCH v4 0/6] colo: Introduce resource agent and test suite/CI Lukas Straub
@ 2021-02-07 15:55 ` Lukas Straub
  2021-02-07 15:55 ` [PATCH v4 2/6] boot_linux.py: Use pick_qemu_util Lukas Straub
                   ` (4 subsequent siblings)
  5 siblings, 0 replies; 7+ messages in thread
From: Lukas Straub @ 2021-02-07 15:55 UTC (permalink / raw)
  To: qemu-devel
  Cc: Philippe Mathieu-Daudé, Wainer dos Santos Moschetta, Cleber Rosa

[-- Attachment #1: Type: text/plain, Size: 1504 bytes --]

This introduces a generic function to pick qemu utility binaries
from the build dir, system or via test parameter.

Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
 tests/acceptance/avocado_qemu/__init__.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/tests/acceptance/avocado_qemu/__init__.py b/tests/acceptance/avocado_qemu/__init__.py
index bf54e419da..1f8c41cee0 100644
--- a/tests/acceptance/avocado_qemu/__init__.py
+++ b/tests/acceptance/avocado_qemu/__init__.py
@@ -15,6 +15,7 @@ import uuid
 import tempfile

 import avocado
+from avocado.utils.path import find_command

 #: The QEMU build root directory.  It may also be the source directory
 #: if building from the source dir, but it's safer to use BUILD_DIR for
@@ -146,6 +147,20 @@ def exec_command_and_wait_for_pattern(test, command,
     _console_interaction(test, success_message, failure_message, command + '\r')

 class Test(avocado.Test):
+    def pick_qemu_util(self, util):
+        default = os.path.join(BUILD_DIR, util)
+        if not os.path.exists(default):
+            default = find_command(default, False)
+            if not default:
+                default = None
+
+        ret = self.params.get(util, default=default)
+
+        if ret is None:
+            self.cancel("Could not find \"%s\"" % util)
+
+        return ret
+
     def _get_unique_tag_val(self, tag_name):
         """
         Gets a tag value, if unique for a key
--
2.30.0


[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 833 bytes --]

^ permalink raw reply related	[flat|nested] 7+ messages in thread

* [PATCH v4 2/6] boot_linux.py: Use pick_qemu_util
  2021-02-07 15:54 [PATCH v4 0/6] colo: Introduce resource agent and test suite/CI Lukas Straub
  2021-02-07 15:55 ` [PATCH v4 1/6] avocado_qemu: Introduce pick_qemu_util to pick qemu utility binaries Lukas Straub
@ 2021-02-07 15:55 ` Lukas Straub
  2021-02-07 15:55 ` [PATCH v4 3/6] colo: Introduce resource agent Lukas Straub
                   ` (3 subsequent siblings)
  5 siblings, 0 replies; 7+ messages in thread
From: Lukas Straub @ 2021-02-07 15:55 UTC (permalink / raw)
  To: qemu-devel
  Cc: Philippe Mathieu-Daudé, Wainer dos Santos Moschetta, Cleber Rosa

[-- Attachment #1: Type: text/plain, Size: 1322 bytes --]

Replace duplicate code with pick_qemu_util.

Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
 tests/acceptance/boot_linux.py | 11 ++---------
 1 file changed, 2 insertions(+), 9 deletions(-)

diff --git a/tests/acceptance/boot_linux.py b/tests/acceptance/boot_linux.py
index 1da4a53d6a..38029f8c70 100644
--- a/tests/acceptance/boot_linux.py
+++ b/tests/acceptance/boot_linux.py
@@ -31,15 +31,8 @@ class BootLinuxBase(Test):
     def download_boot(self):
         self.log.debug('Looking for and selecting a qemu-img binary to be '
                        'used to create the bootable snapshot image')
-        # If qemu-img has been built, use it, otherwise the system wide one
-        # will be used.  If none is available, the test will cancel.
-        qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
-        if not os.path.exists(qemu_img):
-            qemu_img = find_command('qemu-img', False)
-        if qemu_img is False:
-            self.cancel('Could not find "qemu-img", which is required to '
-                        'create the bootable image')
-        vmimage.QEMU_IMG = qemu_img
+
+        vmimage.QEMU_IMG = self.pick_qemu_util("qemu-img")

         self.log.info('Downloading/preparing boot image')
         # Fedora 31 only provides ppc64le images
--
2.30.0


[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 833 bytes --]

^ permalink raw reply related	[flat|nested] 7+ messages in thread

* [PATCH v4 3/6] colo: Introduce resource agent
  2021-02-07 15:54 [PATCH v4 0/6] colo: Introduce resource agent and test suite/CI Lukas Straub
  2021-02-07 15:55 ` [PATCH v4 1/6] avocado_qemu: Introduce pick_qemu_util to pick qemu utility binaries Lukas Straub
  2021-02-07 15:55 ` [PATCH v4 2/6] boot_linux.py: Use pick_qemu_util Lukas Straub
@ 2021-02-07 15:55 ` Lukas Straub
  2021-02-07 15:55 ` [PATCH v4 4/6] colo: Introduce high-level test suite Lukas Straub
                   ` (2 subsequent siblings)
  5 siblings, 0 replies; 7+ messages in thread
From: Lukas Straub @ 2021-02-07 15:55 UTC (permalink / raw)
  To: qemu-devel
  Cc: Philippe Mathieu-Daudé, Wainer dos Santos Moschetta, Cleber Rosa

[-- Attachment #1: Type: text/plain, Size: 63723 bytes --]

Introduce a resource agent which can be used to manage qemu COLO
in a pacemaker cluster.

Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
 scripts/colo-resource-agent/colo | 1527 ++++++++++++++++++++++++++++++
 1 file changed, 1527 insertions(+)
 create mode 100755 scripts/colo-resource-agent/colo

diff --git a/scripts/colo-resource-agent/colo b/scripts/colo-resource-agent/colo
new file mode 100755
index 0000000000..dc53c2e601
--- /dev/null
+++ b/scripts/colo-resource-agent/colo
@@ -0,0 +1,1527 @@
+#!/usr/bin/env python3
+
+# Resource agent for qemu COLO for use with Pacemaker CRM
+#
+# Copyright (c) Lukas Straub <lukasstraub2@web.de>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later.  See the COPYING file in the top-level directory.
+
+import subprocess
+import sys
+import os
+import os.path
+import signal
+import socket
+import select
+import json
+import re
+import time
+import logging
+import logging.handlers
+
+# Constants
+OCF_SUCCESS = 0
+OCF_ERR_GENERIC = 1
+OCF_ERR_ARGS = 2
+OCF_ERR_UNIMPLEMENTED = 3
+OCF_ERR_PERM = 4
+OCF_ERR_INSTALLED = 5
+OCF_ERR_CONFIGURED = 6
+OCF_NOT_RUNNING = 7
+OCF_RUNNING_MASTER = 8
+OCF_FAILED_MASTER = 9
+
+# Get environment variables
+OCF_RESKEY_CRM_meta_notify_type \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_type")
+OCF_RESKEY_CRM_meta_notify_operation \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_operation")
+OCF_RESKEY_CRM_meta_notify_key_operation \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_key_operation")
+OCF_RESKEY_CRM_meta_notify_start_uname \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_start_uname", "")
+OCF_RESKEY_CRM_meta_notify_stop_uname \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_stop_uname", "")
+OCF_RESKEY_CRM_meta_notify_active_uname \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_active_uname", "")
+OCF_RESKEY_CRM_meta_notify_promote_uname \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_promote_uname", "")
+OCF_RESKEY_CRM_meta_notify_demote_uname \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_demote_uname", "")
+OCF_RESKEY_CRM_meta_notify_master_uname \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_master_uname", "")
+OCF_RESKEY_CRM_meta_notify_slave_uname \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_slave_uname", "")
+
+HA_RSCTMP = os.getenv("HA_RSCTMP", "/run/resource-agents")
+HA_LOGFACILITY = os.getenv("HA_LOGFACILITY")
+HA_LOGFILE = os.getenv("HA_LOGFILE")
+HA_DEBUG = os.getenv("HA_debug", "0")
+HA_DEBUGLOG = os.getenv("HA_DEBUGLOG")
+OCF_RESOURCE_INSTANCE = os.getenv("OCF_RESOURCE_INSTANCE", "default-instance")
+OCF_RESKEY_CRM_meta_timeout \
+    = os.getenv("OCF_RESKEY_CRM_meta_timeout", "60000")
+OCF_RESKEY_CRM_meta_interval \
+    = int(os.getenv("OCF_RESKEY_CRM_meta_interval", "1"))
+OCF_RESKEY_CRM_meta_clone_max \
+    = int(os.getenv("OCF_RESKEY_CRM_meta_clone_max", "1"))
+OCF_RESKEY_CRM_meta_clone_node_max \
+    = int(os.getenv("OCF_RESKEY_CRM_meta_clone_node_max", "1"))
+OCF_RESKEY_CRM_meta_master_max \
+    = int(os.getenv("OCF_RESKEY_CRM_meta_master_max", "1"))
+OCF_RESKEY_CRM_meta_master_node_max \
+    = int(os.getenv("OCF_RESKEY_CRM_meta_master_node_max", "1"))
+OCF_RESKEY_CRM_meta_notify \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify")
+OCF_RESKEY_CRM_meta_globally_unique \
+    = os.getenv("OCF_RESKEY_CRM_meta_globally_unique")
+
+HOSTNAME = os.getenv("OCF_RESKEY_CRM_meta_on_node", socket.gethostname())
+
+OCF_ACTION = os.getenv("__OCF_ACTION")
+if not OCF_ACTION and len(sys.argv) == 2:
+    OCF_ACTION = sys.argv[1]
+
+# Resource parameters
+OCF_RESKEY_qemu_binary_default = "qemu-system-x86_64"
+OCF_RESKEY_qemu_img_binary_default = "qemu-img"
+OCF_RESKEY_log_dir_default = HA_RSCTMP
+OCF_RESKEY_options_default = ""
+OCF_RESKEY_active_hidden_dir_default = ""
+OCF_RESKEY_listen_address_default = "0.0.0.0"
+OCF_RESKEY_base_port_default = "9000"
+OCF_RESKEY_checkpoint_interval_default = "20000"
+OCF_RESKEY_compare_timeout_default = "3000"
+OCF_RESKEY_expired_scan_cycle_default = "3000"
+OCF_RESKEY_max_queue_size_default = "1024"
+OCF_RESKEY_use_filter_rewriter_default = "true"
+OCF_RESKEY_vnet_hdr_default = "false"
+OCF_RESKEY_max_disk_errors_default = "1"
+OCF_RESKEY_monitor_timeout_default = "20000"
+OCF_RESKEY_yank_timeout_default = "10000"
+OCF_RESKEY_fail_fast_timeout_default = "5000"
+OCF_RESKEY_debug_default = "0"
+
+OCF_RESKEY_qemu_binary \
+    = os.getenv("OCF_RESKEY_qemu_binary", OCF_RESKEY_qemu_binary_default)
+OCF_RESKEY_qemu_img_binary \
+    = os.getenv("OCF_RESKEY_qemu_img_binary", OCF_RESKEY_qemu_img_binary_default)
+OCF_RESKEY_log_dir \
+    = os.getenv("OCF_RESKEY_log_dir", OCF_RESKEY_log_dir_default)
+OCF_RESKEY_options \
+    = os.getenv("OCF_RESKEY_options", OCF_RESKEY_options_default)
+OCF_RESKEY_active_hidden_dir \
+    = os.getenv("OCF_RESKEY_active_hidden_dir", OCF_RESKEY_active_hidden_dir_default)
+OCF_RESKEY_listen_address \
+    = os.getenv("OCF_RESKEY_listen_address", OCF_RESKEY_listen_address_default)
+OCF_RESKEY_base_port \
+    = os.getenv("OCF_RESKEY_base_port", OCF_RESKEY_base_port_default)
+OCF_RESKEY_checkpoint_interval \
+    = os.getenv("OCF_RESKEY_checkpoint_interval", OCF_RESKEY_checkpoint_interval_default)
+OCF_RESKEY_compare_timeout \
+    = os.getenv("OCF_RESKEY_compare_timeout", OCF_RESKEY_compare_timeout_default)
+OCF_RESKEY_expired_scan_cycle \
+    = os.getenv("OCF_RESKEY_expired_scan_cycle", OCF_RESKEY_expired_scan_cycle_default)
+OCF_RESKEY_max_queue_size \
+    = os.getenv("OCF_RESKEY_max_queue_size", OCF_RESKEY_max_queue_size_default)
+OCF_RESKEY_use_filter_rewriter \
+    = os.getenv("OCF_RESKEY_use_filter_rewriter", OCF_RESKEY_use_filter_rewriter_default)
+OCF_RESKEY_vnet_hdr \
+    = os.getenv("OCF_RESKEY_vnet_hdr", OCF_RESKEY_vnet_hdr_default)
+OCF_RESKEY_max_disk_errors \
+    = os.getenv("OCF_RESKEY_max_disk_errors", OCF_RESKEY_max_disk_errors_default)
+OCF_RESKEY_monitor_timeout \
+    = os.getenv("OCF_RESKEY_monitor_timeout", OCF_RESKEY_monitor_timeout_default)
+OCF_RESKEY_yank_timeout \
+    = os.getenv("OCF_RESKEY_yank_timeout", OCF_RESKEY_yank_timeout_default)
+OCF_RESKEY_fail_fast_timeout \
+    = os.getenv("OCF_RESKEY_fail_fast_timeout", OCF_RESKEY_fail_fast_timeout_default)
+OCF_RESKEY_debug \
+    = os.getenv("OCF_RESKEY_debug", OCF_RESKEY_debug_default)
+
+ACTIVE_IMAGE = os.path.join(OCF_RESKEY_active_hidden_dir, \
+                            OCF_RESOURCE_INSTANCE + "-active.qcow2")
+HIDDEN_IMAGE = os.path.join(OCF_RESKEY_active_hidden_dir, \
+                            OCF_RESOURCE_INSTANCE + "-hidden.qcow2")
+
+QMP_SOCK = os.path.join(HA_RSCTMP, OCF_RESOURCE_INSTANCE + "-qmp.sock")
+HELPER_SOCK = os.path.join(HA_RSCTMP, OCF_RESOURCE_INSTANCE + "-helper.sock")
+COMP_SOCK = os.path.join(HA_RSCTMP, OCF_RESOURCE_INSTANCE + "-compare.sock")
+COMP_OUT_SOCK = os.path.join(HA_RSCTMP, OCF_RESOURCE_INSTANCE \
+                                        + "-comp_out.sock")
+
+PID_FILE = os.path.join(HA_RSCTMP, OCF_RESOURCE_INSTANCE + "-qemu.pid")
+STATE_FILE = os.path.join(HA_RSCTMP, OCF_RESOURCE_INSTANCE + "-state")
+
+QMP_LOG = os.path.join(OCF_RESKEY_log_dir, OCF_RESOURCE_INSTANCE + "-qmp.log")
+QEMU_LOG = os.path.join(OCF_RESKEY_log_dir, OCF_RESOURCE_INSTANCE + "-qemu.log")
+HELPER_LOG = os.path.join(OCF_RESKEY_log_dir, OCF_RESOURCE_INSTANCE \
+                                                                + "-helper.log")
+
+START_TIME = time.time()
+did_yank = False
+
+# Exception only raised by ourself
+class Error(Exception):
+    pass
+
+def setup_constants():
+    # This function is called after the parameters where validated
+    global OCF_RESKEY_CRM_meta_timeout
+    if OCF_ACTION == "monitor":
+        OCF_RESKEY_CRM_meta_timeout = OCF_RESKEY_monitor_timeout
+
+    global MIGRATE_PORT, MIRROR_PORT, COMPARE_IN_PORT, NBD_PORT
+    MIGRATE_PORT = int(OCF_RESKEY_base_port)
+    MIRROR_PORT = int(OCF_RESKEY_base_port) + 1
+    COMPARE_IN_PORT = int(OCF_RESKEY_base_port) + 2
+    NBD_PORT = int(OCF_RESKEY_base_port) + 3
+
+    global QEMU_PRIMARY_CMDLINE
+    QEMU_PRIMARY_CMDLINE = \
+        ("'%(OCF_RESKEY_qemu_binary)s' %(OCF_RESKEY_options)s"
+        " -drive if=none,node-name=colo-disk0,driver=quorum,read-pattern=fifo,"
+        "vote-threshold=1,children.0=parent0"
+        " -qmp unix:'%(QMP_SOCK)s',server,nowait -no-shutdown"
+        " -daemonize -D '%(QEMU_LOG)s' -pidfile '%(PID_FILE)s'") % globals()
+
+    global QEMU_SECONDARY_CMDLINE
+    QEMU_SECONDARY_CMDLINE = \
+        ("'%(OCF_RESKEY_qemu_binary)s' %(OCF_RESKEY_options)s"
+        " -chardev socket,id=red0,host='%(OCF_RESKEY_listen_address)s',"
+        "port=%(MIRROR_PORT)s,server,nowait,nodelay"
+        " -chardev socket,id=red1,host='%(OCF_RESKEY_listen_address)s',"
+        "port=%(COMPARE_IN_PORT)s,server,nowait,nodelay"
+        " -object filter-redirector,id=f1,netdev=hn0,queue=tx,indev=red0"
+        " -object filter-redirector,id=f2,netdev=hn0,queue=rx,outdev=red1") \
+        % globals()
+
+    if is_true(OCF_RESKEY_use_filter_rewriter):
+        QEMU_SECONDARY_CMDLINE += \
+            " -object filter-rewriter,id=rew0,netdev=hn0,queue=all"
+
+    QEMU_SECONDARY_CMDLINE += \
+        (" -drive if=none,node-name=childs0,top-id=colo-disk0,"
+        "driver=replication,mode=secondary,file.driver=qcow2,"
+        "file.file.filename='%(ACTIVE_IMAGE)s',file.backing.driver=qcow2,"
+        "file.backing.file.filename='%(HIDDEN_IMAGE)s',"
+        "file.backing.backing=parent0"
+        " -drive if=none,node-name=colo-disk0,driver=quorum,read-pattern=fifo,"
+        "vote-threshold=1,children.0=childs0"
+        " -incoming tcp:'%(OCF_RESKEY_listen_address)s':%(MIGRATE_PORT)s"
+        " -qmp unix:'%(QMP_SOCK)s',server,nowait -no-shutdown"
+        " -daemonize -D '%(QEMU_LOG)s' -pidfile '%(PID_FILE)s'") % globals()
+
+    global QEMU_DUMMY_CMDLINE
+    QEMU_DUMMY_CMDLINE = \
+        ("'%(OCF_RESKEY_qemu_binary)s' %(OCF_RESKEY_options)s"
+        " -drive if=none,node-name=colo-disk0,driver=null-co -S"
+        " -qmp unix:'%(QMP_SOCK)s',server,nowait"
+        " -daemonize -D '%(QEMU_LOG)s' -pidfile '%(PID_FILE)s'") % globals()
+
+def qemu_colo_meta_data():
+    print("""\
+<?xml version="1.0"?>
+<!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd">
+<resource-agent name="colo">
+
+    <version>1.0</version>
+    <longdesc lang="en">
+Resource agent for qemu COLO. (https://wiki.qemu.org/Features/COLO)
+
+After defining the master/slave instance, the master score has to be
+manually set to show which node has up-to-date data. So you copy your
+image to one host (and create empty images the other host(s)) and then
+run "crm_master -r name_of_your_primitive -v 10" on that host.
+Also, you have to set 'notify=true' in the metadata attributes when
+defining the master/slave instance.
+
+Note:
+-If the instance is stopped cluster-wide, the resource agent will do a
+clean shutdown. Set the demote timeout to the time it takes for your
+guest to shutdown.
+-Colo replication is started from the monitor action. Set the monitor
+timeout to at least the time it takes for replication to start. You can
+set the monitor_timeout parameter for a soft timeout, which the resource
+agent tries to satisfy.
+-The resource agent may notify pacemaker about peer failure,
+these failures will show up with exitreason="Simulated failure".
+    </longdesc>
+    <shortdesc lang="en">Qemu COLO</shortdesc>
+
+    <parameters>
+
+    <parameter name="qemu_binary" unique="0" required="0">
+        <longdesc lang="en">qemu binary to use</longdesc>
+        <shortdesc lang="en">qemu binary</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_qemu_binary_default + """\"/>
+    </parameter>
+
+    <parameter name="qemu_img_binary" unique="0" required="0">
+        <longdesc lang="en">qemu-img binary to use</longdesc>
+        <shortdesc lang="en">qemu-img binary</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_qemu_img_binary_default + """\"/>
+    </parameter>
+
+    <parameter name="log_dir" unique="0" required="0">
+        <longdesc lang="en">Directory to place logs in</longdesc>
+        <shortdesc lang="en">Log directory</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_log_dir_default + """\"/>
+    </parameter>
+
+    <parameter name="options" unique="0" required="1">
+        <longdesc lang="en">
+Options to pass to qemu. These will be passed alongside COLO specific
+options, so you need to follow these conventions: The netdev should have
+id=hn0 and the disk controller drive=colo-disk0. The image node should
+have node-name=parent0, but should not be connected to the guest.
+Example:
+-vnc :0 -enable-kvm -cpu qemu64,+kvmclock -m 512 -netdev bridge,id=hn0
+-device e1000,netdev=hn0 -device virtio-blk,drive=colo-disk0
+-drive if=none,node-name=parent0,format=qcow2,file=/mnt/vms/vm01.qcow2
+        </longdesc>
+        <shortdesc lang="en">Options to pass to qemu.</shortdesc>
+    </parameter>
+
+    <parameter name="active_hidden_dir" unique="0" required="1">
+        <longdesc lang="en">
+Directory where the active and hidden images will be stored. It is
+recommended to put this on a ramdisk.
+        </longdesc>
+        <shortdesc lang="en">Path to active and hidden images</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_active_hidden_dir_default + """\"/>
+    </parameter>
+
+    <parameter name="listen_address" unique="0" required="0">
+        <longdesc lang="en">Address to listen on.</longdesc>
+        <shortdesc lang="en">Listen address</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_listen_address_default + """\"/>
+    </parameter>
+
+    <parameter name="base_port" unique="1" required="0">
+        <longdesc lang="en">
+4 tcp ports that are unique for each instance. (base_port to base_port + 3)
+        </longdesc>
+        <shortdesc lang="en">Ports to use</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_base_port_default + """\"/>
+    </parameter>
+
+    <parameter name="checkpoint_interval" unique="0" required="0">
+        <longdesc lang="en">
+Interval for regular checkpoints in milliseconds.
+        </longdesc>
+        <shortdesc lang="en">Interval for regular checkpoints</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_checkpoint_interval_default + """\"/>
+    </parameter>
+
+    <parameter name="compare_timeout" unique="0" required="0">
+        <longdesc lang="en">
+Maximum time to hold a primary packet if secondary hasn't sent it yet,
+in milliseconds.
+You should also adjust "expired_scan_cycle" accordingly.
+        </longdesc>
+        <shortdesc lang="en">Compare timeout</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_compare_timeout_default + """\"/>
+    </parameter>
+
+    <parameter name="expired_scan_cycle" unique="0" required="0">
+        <longdesc lang="en">
+Interval for checking for expired primary packets in milliseconds.
+        </longdesc>
+        <shortdesc lang="en">Expired packet check interval</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_expired_scan_cycle_default + """\"/>
+    </parameter>
+
+    <parameter name="max_queue_size" unique="0" required="0">
+        <longdesc lang="en">
+Maximum queue size for network packets.
+        </longdesc>
+        <shortdesc lang="en">Maximum queue size</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_max_queue_size_default + """\"/>
+    </parameter>
+
+    <parameter name="use_filter_rewriter" unique="0" required="0">
+        <longdesc lang="en">
+Use filter-rewriter to increase similarity between the VMs.
+        </longdesc>
+        <shortdesc lang="en">Use filter-rewriter</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_use_filter_rewriter_default + """\"/>
+    </parameter>
+
+    <parameter name="vnet_hdr" unique="0" required="0">
+        <longdesc lang="en">
+Set this to true if your system supports vnet_hdr and you enabled
+it on the tap netdev.
+        </longdesc>
+        <shortdesc lang="en">vnet_hdr support</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_vnet_hdr_default + """\"/>
+    </parameter>
+
+    <parameter name="max_disk_errors" unique="0" required="0">
+        <longdesc lang="en">
+Maximum disk read errors per monitor interval before marking the resource
+as failed. A write error is always fatal except if the value is 0.
+A value of 0 will disable disk error handling.
+Primary disk errors are only handled if there is a healthy secondary.
+        </longdesc>
+        <shortdesc lang="en">Maximum disk errors</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_max_disk_errors_default + """\"/>
+    </parameter>
+
+    <parameter name="monitor_timeout" unique="0" required="0">
+        <longdesc lang="en">
+Soft timeout for monitor, in milliseconds.
+Must be lower than the monitor action timeout.
+        </longdesc>
+        <shortdesc lang="en">Monitor timeout</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_monitor_timeout_default + """\"/>
+    </parameter>
+
+    <parameter name="yank_timeout" unique="0" required="0">
+        <longdesc lang="en">
+Timeout for QMP commands after which to execute the "yank" command,
+in milliseconds.
+Must be lower than any of the action timeouts.
+        </longdesc>
+        <shortdesc lang="en">Yank timeout</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_yank_timeout_default + """\"/>
+    </parameter>
+
+    <parameter name="fail_fast_timeout" unique="0" required="0">
+        <longdesc lang="en">
+Timeout for QMP commands used in the stop and demote actions to speed
+up recovery from a hanging qemu, in milliseconds.
+Must be lower than any of the action timeouts.
+        </longdesc>
+        <shortdesc lang="en">Timeout for fast paths</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_fail_fast_timeout_default + """\"/>
+    </parameter>
+
+    <parameter name="debug" unique="0" required="0">
+        <longdesc lang="en">
+Control debugging:
+0: disable debugging
+1: log debug messages and qmp commands
+2: + dump core of hanging qemu
+        </longdesc>
+        <shortdesc lang="en">Control debugging</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_debug_default + """\"/>
+    </parameter>
+
+    </parameters>
+
+    <actions>
+        <action name="start"        timeout="30s" />
+        <action name="stop"         timeout="10s" />
+        <action name="monitor"      timeout="30s" \
+            interval="1000ms" depth="0" role="Slave" />
+        <action name="monitor"      timeout="30s" \
+            interval="1001ms" depth="0" role="Master" />
+        <action name="notify"       timeout="30s" />
+        <action name="promote"      timeout="30s" />
+        <action name="demote"       timeout="120s" />
+        <action name="meta-data"    timeout="5s" />
+        <action name="validate-all" timeout="20s" />
+    </actions>
+
+</resource-agent>
+""")
+
+def logs_open():
+    global log
+    log = logging.getLogger(OCF_RESOURCE_INSTANCE)
+    if int(OCF_RESKEY_debug) >= 1 or HA_DEBUG != "0":
+        log.setLevel(logging.DEBUG)
+    else:
+        log.setLevel(logging.INFO)
+
+    formater = logging.Formatter("(%(name)s) %(levelname)s: %(message)s")
+
+    if sys.stdout.isatty():
+        handler = logging.StreamHandler(stream=sys.stderr)
+        handler.setFormatter(formater)
+        log.addHandler(handler)
+
+    if HA_LOGFACILITY:
+        handler = logging.handlers.SysLogHandler("/dev/log")
+        handler.setFormatter(formater)
+        log.addHandler(handler)
+
+    if HA_LOGFILE:
+        handler = logging.FileHandler(HA_LOGFILE)
+        handler.setFormatter(formater)
+        log.addHandler(handler)
+
+    if HA_DEBUGLOG and HA_DEBUGLOG != HA_LOGFILE:
+        handler = logging.FileHandler(HA_DEBUGLOG)
+        handler.setFormatter(formater)
+        log.addHandler(handler)
+
+    global qmp_log
+    qmp_log = logging.getLogger("qmp_log")
+    qmp_log.setLevel(logging.DEBUG)
+    formater = logging.Formatter("%(message)s")
+
+    if int(OCF_RESKEY_debug) >= 1:
+        handler = logging.handlers.WatchedFileHandler(QMP_LOG)
+        handler.setFormatter(formater)
+        qmp_log.addHandler(handler)
+    else:
+        handler = logging.NullHandler()
+        qmp_log.addHandler(handler)
+
+def rotate_logfile(logfile, numlogs):
+    numlogs -= 1
+    for n in range(numlogs, -1, -1):
+        _file = logfile
+        if n != 0:
+            _file = "%s.%s" % (_file, n)
+        if os.path.exists(_file):
+            if n == numlogs:
+                os.remove(_file)
+            else:
+                newname = "%s.%s" % (logfile, n + 1)
+                os.rename(_file, newname)
+
+def is_writable(_file):
+    return os.access(_file, os.W_OK)
+
+def is_executable_file(_file):
+    return os.path.isfile(_file) and os.access(_file, os.X_OK)
+
+def is_true(var):
+    return re.match("yes|true|1|YES|TRUE|True|ja|on|ON", str(var)) != None
+
+# Check if the binary exists and is executable
+def check_binary(binary):
+    if is_executable_file(binary):
+        return True
+    PATH = os.getenv("PATH", os.defpath)
+    for _dir in PATH.split(os.pathsep):
+        if is_executable_file(os.path.join(_dir, binary)):
+            return True
+    log.error("binary \"%s\" doesn't exist or not executable" % binary)
+    return False
+
+def run_command(commandline):
+    proc = subprocess.Popen(commandline, shell=True, stdout=subprocess.PIPE,
+                          stderr=subprocess.STDOUT, universal_newlines=True)
+    stdout, stderr = proc.communicate()
+    if proc.returncode != 0:
+        log.error("command \"%s\" failed with code %s:\n%s" \
+                    % (commandline, proc.returncode, stdout))
+        raise Error()
+
+# Functions for setting and getting the master score to tell Pacemaker which
+# host has the most recent data
+def set_master_score(score):
+    if score == 0:
+        run_command("crm_master -q -l forever -D")
+    else:
+        run_command("crm_master -q -l forever -v %s" % score)
+
+def set_remote_master_score(remote, score):
+    if score == 0:
+        run_command("crm_master -q -l forever -N '%s' -D" % remote)
+    else:
+        run_command("crm_master -q -l forever -N '%s' -v %s" % (remote, score))
+
+def get_master_score():
+    proc = subprocess.Popen("crm_master -q -G", shell=True,
+                            stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
+                            universal_newlines=True)
+    stdout, stderr = proc.communicate()
+    if proc.returncode != 0:
+        return 0
+    else:
+        return int(str.strip(stdout))
+
+def get_remote_master_score(remote):
+    proc = subprocess.Popen("crm_master -q -N '%s' -G" % remote, shell=True,
+                            stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
+                            universal_newlines=True)
+    stdout, stderr = proc.communicate()
+    if proc.returncode != 0:
+        return 0
+    else:
+        return int(str.strip(stdout))
+
+# Tell Pacemaker that the remote resource failed
+def report_remote_failure(remote):
+    run_command("crm_resource --resource '%s' --fail --node '%s'"
+                % (OCF_RESOURCE_INSTANCE, remote))
+
+def recv_line(fd):
+    line = ""
+    while True:
+        tmp = fd.recv(1).decode()
+        line += tmp
+        if tmp == "\n" or len(tmp) == 0:
+            break
+    return line
+
+# Filter out events
+def read_answer(fd):
+    while True:
+        line = recv_line(fd)
+        qmp_log.debug(str.strip(line))
+
+        if len(line) == 0:
+            log.error("qmp connection closed")
+            raise Error()
+
+        answer = json.loads(line)
+        # Ignore everything else
+        if "return" in answer or "error" in answer:
+            break
+    return answer
+
+# Execute one or more qmp commands
+def qmp_execute(fd, commands, ignore_error = False, do_yank = True):
+    for command in commands:
+        if not command:
+            continue
+
+        try:
+            to_send = json.dumps(command)
+            fd.sendall(str.encode(to_send + "\n"))
+            qmp_log.debug(to_send)
+
+            answer = read_answer(fd)
+        except Exception as e:
+            if isinstance(e, socket.timeout) and do_yank:
+                log.warning("Command timed out, trying to unfreeze qemu")
+                new_timeout = max(2, (int(OCF_RESKEY_CRM_meta_timeout)/1000) \
+                                    - (time.time() - START_TIME) - 2)
+                fd.settimeout(new_timeout)
+                try:
+                    # answer is the answer of timed-out command
+                    answer = yank(fd)
+                    if not answer:
+                        answer = read_answer(fd)
+                except socket.error as e:
+                    log.error("while reading answer of timed out command: "
+                              "%s\n%s" % (json.dumps(command), e))
+                    raise Error()
+            elif isinstance(e, (socket.error, socket.timeout)):
+                log.error("while executing qmp command: %s\n%s" \
+                            % (json.dumps(command), e))
+                raise Error()
+            else:
+                raise
+
+        if not ignore_error and ("error" in answer):
+            log.error("qmp command returned error:\n%s\n%s" \
+                        % (json.dumps(command), json.dumps(answer)))
+            raise Error()
+
+    return answer
+
+# Open qemu qmp connection
+def qmp_open(fail_fast = False):
+    if fail_fast:
+        timeout = int(OCF_RESKEY_fail_fast_timeout)/1000
+    else:
+        timeout = int(OCF_RESKEY_yank_timeout)/1000
+
+    try:
+        fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        fd.settimeout(timeout)
+        fd.connect(HELPER_SOCK)
+    except socket.error as e:
+        log.error("while connecting to helper socket: %s" % e)
+        raise Error()
+
+    return fd
+
+def yank(fd):
+    global did_yank
+    did_yank = True
+    ret = None
+    while True:
+        answer = qmp_execute(fd, [{"exec-oob": "query-yank", "id": "yank0"}], \
+                                do_yank = False, ignore_error = True)
+        if "id" not in answer:
+            # This is the answer of the timed-out command
+            ret = answer
+            answer = read_answer(fd)
+        if "error" in answer:
+            log.error("While executing 'query-yank':\n%s" % json.dumps(answer))
+            raise Error()
+        instances = []
+        for n in answer["return"]:
+            if n == { "type": "block-node", "node-name": "nbd0" } \
+                or n == { "type": "chardev", "id": "mirror0" } \
+                or n == { "type": "chardev", "id": "comp_sec_in0" } \
+                or n == { "type": "chardev", "id": "red0" } \
+                or n == { "type": "chardev", "id": "red1" } \
+                or n == { "type": "migration" }:
+                instances.append(n)
+        answer = qmp_execute(fd, [{"exec-oob": "yank", "id": "yank0", "arguments":{ "instances": instances }}], \
+                                do_yank = False, ignore_error = True)
+        if "id" not in answer:
+            # This is the answer of the timed-out command
+            ret = answer
+            answer = read_answer(fd)
+        if "error" in answer:
+            if answer["error"]["class"] == "DeviceNotFound":
+                continue
+            else:
+                log.error("While executing 'yank':\n%s" % json.dumps(answer))
+                raise Error()
+        break
+
+    return ret
+
+def oob_helper_exec(client, cmd, events):
+    if cmd["exec-helper"] == "get-events":
+        event = cmd["arguments"]["event"]
+        if (event in events):
+            to_send = json.dumps({"return": events[event]})
+            client.sendall(str.encode(to_send + "\n"))
+        else:
+            client.sendall(str.encode("{\"return\": []}\n"))
+    elif cmd["exec-helper"] == "clear-events":
+        events.clear()
+        client.sendall(str.encode("{\"return\": {}}\n"))
+    else:
+        client.sendall(str.encode("{\"error\": \"Unknown helper command\"}\n"))
+
+def oob_helper(qmp, server):
+    max_events = max(100, int(OCF_RESKEY_max_disk_errors))
+    events = {}
+    try:
+        os.close(0)
+        os.close(1)
+        os.close(2)
+        logging.shutdown()
+
+        client = None
+        while True:
+            if client:
+                watch = [client, qmp]
+            else:
+                watch = [server, qmp]
+            sel = select.select(watch, [], [])
+            try:
+                if client in sel[0]:
+                    cmd = recv_line(client)
+                    if len(cmd) == 0:
+                        # client socket was closed: wait for new client
+                        client.close()
+                        client = None
+                        continue
+                    else:
+                        parsed = json.loads(cmd)
+                        if ("exec-helper" in parsed):
+                            oob_helper_exec(client, parsed, events)
+                        else:
+                            qmp.sendall(str.encode(cmd))
+                if qmp in sel[0]:
+                    answer = recv_line(qmp)
+                    if len(answer) == 0:
+                        # qmp socket was closed: qemu died, exit
+                        os._exit(0)
+                    else:
+                        parsed = json.loads(answer)
+                        if ("event" in parsed):
+                            event = parsed["event"]
+                            if (event not in events):
+                                events[event] = []
+                            if len(events[event]) < max_events:
+                                events[event].append(parsed)
+                        elif client:
+                            client.sendall(str.encode(answer))
+                if server in sel[0]:
+                    client, client_addr = server.accept()
+            except socket.error as e:
+                pass
+    except Exception as e:
+        with open(HELPER_LOG, 'a') as f:
+            f.write(str(e) + "\n")
+    os._exit(0)
+
+# Fork off helper to keep the oob qmp connection open and to catch events
+def oob_helper_open():
+    try:
+        qmp = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        qmp.connect(QMP_SOCK)
+        qmp_execute(qmp, [{"execute": "qmp_capabilities", "arguments": {"enable": ["oob"]}}])
+    except socket.error as e:
+        log.error("while connecting to qmp socket: %s" % e)
+        raise Error()
+
+    try:
+        if os.path.exists(HELPER_SOCK):
+            os.unlink(HELPER_SOCK)
+        server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        server.bind(HELPER_SOCK)
+        server.listen(1)
+    except socket.error as e:
+        log.error("while opening helper socket: %s" % e)
+        raise Error()
+
+    qmp.set_inheritable(True)
+    server.set_inheritable(True)
+
+    try:
+        pid = os.fork()
+    except OSError as e:
+        log.error("while forking off oob helper: %s" % e)
+        raise Error()
+
+    if pid == 0:
+        # child 1: Exits after forking off child 2, so pid 1 will become
+        # responsible for the child
+        os.setsid()
+
+        pid = os.fork()
+
+        if pid == 0:
+            # child 2: here the actual work is being done
+            oob_helper(qmp, server)
+        else:
+            os._exit(0)
+
+    qmp.close()
+    server.close()
+
+# Get the disk size of the (user supplied) parent disk
+def qmp_get_disk_size(fd):
+    block_nodes = qmp_execute(fd, [{"execute": "query-named-block-nodes", "arguments": {"flat": True}}])
+    for node in block_nodes["return"]:
+        if node["node-name"] == "parent0":
+            return node["image"]["virtual-size"]
+
+    log.error("Disk \"parent0\" not found")
+    raise Error()
+
+# Get the host of the nbd node
+def qmp_get_nbd_remote(fd):
+    block_nodes = qmp_execute(fd, [{"execute": "query-named-block-nodes", "arguments": {"flat": True}}])
+    for node in block_nodes["return"]:
+        if node["node-name"] == "nbd0":
+            url = str(node["image"]["filename"])
+            return str.split(url, "//")[1].split("/")[0].split(":")[0]
+    return None
+
+# Check if we are currently resyncing
+def qmp_check_resync(fd):
+    answer = qmp_execute(fd, [{"execute": "query-block-jobs"}])
+    for job in answer["return"]:
+        if job["device"] == "resync":
+            return job
+    return None
+
+def qmp_start_resync(fd, remote):
+    answer = qmp_execute(fd, [{"execute": "blockdev-add", "arguments": {"driver": "nbd", "node-name": "nbd0", "server": {"type": "inet", "host": str(remote), "port": str(NBD_PORT)}, "export": "parent0", "detect-zeroes": "on"}}], ignore_error = True)
+    if "error" in answer:
+        log.warning("Failed to add nbd node: %s" % json.dumps(answer))
+        log.warning("Assuming peer failure")
+        report_remote_failure(remote)
+    else:
+        qmp_execute(fd, [{"execute": "blockdev-mirror", "arguments": {"device": "colo-disk0", "job-id": "resync", "target": "nbd0", "sync": "full", "on-target-error": "report", "on-source-error": "ignore", "auto-dismiss": False}}])
+
+def qmp_cancel_resync(fd):
+    timeout = START_TIME + (int(OCF_RESKEY_yank_timeout)/1000)
+
+    if qmp_check_resync(fd)["status"] != "concluded":
+        qmp_execute(fd, [{"execute": "block-job-cancel", "arguments": {"device": "resync", "force": True}}], ignore_error = True)
+        # Wait for the block-job to finish
+        while time.time() < timeout:
+            if qmp_check_resync(fd)["status"] == "concluded":
+                break
+            log.debug("Waiting for block-job to finish in qmp_cancel_resync()")
+            time.sleep(1)
+        else:
+            log.warning("Timed out, trying to unfreeze qemu")
+            yank(fd)
+            while qmp_check_resync(fd)["status"] != "concluded":
+                log.debug("Waiting for block-job to finish")
+                time.sleep(1)
+
+    qmp_execute(fd, [
+        {"execute": "block-job-dismiss", "arguments": {"id": "resync"}},
+        {"execute": "blockdev-del", "arguments": {"node-name": "nbd0"}}
+        ])
+
+def qmp_start_colo(fd, remote):
+    # Check if we have a filter-rewriter
+    answer = qmp_execute(fd, [{"execute": "qom-list", "arguments": {"path": "/objects/rew0"}}], ignore_error = True)
+    if "error" in answer:
+        if answer["error"]["class"] == "DeviceNotFound":
+            have_filter_rewriter = False
+        else:
+            log.error("While checking for filter-rewriter:\n%s" \
+                        % json.dumps(answer))
+            raise Error()
+    else:
+        have_filter_rewriter = True
+
+    # Pause VM and cancel resync
+    qmp_execute(fd, [
+        {"execute": "stop"},
+        {"execute": "block-job-cancel", "arguments": {"device": "resync"}}
+        ])
+
+    # Wait for the block-job to finish
+    while qmp_check_resync(fd)["status"] != "concluded":
+        log.debug("Waiting for block-job to finish in qmp_start_colo()")
+        time.sleep(1)
+
+    # Add nbd to the quorum node
+    qmp_execute(fd, [
+        {"execute": "block-job-dismiss", "arguments": {"id": "resync"}},
+        {"execute": "x-blockdev-change", "arguments": {"parent": "colo-disk0", "node": "nbd0"}}
+        ])
+
+    # Connect mirror and compare_in to secondary
+    qmp_execute(fd, [
+        {"execute": "chardev-add", "arguments": {"id": "comp_pri_in0<", "backend": {"type": "socket", "data": {"addr": {"type": "unix", "data": {"path": str(COMP_SOCK)}}, "server": True}}}},
+        {"execute": "chardev-add", "arguments": {"id": "comp_pri_in0>", "backend": {"type": "socket", "data": {"addr": {"type": "unix", "data": {"path": str(COMP_SOCK)}}, "server": False}}}},
+        {"execute": "chardev-add", "arguments": {"id": "comp_out0<", "backend": {"type": "socket", "data": {"addr": {"type": "unix", "data": {"path": str(COMP_OUT_SOCK)}}, "server": True}}}},
+        {"execute": "chardev-add", "arguments": {"id": "comp_out0>", "backend": {"type": "socket", "data": {"addr": {"type": "unix", "data": {"path": str(COMP_OUT_SOCK)}}, "server": False}}}},
+        {"execute": "chardev-add", "arguments": {"id": "mirror0", "backend": {"type": "socket", "data": {"addr": {"type": "inet", "data": {"host": str(remote), "port": str(MIRROR_PORT)}}, "server": False, "nodelay": True }}}},
+        {"execute": "chardev-add", "arguments": {"id": "comp_sec_in0", "backend": {"type": "socket", "data": {"addr": {"type": "inet", "data": {"host": str(remote), "port": str(COMPARE_IN_PORT)}}, "server": False, "nodelay": True }}}}
+        ])
+
+    # Add the COLO filters
+    vnet_hdr_support = is_true(OCF_RESKEY_vnet_hdr)
+    if have_filter_rewriter:
+        qmp_execute(fd, [
+            {"execute": "object-add", "arguments": {"qom-type": "filter-mirror", "id": "m0", "props": {"insert": "before", "position": "id=rew0", "netdev": "hn0", "queue": "tx", "outdev": "mirror0", "vnet_hdr_support": vnet_hdr_support}}},
+            {"execute": "object-add", "arguments": {"qom-type": "filter-redirector", "id": "redire0", "props": {"insert": "before", "position": "id=rew0", "netdev": "hn0", "queue": "rx", "indev": "comp_out0<", "vnet_hdr_support": vnet_hdr_support}}},
+            {"execute": "object-add", "arguments": {"qom-type": "filter-redirector", "id": "redire1", "props": {"insert": "before", "position": "id=rew0", "netdev": "hn0", "queue": "rx", "outdev": "comp_pri_in0<", "vnet_hdr_support": vnet_hdr_support}}},
+            {"execute": "object-add", "arguments": {"qom-type": "iothread", "id": "iothread1"}},
+            {"execute": "object-add", "arguments": {"qom-type": "colo-compare", "id": "comp0", "props": {"primary_in": "comp_pri_in0>", "secondary_in": "comp_sec_in0", "outdev": "comp_out0>", "iothread": "iothread1", "compare_timeout": int(OCF_RESKEY_compare_timeout), "expired_scan_cycle": int(OCF_RESKEY_expired_scan_cycle), "max_queue_size": int(OCF_RESKEY_max_queue_size), "vnet_hdr_support": vnet_hdr_support}}}
+            ])
+    else:
+        qmp_execute(fd, [
+            {"execute": "object-add", "arguments": {"qom-type": "filter-mirror", "id": "m0", "props": {"netdev": "hn0", "queue": "tx", "outdev": "mirror0", "vnet_hdr_support": vnet_hdr_support}}},
+            {"execute": "object-add", "arguments": {"qom-type": "filter-redirector", "id": "redire0", "props": {"netdev": "hn0", "queue": "rx", "indev": "comp_out0<", "vnet_hdr_support": vnet_hdr_support}}},
+            {"execute": "object-add", "arguments": {"qom-type": "filter-redirector", "id": "redire1", "props": {"netdev": "hn0", "queue": "rx", "outdev": "comp_pri_in0<", "vnet_hdr_support": vnet_hdr_support}}},
+            {"execute": "object-add", "arguments": {"qom-type": "iothread", "id": "iothread1"}},
+            {"execute": "object-add", "arguments": {"qom-type": "colo-compare", "id": "comp0", "props": {"primary_in": "comp_pri_in0>", "secondary_in": "comp_sec_in0", "outdev": "comp_out0>", "iothread": "iothread1", "compare_timeout": int(OCF_RESKEY_compare_timeout), "expired_scan_cycle": int(OCF_RESKEY_expired_scan_cycle), "max_queue_size": int(OCF_RESKEY_max_queue_size), "vnet_hdr_support": vnet_hdr_support}}}
+            ])
+
+    # Start COLO
+    qmp_execute(fd, [
+        {"execute": "migrate-set-capabilities", "arguments": {"capabilities": [{"capability": "x-colo", "state": True }] }},
+        {"execute": "migrate-set-parameters", "arguments": {"x-checkpoint-delay": int(OCF_RESKEY_checkpoint_interval) }},
+        {"execute": "migrate", "arguments": {"uri": "tcp:%s:%s" % (remote, MIGRATE_PORT)}}
+        ])
+
+    # Wait for COLO to start
+    while qmp_execute(fd, [{"execute": "query-status"}])["return"]["status"] \
+            == "paused" \
+            or qmp_execute(fd, [{"execute": "query-colo-status"}])["return"]["mode"] \
+            != "primary" :
+        log.debug("Waiting for colo replication to start")
+        time.sleep(1)
+
+def qmp_primary_failover(fd):
+    qmp_execute(fd, [
+        {"execute": "object-del", "arguments": {"id": "m0"}},
+        {"execute": "object-del", "arguments": {"id": "redire0"}},
+        {"execute": "object-del", "arguments": {"id": "redire1"}},
+        {"execute": "x-colo-lost-heartbeat"},
+        {"execute": "object-del", "arguments": {"id": "comp0"}},
+        {"execute": "object-del", "arguments": {"id": "iothread1"}},
+        {"execute": "x-blockdev-change", "arguments": {"parent": "colo-disk0", "child": "children.1"}},
+        {"execute": "blockdev-del", "arguments": {"node-name": "nbd0"}},
+        {"execute": "chardev-remove", "arguments": {"id": "mirror0"}},
+        {"execute": "chardev-remove", "arguments": {"id": "comp_sec_in0"}},
+        {"execute": "chardev-remove", "arguments": {"id": "comp_pri_in0>"}},
+        {"execute": "chardev-remove", "arguments": {"id": "comp_pri_in0<"}},
+        {"execute": "chardev-remove", "arguments": {"id": "comp_out0>"}},
+        {"execute": "chardev-remove", "arguments": {"id": "comp_out0<"}}
+        ])
+
+def qmp_secondary_failover(fd):
+    qmp_execute(fd, [
+        {"execute": "nbd-server-stop"},
+        {"execute": "object-del", "arguments": {"id": "f2"}},
+        {"execute": "object-del", "arguments": {"id": "f1"}},
+        {"execute": "x-colo-lost-heartbeat"},
+        {"execute": "chardev-remove", "arguments": {"id": "red1"}},
+        {"execute": "chardev-remove", "arguments": {"id": "red0"}},
+        ])
+
+def qmp_is_colo_active(fd):
+    answer = qmp_execute(fd, [{"execute": "query-colo-status"}])
+
+    if answer["return"]["mode"] != "none":
+        return True
+    else:
+        return False
+
+# Check qemu health and colo role
+def qmp_check_health(fd, do_yank = True):
+    answer = qmp_execute(fd, [{"execute": "query-status"}], do_yank = do_yank)
+    vm_status = answer["return"]
+
+    answer = qmp_execute(fd, [{"execute": "query-colo-status"}], \
+                            do_yank = do_yank)
+    colo_status = answer["return"]
+
+    if vm_status["status"] == "inmigrate" \
+        or vm_status["status"] == "shutdown":
+        role = OCF_SUCCESS
+        replication = OCF_NOT_RUNNING
+
+    elif (vm_status["status"] == "running" \
+          or vm_status["status"] == "colo" \
+          or vm_status["status"] == "finish-migrate") \
+         and colo_status["mode"] == "none" \
+         and (colo_status["reason"] == "request" \
+              or colo_status["reason"] == "none"):
+        role = OCF_RUNNING_MASTER
+        replication = OCF_NOT_RUNNING
+
+    elif (vm_status["status"] == "running" \
+          or vm_status["status"] == "colo" \
+          or vm_status["status"] == "finish-migrate") \
+         and colo_status["mode"] == "secondary":
+        role = OCF_SUCCESS
+        replication = OCF_SUCCESS
+
+    elif (vm_status["status"] == "running" \
+          or vm_status["status"] == "colo" \
+          or vm_status["status"] == "finish-migrate") \
+         and colo_status["mode"] == "primary":
+        role = OCF_RUNNING_MASTER
+        replication = OCF_SUCCESS
+
+    else:
+        log.error("Invalid qemu status:\nvm status: %s\ncolo status: %s" \
+                    % (vm_status, colo_status))
+        role = OCF_ERR_GENERIC
+        replication = OCF_ERR_GENERIC
+
+    return role, replication
+
+# Sanity checks: check parameters, files, binaries, etc.
+def qemu_colo_validate_all():
+    # Check resource parameters
+    if not str.isdigit(OCF_RESKEY_base_port):
+        log.error("base_port needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not str.isdigit(OCF_RESKEY_checkpoint_interval):
+        log.error("checkpoint_interval needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not str.isdigit(OCF_RESKEY_compare_timeout):
+        log.error("compare_timeout needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not str.isdigit(OCF_RESKEY_expired_scan_cycle):
+        log.error("expired_scan_cycle needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not str.isdigit(OCF_RESKEY_max_queue_size):
+        log.error("max_queue_size needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not str.isdigit(OCF_RESKEY_max_disk_errors):
+        log.error("max_disk_errors needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not str.isdigit(OCF_RESKEY_monitor_timeout):
+        log.error("monitor_timeout needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not str.isdigit(OCF_RESKEY_yank_timeout):
+        log.error("yank_timeout needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not str.isdigit(OCF_RESKEY_fail_fast_timeout):
+        log.error("fail_fast_timeout needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not str.isdigit(OCF_RESKEY_debug):
+        log.error("debug needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not OCF_RESKEY_active_hidden_dir:
+        log.error("active_hidden_dir needs to be specified")
+        return OCF_ERR_CONFIGURED
+
+    # Check resource meta configuration
+    if OCF_ACTION != "stop":
+        if OCF_RESKEY_CRM_meta_master_max != 1:
+            log.error("only one master allowed")
+            return OCF_ERR_CONFIGURED
+
+        if OCF_RESKEY_CRM_meta_clone_max > 2:
+            log.error("maximum 2 clones allowed")
+            return OCF_ERR_CONFIGURED
+
+        if OCF_RESKEY_CRM_meta_master_node_max != 1:
+            log.error("only one master per node allowed")
+            return OCF_ERR_CONFIGURED
+
+        if OCF_RESKEY_CRM_meta_clone_node_max != 1:
+            log.error("only one clone per node allowed")
+            return OCF_ERR_CONFIGURED
+
+    # Check if notify is enabled
+    if OCF_ACTION != "stop" and OCF_ACTION != "monitor":
+        if not is_true(OCF_RESKEY_CRM_meta_notify) \
+           and not OCF_RESKEY_CRM_meta_notify_start_uname:
+            log.error("notify needs to be enabled")
+            return OCF_ERR_CONFIGURED
+
+    # Check that globally-unique is disabled
+    if is_true(OCF_RESKEY_CRM_meta_globally_unique):
+        log.error("globally-unique needs to be disabled")
+        return OCF_ERR_CONFIGURED
+
+    # Check binaries
+    if not check_binary(OCF_RESKEY_qemu_binary):
+        return OCF_ERR_INSTALLED
+
+    if not check_binary(OCF_RESKEY_qemu_img_binary):
+        return OCF_ERR_INSTALLED
+
+    # Check paths and files
+    if not is_writable(OCF_RESKEY_active_hidden_dir) \
+        or not os.path.isdir(OCF_RESKEY_active_hidden_dir):
+        log.error("active and hidden image directory missing or not writable")
+        return OCF_ERR_PERM
+
+    return OCF_SUCCESS
+
+def set_state(state):
+    fd = open(STATE_FILE, "w")
+    fd.write(state)
+    fd.close()
+
+def get_state():
+    if not os.path.exists(STATE_FILE):
+        return "none"
+
+    fd = open(STATE_FILE, "r")
+    state = str.strip(fd.readline())
+    fd.close()
+
+    return state
+
+# Check if qemu is running
+def check_pid():
+    if not os.path.exists(PID_FILE):
+        return OCF_NOT_RUNNING, None
+
+    fd = open(PID_FILE, "r")
+    pid = int(str.strip(fd.readline()))
+    fd.close()
+    try:
+        os.kill(pid, 0)
+    except OSError:
+        log.info("qemu is not running")
+        return OCF_NOT_RUNNING, pid
+    else:
+        return OCF_SUCCESS, pid
+
+def qemu_colo_monitor(fail_fast = False):
+    status, pid = check_pid()
+    if status != OCF_SUCCESS:
+        return status, OCF_NOT_RUNNING
+
+    fd = qmp_open(fail_fast)
+
+    role, replication = qmp_check_health(fd, do_yank = not fail_fast)
+    if role != OCF_SUCCESS and role != OCF_RUNNING_MASTER:
+        return role, replication
+
+    colo_events = qmp_execute(fd, [{"exec-helper": "get-events", "arguments": {"event": "COLO_EXIT"}}], do_yank = False)
+    for event in colo_events["return"]:
+        if event["data"]["reason"] == "error":
+            if replication == OCF_SUCCESS:
+                replication = OCF_ERR_GENERIC
+
+    if did_yank and replication == OCF_SUCCESS:
+        replication = OCF_ERR_GENERIC
+
+    peer_disk_errors = 0
+    local_disk_errors = 0
+    quorum_events = qmp_execute(fd, [{"exec-helper": "get-events", "arguments": {"event": "QUORUM_REPORT_BAD"}}], do_yank = False)
+    for event in quorum_events["return"]:
+        if event["data"]["node-name"] == "nbd0":
+            if event["data"]["type"] == "read":
+                peer_disk_errors += 1
+            else:
+                peer_disk_errors += int(OCF_RESKEY_max_disk_errors)
+        else:
+            if event["data"]["type"] == "read":
+                local_disk_errors += 1
+            else:
+                local_disk_errors += int(OCF_RESKEY_max_disk_errors)
+
+    if int(OCF_RESKEY_max_disk_errors) != 0:
+        if peer_disk_errors >= int(OCF_RESKEY_max_disk_errors):
+            log.error("Peer disk error")
+            if replication == OCF_SUCCESS:
+                replication = OCF_ERR_GENERIC
+
+        if local_disk_errors >= int(OCF_RESKEY_max_disk_errors):
+            if replication == OCF_SUCCESS:
+                log.error("Local disk error")
+                role = OCF_ERR_GENERIC
+            else:
+                log.warning("Local disk error")
+
+    if not fail_fast and OCF_RESKEY_CRM_meta_interval != 0:
+        # This isn't a probe monitor
+        block_job = qmp_check_resync(fd)
+        if block_job:
+            if "error" in block_job:
+                log.error("resync error: %s" % block_job["error"])
+                peer = qmp_get_nbd_remote(fd)
+                qmp_cancel_resync(fd)
+                report_remote_failure(peer)
+            elif block_job["ready"] == True:
+                log.info("resync done, starting colo")
+                peer = qmp_get_nbd_remote(fd)
+                qmp_start_colo(fd, peer)
+                # COLO started, our secondary now can be promoted if the
+                # primary fails
+                set_remote_master_score(peer, 100)
+            else:
+                pct_done = (float(block_job["offset"]) \
+                            / float(block_job["len"])) * 100
+                log.info("resync %.1f%% done" % pct_done)
+        else:
+            if replication == OCF_ERR_GENERIC:
+                if role == OCF_RUNNING_MASTER:
+                    log.error("Replication error")
+                    peer = qmp_get_nbd_remote(fd)
+                    if peer:
+                        report_remote_failure(peer)
+                else:
+                    log.warning("Replication error")
+        qmp_execute(fd, [{"exec-helper": "clear-events"}], do_yank = False)
+
+    fd.close()
+
+    return role, replication
+
+def qemu_colo_start():
+    if check_pid()[0] == OCF_SUCCESS:
+        log.info("qemu is already running")
+        return OCF_SUCCESS
+
+    rotate_logfile(QMP_LOG, 8)
+    rotate_logfile(QEMU_LOG, 8)
+
+    run_command(QEMU_DUMMY_CMDLINE)
+    oob_helper_open()
+    fd = qmp_open()
+    disk_size = qmp_get_disk_size(fd)
+    fd.close()
+    _qemu_colo_stop(OCF_SUCCESS, False)
+
+    run_command("'%s' create -q -f qcow2 %s %s" \
+            % (OCF_RESKEY_qemu_img_binary, ACTIVE_IMAGE, disk_size))
+    run_command("'%s' create -q -f qcow2 %s %s" \
+            % (OCF_RESKEY_qemu_img_binary, HIDDEN_IMAGE, disk_size))
+
+    run_command(QEMU_SECONDARY_CMDLINE)
+    oob_helper_open()
+
+    fd = qmp_open()
+    qmp_execute(fd, [
+        {"execute": "nbd-server-start", "arguments": {"addr": {"type": "inet", "data": {"host": str(OCF_RESKEY_listen_address), "port": str(NBD_PORT)}}}},
+        {"execute": "nbd-server-add", "arguments": {"device": "parent0", "writable": True}}
+        ])
+    fd.close()
+
+    set_state("running")
+
+    return OCF_SUCCESS
+
+def env_do_shutdown_guest():
+    return OCF_RESKEY_CRM_meta_notify_active_uname \
+           and OCF_RESKEY_CRM_meta_notify_stop_uname \
+           and str.strip(OCF_RESKEY_CRM_meta_notify_active_uname) \
+               == str.strip(OCF_RESKEY_CRM_meta_notify_stop_uname)
+
+def env_find_secondary():
+    # slave(s) =
+    # OCF_RESKEY_CRM_meta_notify_slave_uname
+    # - OCF_RESKEY_CRM_meta_notify_stop_uname
+    # + OCF_RESKEY_CRM_meta_notify_start_uname
+    # Filter out hosts that are stopping and ourselves
+    for host in str.split(OCF_RESKEY_CRM_meta_notify_slave_uname, " "):
+        if host:
+            for stopping_host \
+                in str.split(OCF_RESKEY_CRM_meta_notify_stop_uname, " "):
+                if host == stopping_host:
+                    break
+            else:
+                if host != HOSTNAME:
+                    # we found a valid secondary
+                    return host
+
+    for host in str.split(OCF_RESKEY_CRM_meta_notify_start_uname, " "):
+        if host != HOSTNAME:
+            # we found a valid secondary
+            return host
+
+    # we found no secondary
+    return None
+
+def _qemu_colo_stop(monstatus, shutdown_guest):
+    # stop action must do everything possible to stop the resource
+    try:
+        timeout = START_TIME + (int(OCF_RESKEY_CRM_meta_timeout)/1000) - 5
+        force_stop = False
+
+        if monstatus == OCF_NOT_RUNNING:
+            log.info("resource is already stopped")
+            return OCF_SUCCESS
+        elif monstatus == OCF_RUNNING_MASTER or monstatus == OCF_SUCCESS:
+            force_stop = False
+        else:
+            force_stop = True
+
+        if not force_stop:
+            executed_quit = False
+
+            fd = qmp_open(fail_fast = True)
+            if shutdown_guest:
+                qmp_execute(fd, [{"execute": "system_powerdown"}], \
+                                    do_yank = False)
+            else:
+                qmp_execute(fd, [{"execute": "quit"}], do_yank = False)
+                fd.close()
+                executed_quit = True
+
+            # wait for qemu to stop
+            while time.time() < timeout:
+                status, pid = check_pid()
+                if status == OCF_NOT_RUNNING:
+                    # qemu stopped
+                    return OCF_SUCCESS
+                elif status == OCF_SUCCESS and not executed_quit:
+                    vmstatus = qmp_execute(fd, [{"execute": "query-status"}], \
+                                        do_yank = False)
+                    if vmstatus["return"]["status"] == "shutdown":
+                        qmp_execute(fd, [{"execute": "quit"}], do_yank = False)
+                        fd.close()
+                        executed_quit = True
+                    log.debug("Waiting for guest to shutdown")
+                    time.sleep(1)
+                elif status == OCF_SUCCESS and executed_quit:
+                    log.debug("Waiting for qemu to stop")
+                    time.sleep(1)
+                else:
+                    # something went wrong, force stop instead
+                    break
+
+            log.warning("clean stop timeout reached")
+    except Exception as e:
+        log.warning("error while stopping: %s" % e)
+
+    log.info("force stopping qemu")
+
+    status, pid = check_pid()
+    if status == OCF_NOT_RUNNING:
+        return OCF_SUCCESS
+    try:
+        if int(OCF_RESKEY_debug) >= 2:
+            os.kill(pid, signal.SIGSEGV)
+        else:
+            os.kill(pid, signal.SIGTERM)
+            time.sleep(2)
+            os.kill(pid, signal.SIGKILL)
+    except Exception:
+        pass
+
+    while check_pid()[0] != OCF_NOT_RUNNING:
+        time.sleep(1)
+
+    return OCF_SUCCESS
+
+def calculate_master_score(role, replication, shutdown_guest):
+    if HOSTNAME == str.strip(OCF_RESKEY_CRM_meta_notify_master_uname):
+        if str.strip(OCF_RESKEY_CRM_meta_notify_promote_uname) \
+            and str.strip(OCF_RESKEY_CRM_meta_notify_promote_uname) != HOSTNAME:
+            # We where primary and the secondary is to be promoted.
+            # We are going to be out of date.
+            set_master_score(0)
+        else:
+            if role == OCF_RUNNING_MASTER:
+                # We where a healthy primary but had no healty secondary or it
+                # was stopped as well. So we have up-to-date data.
+                set_master_score(10)
+            else:
+                # We where a unhealthy primary but also had no healty secondary.
+                # So we still should have up-to-date data.
+                set_master_score(5)
+    else:
+        if get_master_score() > 10:
+            if role == OCF_SUCCESS:
+                if shutdown_guest:
+                    # We where a healthy secondary and (probably) had a healthy
+                    # primary and both where stopped. So we have up-to-date data
+                    # too.
+                    set_master_score(10)
+                else:
+                    # We where a healthy secondary and (probably) had a healthy
+                    # primary still running. So we are now out of date.
+                    set_master_score(0)
+            else:
+                # We where a unhealthy secondary. So we are now out of date.
+                set_master_score(0)
+
+def qemu_colo_stop():
+    shutdown_guest = env_do_shutdown_guest()
+
+    try:
+        role, replication = qemu_colo_monitor(fail_fast = True)
+    except Exception:
+        role, replication = OCF_ERR_GENERIC, OCF_ERR_GENERIC
+
+    status = _qemu_colo_stop(role, shutdown_guest)
+
+    if get_state() != "none":
+        calculate_master_score(role, replication, shutdown_guest)
+    set_state("none")
+
+    return status
+
+def qemu_colo_notify():
+    action = "%s-%s" % (OCF_RESKEY_CRM_meta_notify_type, \
+                        OCF_RESKEY_CRM_meta_notify_operation)
+
+    if action == "post-start":
+        if HOSTNAME == str.strip(OCF_RESKEY_CRM_meta_notify_master_uname):
+            peer = str.strip(OCF_RESKEY_CRM_meta_notify_start_uname)
+            fd = qmp_open()
+            qmp_start_resync(fd, peer)
+            # The secondary has inconsistent data until resync is finished
+            set_remote_master_score(peer, 0)
+            fd.close()
+
+    elif action == "pre-stop":
+        if not env_do_shutdown_guest() \
+           and HOSTNAME == str.strip(OCF_RESKEY_CRM_meta_notify_master_uname) \
+           and HOSTNAME != str.strip(OCF_RESKEY_CRM_meta_notify_stop_uname):
+            fd = qmp_open()
+            peer = qmp_get_nbd_remote(fd)
+            log.debug("our peer: %s" % peer)
+            if peer == str.strip(OCF_RESKEY_CRM_meta_notify_stop_uname):
+                if qmp_check_resync(fd):
+                    qmp_cancel_resync(fd)
+                elif qmp_is_colo_active(fd):
+                    qmp_primary_failover(fd)
+                qmp_execute(fd, [{"exec-helper": "clear-events"}],do_yank=False)
+            fd.close()
+
+    elif action == "post-stop" \
+         and OCF_RESKEY_CRM_meta_notify_key_operation == "stonith" \
+         and (HOSTNAME == str.strip(OCF_RESKEY_CRM_meta_notify_master_uname)
+            or str.strip(OCF_RESKEY_CRM_meta_notify_promote_uname)):
+        peer = str.strip(OCF_RESKEY_CRM_meta_notify_stop_uname)
+        set_remote_master_score(peer, 0)
+
+    return OCF_SUCCESS
+
+def qemu_colo_promote():
+    role, replication = qemu_colo_monitor()
+
+    if role == OCF_SUCCESS and replication == OCF_NOT_RUNNING:
+        status = _qemu_colo_stop(OCF_SUCCESS, False)
+        if status != OCF_SUCCESS:
+            return status
+
+        rotate_logfile(QMP_LOG, 8)
+        rotate_logfile(QEMU_LOG, 8)
+        run_command(QEMU_PRIMARY_CMDLINE)
+        oob_helper_open()
+        set_master_score(101)
+
+        peer = env_find_secondary()
+        if peer:
+            fd = qmp_open()
+            qmp_start_resync(fd, peer)
+            # The secondary has inconsistent data until resync is finished
+            set_remote_master_score(peer, 0)
+            fd.close()
+        return OCF_SUCCESS
+    elif role == OCF_SUCCESS and replication != OCF_NOT_RUNNING:
+        fd = qmp_open()
+        qmp_secondary_failover(fd)
+        set_master_score(101)
+
+        peer = env_find_secondary()
+        if peer:
+            qmp_start_resync(fd, peer)
+            # The secondary has inconsistent data until resync is finished
+            set_remote_master_score(peer, 0)
+        qmp_execute(fd, [{"exec-helper": "clear-events"}], do_yank=False)
+        fd.close()
+        return OCF_SUCCESS
+    else:
+        return OCF_ERR_GENERIC
+
+def qemu_colo_demote():
+    status = qemu_colo_stop()
+    if status != OCF_SUCCESS:
+        return status
+    return qemu_colo_start()
+
+
+if OCF_ACTION == "meta-data":
+    qemu_colo_meta_data()
+    exit(OCF_SUCCESS)
+
+logs_open()
+
+status = qemu_colo_validate_all()
+# Exit here if our sanity checks fail, but try to continue if we need to stop
+if status != OCF_SUCCESS and OCF_ACTION != "stop":
+    exit(status)
+
+setup_constants()
+
+try:
+    if OCF_ACTION == "start":
+        status = qemu_colo_start()
+    elif OCF_ACTION == "stop":
+        status = qemu_colo_stop()
+    elif OCF_ACTION == "monitor":
+        status = qemu_colo_monitor()[0]
+    elif OCF_ACTION == "notify":
+        status = qemu_colo_notify()
+    elif OCF_ACTION == "promote":
+        status = qemu_colo_promote()
+    elif OCF_ACTION == "demote":
+        status = qemu_colo_demote()
+    elif OCF_ACTION == "validate-all":
+        status = qemu_colo_validate_all()
+    else:
+        status = OCF_ERR_UNIMPLEMENTED
+except Error:
+    exit(OCF_ERR_GENERIC)
+else:
+    exit(status)
--
2.30.0


[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 833 bytes --]

^ permalink raw reply related	[flat|nested] 7+ messages in thread

* [PATCH v4 4/6] colo: Introduce high-level test suite
  2021-02-07 15:54 [PATCH v4 0/6] colo: Introduce resource agent and test suite/CI Lukas Straub
                   ` (2 preceding siblings ...)
  2021-02-07 15:55 ` [PATCH v4 3/6] colo: Introduce resource agent Lukas Straub
@ 2021-02-07 15:55 ` Lukas Straub
  2021-02-07 15:55 ` [PATCH v4 5/6] configure,Makefile: Install colo resource-agent Lukas Straub
  2021-02-07 15:55 ` [PATCH v4 6/6] MAINTAINERS: Add myself as maintainer for COLO resource agent Lukas Straub
  5 siblings, 0 replies; 7+ messages in thread
From: Lukas Straub @ 2021-02-07 15:55 UTC (permalink / raw)
  To: qemu-devel
  Cc: Philippe Mathieu-Daudé, Wainer dos Santos Moschetta, Cleber Rosa

[-- Attachment #1: Type: text/plain, Size: 25967 bytes --]

Add high-level test relying on the colo resource-agent to test
all failover cases while checking guest network connectivity.

Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
 scripts/colo-resource-agent/crm_master   |  44 ++
 scripts/colo-resource-agent/crm_resource |  12 +
 tests/acceptance/colo.py                 | 654 +++++++++++++++++++++++
 3 files changed, 710 insertions(+)
 create mode 100755 scripts/colo-resource-agent/crm_master
 create mode 100755 scripts/colo-resource-agent/crm_resource
 create mode 100644 tests/acceptance/colo.py

diff --git a/scripts/colo-resource-agent/crm_master b/scripts/colo-resource-agent/crm_master
new file mode 100755
index 0000000000..886f523bda
--- /dev/null
+++ b/scripts/colo-resource-agent/crm_master
@@ -0,0 +1,44 @@
+#!/bin/bash
+
+# Fake crm_master for COLO testing
+#
+# Copyright (c) Lukas Straub <lukasstraub2@web.de>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later.  See the COPYING file in the top-level directory.
+
+TMPDIR="$HA_RSCTMP"
+score=0
+query=0
+
+OPTIND=1
+while getopts 'Qql:Dv:N:G' opt; do
+    case "$opt" in
+        Q|q)
+            # Noop
+        ;;
+        "l")
+            # Noop
+        ;;
+        "D")
+            score=0
+        ;;
+        "v")
+            score=$OPTARG
+        ;;
+        "N")
+            TMPDIR="$COLO_TEST_REMOTE_TMP"
+        ;;
+        "G")
+            query=1
+        ;;
+    esac
+done
+
+if (( query )); then
+    cat "${TMPDIR}/master_score" || exit 1
+else
+    echo $score > "${TMPDIR}/master_score" || exit 1
+fi
+
+exit 0
diff --git a/scripts/colo-resource-agent/crm_resource b/scripts/colo-resource-agent/crm_resource
new file mode 100755
index 0000000000..ad69ff3c6b
--- /dev/null
+++ b/scripts/colo-resource-agent/crm_resource
@@ -0,0 +1,12 @@
+#!/bin/sh
+
+# Fake crm_resource for COLO testing
+#
+# Copyright (c) Lukas Straub <lukasstraub2@web.de>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later.  See the COPYING file in the top-level directory.
+
+# Noop
+
+exit 0
diff --git a/tests/acceptance/colo.py b/tests/acceptance/colo.py
new file mode 100644
index 0000000000..2a0027f0c8
--- /dev/null
+++ b/tests/acceptance/colo.py
@@ -0,0 +1,654 @@
+# High-level test suite for qemu COLO testing all failover cases while checking
+# guest network connectivity
+#
+# Copyright (c) Lukas Straub <lukasstraub2@web.de>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later.  See the COPYING file in the top-level directory.
+
+# HOWTO:
+#
+# This test has the following parameters:
+# bridge_name: name of the bridge interface to connect qemu to
+# host_address: ip address of the bridge interface
+# guest_address: ip address that the guest gets from the dhcp server
+# bridge_helper: path to the brige helper
+#                (default: /usr/lib/qemu/qemu-bridge-helper)
+# install_cmd: command to run to install iperf3 and memtester in the guest
+#              (default: "sudo -n dnf -q -y install iperf3 memtester")
+#
+# To run the network tests, you have to specify the parameters.
+#
+# Example for running the colo tests:
+# make check-acceptance FEDORA_31_ARCHES="x86_64" AVOCADO_TAGS="-t colo \
+#  -p bridge_name=br0 -p host_address=192.168.220.1 \
+#  -p guest_address=192.168.220.222"
+#
+# The colo tests currently only use x86_64 test vm images. With the
+# FEDORA_31_ARCHES make variable as in the example, only the x86_64 images will
+# be downloaded.
+#
+# If you're running the network tests as an unprivileged user, you need to set
+# the suid bit on the bridge helper (chmod +s <bridge-helper>).
+#
+# The dhcp server should assign a static ip to the guest, else the test may be
+# unreliable. The Mac address for the guest is always 52:54:00:12:34:56.
+
+
+import sys
+import subprocess
+import shutil
+import os
+import signal
+import os.path
+import time
+import tempfile
+
+from avocado import skipUnless
+from avocado.utils import network
+from avocado.utils import vmimage
+from avocado.utils import cloudinit
+from avocado.utils import ssh
+from avocado.utils.path import find_command, CmdNotFoundError
+
+from avocado_qemu import Test, pick_default_qemu_bin, SOURCE_DIR
+from qemu.qmp import QEMUMonitorProtocol
+
+def iperf3_available():
+    try:
+        find_command("iperf3")
+    except CmdNotFoundError:
+        return False
+    return True
+
+class Host:
+
+    logdir = ""
+    tmpdir = ""
+    pid_file = ""
+    master_score_file = ""
+    qmp_sock = ""
+    image = ""
+    bridge_port = 0
+
+class ColoTest(Test):
+
+    # Constants
+    OCF_SUCCESS = 0
+    OCF_ERR_GENERIC = 1
+    OCF_ERR_ARGS = 2
+    OCF_ERR_UNIMPLEMENTED = 3
+    OCF_ERR_PERM = 4
+    OCF_ERR_INSTALLED = 5
+    OCF_ERR_CONFIGURED = 6
+    OCF_NOT_RUNNING = 7
+    OCF_RUNNING_MASTER = 8
+    OCF_FAILED_MASTER = 9
+
+    QEMU_OPTIONS = (" -display none -vga none -enable-kvm"
+                    " -smp 2 -cpu host -m 768"
+                    " -device e1000,mac=52:54:00:12:34:56,netdev=hn0"
+                    " -device virtio-blk,drive=colo-disk0"
+                    " -device virtio-rng")
+
+    FEDORA_VERSION = "31"
+    IMAGE_CHECKSUM = "e3c1b309d9203604922d6e255c2c5d098a309c2d46215d8fc026954f3c5c27a0"
+
+    hosts = [ Host(), Host() ]
+    hang_qemu = False
+    checkpoint_failover = False
+    traffic_procs = []
+
+    def get_image(self, temp_dir):
+        try:
+            return vmimage.get(
+                "fedora", arch="x86_64", version=self.FEDORA_VERSION,
+                checksum=self.IMAGE_CHECKSUM, algorithm="sha256",
+                cache_dir=self.cache_dirs[0],
+                snapshot_dir=temp_dir)
+        except:
+            self.cancel("Failed to download/prepare image")
+
+    @skipUnless(ssh.SSH_CLIENT_BINARY, "No SSH client available")
+    def setUp(self):
+        # Qemu and qemu-img binary
+        default_qemu_bin = pick_default_qemu_bin()
+        self.QEMU_BINARY = self.params.get("qemu_bin", default=default_qemu_bin)
+
+        qemu_img = self.pick_qemu_util("qemu-img")
+        self.QEMU_IMG_BINARY = qemu_img
+        vmimage.QEMU_IMG = qemu_img
+
+        self.RESOURCE_AGENT = os.path.join(SOURCE_DIR,
+                                           "scripts/colo-resource-agent/colo")
+        self.ADD_PATH = os.path.join(SOURCE_DIR, "scripts/colo-resource-agent")
+
+        # Logs
+        self.RA_LOG = os.path.join(self.outputdir, "resource-agent.log")
+        for n in range(2):
+            logdir = os.path.join(self.outputdir, "host%u" % n)
+            self.hosts[n].logdir = logdir
+            os.makedirs(logdir)
+
+        # Temporary directories
+        # We don't use self.workdir because of unix socket path length
+        # limitations
+        self.TMPDIR = tempfile.mkdtemp()
+        for n in range(2):
+            tmpdir = os.path.join(self.TMPDIR, "host%u" % n)
+            self.hosts[n].tmpdir = tmpdir
+            os.makedirs(tmpdir)
+
+        for n in range(2):
+            self.hosts[n].pid_file = \
+                os.path.join(self.hosts[n].tmpdir, "colo-test-qemu.pid")
+            self.hosts[n].master_score_file = \
+                os.path.join(self.hosts[n].tmpdir, "master_score")
+            self.hosts[n].qmp_sock = \
+                os.path.join(self.hosts[n].tmpdir, "my-qmp.sock")
+
+        # Network
+        self.BRIDGE_NAME = self.params.get("bridge_name")
+        if self.BRIDGE_NAME:
+            self.HOST_ADDRESS = self.params.get("host_address")
+            self.GUEST_ADDRESS = self.params.get("guest_address")
+            self.BRIDGE_HELPER = self.pick_qemu_util("qemu-bridge-helper")
+            self.SSH_PORT = 22
+        else:
+            # QEMU's hard coded usermode router address
+            self.HOST_ADDRESS = "10.0.2.2"
+            self.GUEST_ADDRESS = "127.0.0.1"
+            self.SSH_PORT = network.find_free_port(address="127.0.0.1")
+            for n in range(2):
+                self.hosts[n].bridge_port = \
+                    network.find_free_port(address="127.0.0.1")
+
+        self.CLOUDINIT_HOME_PORT = network.find_free_port()
+
+        # Find free port range
+        base_port = 1024
+        while True:
+            base_port = network.find_free_port(start_port=base_port,
+                                               address="127.0.0.1")
+            if base_port == None:
+                self.cancel("Failed to find a free port")
+            for n in range(base_port, base_port +4):
+                if n > 65535:
+                    break
+                if not network.is_port_free(n, "127.0.0.1"):
+                    break
+            else:
+                # for loop above didn't break
+                break
+
+        self.BASE_PORT = base_port
+
+        # Disk images
+        self.log.info("Downloading/preparing boot image")
+        for n in range(2):
+            self.hosts[n].image = self.get_image(self.hosts[n].tmpdir).path
+        self.CLOUDINIT_ISO = os.path.join(self.TMPDIR, "cloudinit.iso")
+
+        self.log.info("Preparing cloudinit image")
+        try:
+            cloudinit.iso(self.CLOUDINIT_ISO, self.name,
+                          username="test", password="password",
+                          phone_home_host=self.HOST_ADDRESS,
+                          phone_home_port=self.CLOUDINIT_HOME_PORT)
+        except Exception as e:
+            self.cancel("Failed to prepare cloudinit image")
+
+        self.QEMU_OPTIONS += " -cdrom %s" % self.CLOUDINIT_ISO
+
+        # Network bridge
+        if not self.BRIDGE_NAME:
+            self.BRIDGE_PIDFILE = os.path.join(self.TMPDIR, "bridge.pid")
+            self.run_command(("'%s' -pidfile '%s'"
+                " -M none -display none -daemonize"
+                " -netdev user,id=host,hostfwd=tcp:127.0.0.1:%s-:22"
+                " -netdev socket,id=host0,listen=127.0.0.1:%s"
+                " -netdev socket,id=host1,listen=127.0.0.1:%s"
+                " -netdev hubport,id=hostport,hubid=0,netdev=host"
+                " -netdev hubport,id=port0,hubid=0,netdev=host0"
+                " -netdev hubport,id=port1,hubid=0,netdev=host1")
+                % (self.QEMU_BINARY, self.BRIDGE_PIDFILE, self.SSH_PORT,
+                   self.hosts[0].bridge_port, self.hosts[1].bridge_port), 0)
+
+    def tearDown(self):
+        try:
+            pid = self.read_pidfile(self.BRIDGE_PIDFILE)
+            if pid and self.check_pid(pid):
+                os.kill(pid, signal.SIGKILL)
+        except Exception as e:
+            pass
+
+        for n in range(2):
+            try:
+                self.ra_stop(n)
+            except Exception as e:
+                pass
+
+        try:
+            self.ssh_close()
+        except Exception as e:
+            pass
+
+        for proc in self.traffic_procs:
+            try:
+                os.killpg(proc.pid, signal.SIGTERM)
+            except Exception as e:
+                pass
+
+        shutil.rmtree(self.TMPDIR)
+
+    def run_command(self, cmdline, expected_status, env=None):
+        proc = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE,
+                                stderr=subprocess.STDOUT,
+                                universal_newlines=True, env=env)
+        stdout, stderr = proc.communicate()
+        if proc.returncode != expected_status:
+            self.fail("command \"%s\" failed with code %s:\n%s"
+                           % (cmdline, proc.returncode, stdout))
+
+        return proc.returncode
+
+    def cat_line(self, path):
+        line=""
+        try:
+            fd = open(path, "r")
+            line = str.strip(fd.readline())
+            fd.close()
+        except:
+            pass
+        return line
+
+    def read_pidfile(self, pidfile):
+        try:
+            pid = int(self.cat_line(pidfile))
+        except ValueError:
+            return None
+        else:
+            return pid
+
+    def check_pid(self, pid):
+        try:
+            os.kill(pid, 0)
+        except OSError:
+            return False
+        else:
+            return True
+
+    def ssh_open(self):
+        self.ssh_conn = ssh.Session(self.GUEST_ADDRESS, self.SSH_PORT,
+                                    user="test", password="password")
+        self.ssh_conn.DEFAULT_OPTIONS += (("UserKnownHostsFile", "/dev/null"),)
+        for i in range(10):
+            try:
+                if self.ssh_conn.connect():
+                    return
+            except Exception as e:
+                pass
+            time.sleep(4)
+        self.fail("sshd timeout")
+
+    def ssh_ping(self):
+        if self.ssh_conn.cmd("echo ping").stdout != b"ping\n":
+            self.fail("ssh ping failed")
+
+    def ssh_close(self):
+        self.ssh_conn.quit()
+
+    def setup_base_env(self, host):
+        remotehost = (host +1) % 2
+        PATH = os.getenv("PATH", "")
+        env = { "PATH": "%s:%s" % (self.ADD_PATH, PATH),
+                "HA_LOGFILE": self.RA_LOG,
+                "OCF_RESOURCE_INSTANCE": "colo-test",
+                "OCF_RESKEY_CRM_meta_clone_max": "2",
+                "OCF_RESKEY_CRM_meta_notify": "true",
+                "OCF_RESKEY_CRM_meta_timeout": "30000",
+                "OCF_RESKEY_qemu_binary": self.QEMU_BINARY,
+                "OCF_RESKEY_qemu_img_binary": self.QEMU_IMG_BINARY,
+                "OCF_RESKEY_checkpoint_interval": "10000",
+                "OCF_RESKEY_base_port": str(self.BASE_PORT),
+                "OCF_RESKEY_debug": "2"}
+
+        env.update({"OCF_RESKEY_options":
+                        ("%s -qmp unix:%s,server,nowait"
+                         " -drive if=none,node-name=parent0,file='%s'")
+                        % (self.QEMU_OPTIONS, self.hosts[host].qmp_sock,
+                           self.hosts[host].image),
+                    "OCF_RESKEY_active_hidden_dir": self.hosts[host].tmpdir,
+                    "OCF_RESKEY_listen_address": "127.0.0.%u" % (host +1),
+                    "OCF_RESKEY_log_dir": self.hosts[host].logdir,
+                    "OCF_RESKEY_CRM_meta_on_node": "127.0.0.%u" % (host +1),
+                    "HA_RSCTMP": self.hosts[host].tmpdir,
+                    "COLO_TEST_REMOTE_TMP": self.hosts[remotehost].tmpdir})
+
+        if self.BRIDGE_NAME:
+            env["OCF_RESKEY_options"] += \
+                " -netdev bridge,id=hn0,br=%s,helper='%s'" \
+                % (self.BRIDGE_NAME, self.BRIDGE_HELPER)
+        else:
+            env["OCF_RESKEY_options"] += \
+                " -netdev socket,id=hn0,connect=127.0.0.1:%s" \
+                % self.hosts[host].bridge_port
+        return env
+
+    def ra_start(self, host):
+        env = self.setup_base_env(host)
+        self.run_command(self.RESOURCE_AGENT + " start", self.OCF_SUCCESS, env)
+
+    def ra_stop(self, host):
+        env = self.setup_base_env(host)
+        self.run_command(self.RESOURCE_AGENT + " stop", self.OCF_SUCCESS, env)
+
+    def ra_monitor(self, host, expected_status):
+        env = self.setup_base_env(host)
+        self.run_command(self.RESOURCE_AGENT + " monitor", expected_status, env)
+
+    def ra_promote(self, host):
+        env = self.setup_base_env(host)
+        self.run_command(self.RESOURCE_AGENT + " promote", self.OCF_SUCCESS,env)
+
+    def ra_notify_start(self, host):
+        remotehost = (host +1) % 2
+        env = self.setup_base_env(host)
+
+        env.update({"OCF_RESKEY_CRM_meta_notify_type": "post",
+                    "OCF_RESKEY_CRM_meta_notify_operation": "start"})
+
+        env.update({"OCF_RESKEY_CRM_meta_notify_master_uname":
+                        "127.0.0.%u" % (host +1),
+                    "OCF_RESKEY_CRM_meta_notify_start_uname":
+                        "127.0.0.%u" % (remotehost +1)})
+
+        self.run_command(self.RESOURCE_AGENT + " notify", self.OCF_SUCCESS, env)
+
+    def ra_notify_stop(self, host):
+        remotehost = (host +1) % 2
+        env = self.setup_base_env(host)
+
+        env.update({"OCF_RESKEY_CRM_meta_notify_type": "pre",
+                    "OCF_RESKEY_CRM_meta_notify_operation": "stop"})
+
+        env.update({"OCF_RESKEY_CRM_meta_notify_master_uname":
+                        "127.0.0.%u" % (host +1),
+                    "OCF_RESKEY_CRM_meta_notify_stop_uname":
+                        "127.0.0.%u" % (remotehost +1)})
+
+        self.run_command(self.RESOURCE_AGENT + " notify", self.OCF_SUCCESS, env)
+
+    def get_pid(self, host):
+        return self.read_pidfile(self.hosts[host].pid_file)
+
+    def get_master_score(self, host):
+        return int(self.cat_line(self.hosts[host].master_score_file))
+
+    def kill_qemu_pre(self, host):
+        pid = self.get_pid(host)
+
+        if self.checkpoint_failover:
+            qmp_conn = QEMUMonitorProtocol(self.hosts[host].qmp_sock)
+            qmp_conn.settimeout(10)
+            qmp_conn.connect()
+            while True:
+                event = qmp_conn.pull_event(wait=True)
+                if event["event"] == "STOP":
+                    break
+            qmp_conn.close()
+
+
+        if pid and self.check_pid(pid):
+            if self.hang_qemu:
+                os.kill(pid, signal.SIGSTOP)
+            else:
+                os.kill(pid, signal.SIGKILL)
+                while self.check_pid(pid):
+                    time.sleep(1)
+
+    def kill_qemu_post(self, host):
+        pid = self.get_pid(host)
+
+        if self.hang_qemu and pid and self.check_pid(pid):
+            os.kill(pid, signal.SIGKILL)
+            while self.check_pid(pid):
+                time.sleep(1)
+
+    def prepare_guest(self):
+        pass
+
+    def cycle_start(self, cycle):
+        pass
+
+    def active_section(self):
+        return False
+
+    def cycle_end(self, cycle):
+        pass
+
+    def check_connection(self):
+        self.ssh_ping()
+        for proc in self.traffic_procs:
+            if proc.poll() != None:
+                self.fail("Traffic process died")
+
+    def _test_colo(self, loop=1):
+        loop = max(loop, 1)
+        self.log.info("Will put logs in %s" % self.outputdir)
+
+        for n in range(2):
+            self.ra_stop(n)
+
+        self.log.info("*** Startup ***")
+        for n in range(2):
+            self.ra_start(n)
+
+        for n in range(2):
+            self.ra_monitor(n, self.OCF_SUCCESS)
+
+        self.log.info("*** Promoting ***")
+        self.ra_promote(0)
+        cloudinit.wait_for_phone_home(("0.0.0.0", self.CLOUDINIT_HOME_PORT),
+                                      self.name)
+        self.ssh_open()
+        self.prepare_guest()
+
+        self.ra_notify_start(0)
+
+        while self.get_master_score(1) != 100:
+            self.ra_monitor(0, self.OCF_RUNNING_MASTER)
+            self.ra_monitor(1, self.OCF_SUCCESS)
+            time.sleep(1)
+
+        self.log.info("*** Replication started ***")
+
+        self.check_connection()
+
+        primary = 0
+        secondary = 1
+
+        for n in range(loop):
+            self.cycle_start(n)
+            self.log.info("*** Secondary failover ***")
+            self.kill_qemu_pre(primary)
+            self.ra_notify_stop(secondary)
+            self.ra_monitor(secondary, self.OCF_SUCCESS)
+            self.ra_promote(secondary)
+            self.ra_monitor(secondary, self.OCF_RUNNING_MASTER)
+            self.kill_qemu_post(primary)
+
+            self.check_connection()
+
+            tmp = primary
+            primary = secondary
+            secondary = tmp
+
+            self.log.info("*** Secondary continue replication ***")
+            self.ra_start(secondary)
+            self.ra_notify_start(primary)
+
+            self.check_connection()
+
+            # Wait for resync
+            while self.get_master_score(secondary) != 100:
+                self.ra_monitor(primary, self.OCF_RUNNING_MASTER)
+                self.ra_monitor(secondary, self.OCF_SUCCESS)
+                time.sleep(1)
+
+            self.log.info("*** Replication started ***")
+
+            self.check_connection()
+
+            if self.active_section():
+                break
+
+            self.log.info("*** Primary failover ***")
+            self.kill_qemu_pre(secondary)
+            self.ra_monitor(primary, self.OCF_RUNNING_MASTER)
+            self.ra_notify_stop(primary)
+            self.ra_monitor(primary, self.OCF_RUNNING_MASTER)
+            self.kill_qemu_post(secondary)
+
+            self.check_connection()
+
+            self.log.info("*** Primary continue replication ***")
+            self.ra_start(secondary)
+            self.ra_notify_start(primary)
+
+            self.check_connection()
+
+            # Wait for resync
+            while self.get_master_score(secondary) != 100:
+                self.ra_monitor(primary, self.OCF_RUNNING_MASTER)
+                self.ra_monitor(secondary, self.OCF_SUCCESS)
+                time.sleep(1)
+
+            self.log.info("*** Replication started ***")
+
+            self.check_connection()
+
+            self.cycle_end(n)
+
+        self.ssh_close()
+
+        for n in range(2):
+            self.ra_stop(n)
+
+        for n in range(2):
+            self.ra_monitor(n, self.OCF_NOT_RUNNING)
+        self.log.info("*** all ok ***")
+
+
+class ColoQuickTest(ColoTest):
+    """
+    :avocado: tags=colo
+    :avocado: tags=quick
+    :avocado: tags=arch:x86_64
+    """
+
+    timeout = 300
+
+    def cycle_end(self, cycle):
+        self.log.info("Testing with peer qemu hanging"
+                      " and failover during checkpoint")
+        self.hang_qemu = True
+
+    def test_quick(self):
+        self.checkpoint_failover = True
+        self.log.info("Testing with peer qemu crashing"
+                      " and failover during checkpoint")
+        self._test_colo(loop=2)
+
+
+class ColoNetworkTest(ColoTest):
+
+    def prepare_guest(self):
+        install_cmd = self.params.get("install_cmd", default=
+                                "sudo -n dnf -q -y install iperf3 memtester")
+        self.ssh_conn.cmd(install_cmd)
+        # Use two instances to work around a bug in iperf3
+        self.ssh_conn.cmd("iperf3 -sD; iperf3 -sD -p 5202")
+
+    def _cycle_start(self, cycle):
+        pass
+
+    def cycle_start(self, cycle):
+        self._cycle_start(cycle)
+        tests = [("", "client-to-server tcp"), ("-R", "server-to-client tcp")]
+
+        self.log.info("Testing iperf %s" % tests[cycle % 2][1])
+        iperf_cmd = "iperf3 %s -t 60 -i 60 --connect-timeout 30000 -c %s" \
+                        % (tests[cycle % 2][0], self.GUEST_ADDRESS)
+        proc = subprocess.Popen("while %s && %s; do sleep 1; done >>'%s' 2>&1"
+                    % (iperf_cmd, iperf_cmd + " -p 5202",
+                    os.path.join(self.outputdir, "iperf.log")),
+                    shell=True, preexec_fn=os.setsid, stdin=subprocess.DEVNULL,
+                    stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+        self.traffic_procs.append(proc)
+        time.sleep(5) # Wait for iperf to get up to speed
+
+    def cycle_end(self, cycle):
+        for proc in self.traffic_procs:
+            try:
+                os.killpg(proc.pid, signal.SIGTERM)
+                proc.wait()
+            except Exception as e:
+                pass
+        self.traffic_procs.clear()
+        time.sleep(20)
+
+class ColoRealNetworkTest(ColoNetworkTest):
+    """
+    :avocado: tags=colo
+    :avocado: tags=slow
+    :avocado: tags=network_test
+    :avocado: tags=arch:x86_64
+    """
+
+    timeout = 900
+
+    def active_section(self):
+        time.sleep(300)
+        return False
+
+    @skipUnless(iperf3_available(), "iperf3 not available")
+    def test_network(self):
+        if not self.BRIDGE_NAME:
+            self.cancel("bridge options not given, will skip network test")
+        self.log.info("Testing with peer qemu crashing and network load")
+        self._test_colo(loop=2)
+
+class ColoStressTest(ColoNetworkTest):
+    """
+    :avocado: tags=colo
+    :avocado: tags=slow
+    :avocado: tags=stress_test
+    :avocado: tags=arch:x86_64
+    """
+
+    timeout = 1800
+
+    def _cycle_start(self, cycle):
+        if cycle == 4:
+            self.log.info("Stresstest with peer qemu hanging, network load"
+                          " and failover during checkpoint")
+            self.checkpoint_failover = True
+            self.hang_qemu = True
+        elif cycle == 8:
+            self.log.info("Stresstest with peer qemu crashing and network load")
+            self.checkpoint_failover = False
+            self.hang_qemu = False
+        elif cycle == 12:
+            self.log.info("Stresstest with peer qemu hanging and network load")
+            self.checkpoint_failover = False
+            self.hang_qemu = True
+
+    @skipUnless(iperf3_available(), "iperf3 not available")
+    def test_stress(self):
+        if not self.BRIDGE_NAME:
+            self.cancel("bridge options not given, will skip network test")
+        self.log.info("Stresstest with peer qemu crashing, network load"
+                      " and failover during checkpoint")
+        self.checkpoint_failover = True
+        self._test_colo(loop=16)
--
2.30.0


[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 833 bytes --]

^ permalink raw reply related	[flat|nested] 7+ messages in thread

* [PATCH v4 5/6] configure,Makefile: Install colo resource-agent
  2021-02-07 15:54 [PATCH v4 0/6] colo: Introduce resource agent and test suite/CI Lukas Straub
                   ` (3 preceding siblings ...)
  2021-02-07 15:55 ` [PATCH v4 4/6] colo: Introduce high-level test suite Lukas Straub
@ 2021-02-07 15:55 ` Lukas Straub
  2021-02-07 15:55 ` [PATCH v4 6/6] MAINTAINERS: Add myself as maintainer for COLO resource agent Lukas Straub
  5 siblings, 0 replies; 7+ messages in thread
From: Lukas Straub @ 2021-02-07 15:55 UTC (permalink / raw)
  To: qemu-devel
  Cc: Philippe Mathieu-Daudé, Wainer dos Santos Moschetta, Cleber Rosa

[-- Attachment #1: Type: text/plain, Size: 3729 bytes --]

Optionally install the resouce-agent so it gets picked up by
pacemaker.

Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
 configure         | 7 +++++++
 meson.build       | 5 +++++
 meson_options.txt | 2 ++
 3 files changed, 14 insertions(+)

diff --git a/configure b/configure
index a34f91171d..54fc7e533f 100755
--- a/configure
+++ b/configure
@@ -382,6 +382,7 @@ softmmu="yes"
 linux_user="no"
 bsd_user="no"
 blobs="true"
+install_colo_ra="false"
 pkgversion=""
 pie=""
 qom_cast_debug="yes"
@@ -1229,6 +1230,10 @@ for opt do
   ;;
   --disable-blobs) blobs="false"
   ;;
+  --disable-colo-ra) install_colo_ra="false"
+  ;;
+  --enable-colo-ra) install_colo_ra="true"
+  ;;
   --with-pkgversion=*) pkgversion="$optarg"
   ;;
   --with-coroutine=*) coroutine="$optarg"
@@ -1772,6 +1777,7 @@ Advanced options (experts only):
                            ucontext, sigaltstack, windows
   --enable-gcov            enable test coverage analysis with gcov
   --disable-blobs          disable installing provided firmware blobs
+  --enable-colo-ra         enable installing the COLO resource agent for pacemaker
   --with-vss-sdk=SDK-path  enable Windows VSS support in QEMU Guest Agent
   --with-win-sdk=SDK-path  path to Windows Platform SDK (to build VSS .tlb)
   --tls-priority           default TLS protocol/cipher priority string
@@ -6414,6 +6420,7 @@ NINJA=$ninja $meson setup \
         -Dzstd=$zstd -Dseccomp=$seccomp -Dvirtfs=$virtfs -Dcap_ng=$cap_ng \
         -Dattr=$attr -Ddefault_devices=$default_devices \
         -Ddocs=$docs -Dsphinx_build=$sphinx_build -Dinstall_blobs=$blobs \
+        -Dinstall_colo_ra=$install_colo_ra \
         -Dvhost_user_blk_server=$vhost_user_blk_server \
         -Dfuse=$fuse -Dfuse_lseek=$fuse_lseek -Dguest_agent_msi=$guest_agent_msi \
         $(if test "$default_features" = no; then echo "-Dauto_features=disabled"; fi) \
diff --git a/meson.build b/meson.build
index 2d8b433ff0..82efa75e36 100644
--- a/meson.build
+++ b/meson.build
@@ -2263,6 +2263,10 @@ elif get_option('guest_agent_msi').enabled()
   error('Guest agent MSI requested, but the guest agent is not being built')
 endif

+if get_option('install_colo_ra')
+  install_data('scripts/colo-resource-agent/colo', install_dir: get_option('libdir') / 'ocf/resource.d/qemu')
+endif
+
 # Don't build qemu-keymap if xkbcommon is not explicitly enabled
 # when we don't build tools or system
 if xkbcommon.found()
@@ -2398,6 +2402,7 @@ summary_info += {'system-mode emulation': have_system}
 summary_info += {'user-mode emulation': have_user}
 summary_info += {'block layer':       have_block}
 summary_info += {'Install blobs':     get_option('install_blobs')}
+summary_info += {'Install COLO resource agent': get_option('install_colo_ra')}
 summary_info += {'module support':    config_host.has_key('CONFIG_MODULES')}
 if config_host.has_key('CONFIG_MODULES')
   summary_info += {'alternative module path': config_host.has_key('CONFIG_MODULE_UPGRADES')}
diff --git a/meson_options.txt b/meson_options.txt
index 95f1079829..907d5dff61 100644
--- a/meson_options.txt
+++ b/meson_options.txt
@@ -15,6 +15,8 @@ option('gettext', type : 'feature', value : 'auto',
        description: 'Localization of the GTK+ user interface')
 option('install_blobs', type : 'boolean', value : true,
        description: 'install provided firmware blobs')
+option('install_colo_ra', type : 'boolean', value : false,
+       description: 'install the COLO resource agent for pacemaker')
 option('sparse', type : 'feature', value : 'auto',
        description: 'sparse checker')
 option('guest_agent_msi', type : 'feature', value : 'auto',
--
2.30.0


[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 833 bytes --]

^ permalink raw reply related	[flat|nested] 7+ messages in thread

* [PATCH v4 6/6] MAINTAINERS: Add myself as maintainer for COLO resource agent
  2021-02-07 15:54 [PATCH v4 0/6] colo: Introduce resource agent and test suite/CI Lukas Straub
                   ` (4 preceding siblings ...)
  2021-02-07 15:55 ` [PATCH v4 5/6] configure,Makefile: Install colo resource-agent Lukas Straub
@ 2021-02-07 15:55 ` Lukas Straub
  5 siblings, 0 replies; 7+ messages in thread
From: Lukas Straub @ 2021-02-07 15:55 UTC (permalink / raw)
  To: qemu-devel
  Cc: Philippe Mathieu-Daudé, Wainer dos Santos Moschetta, Cleber Rosa

[-- Attachment #1: Type: text/plain, Size: 597 bytes --]

Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
 MAINTAINERS | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/MAINTAINERS b/MAINTAINERS
index 8d8b0bf966..d04567aa4d 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -2773,6 +2773,12 @@ F: net/colo*
 F: net/filter-rewriter.c
 F: net/filter-mirror.c

+COLO resource agent and testing
+M: Lukas Straub <lukasstraub2@web.de>
+S: Odd fixes
+F: scripts/colo-resource-agent/*
+F: tests/acceptance/colo.py
+
 Record/replay
 M: Pavel Dovgalyuk <pavel.dovgaluk@ispras.ru>
 R: Paolo Bonzini <pbonzini@redhat.com>
--
2.30.0

[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 833 bytes --]

^ permalink raw reply related	[flat|nested] 7+ messages in thread

end of thread, other threads:[~2021-02-07 16:04 UTC | newest]

Thread overview: 7+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-02-07 15:54 [PATCH v4 0/6] colo: Introduce resource agent and test suite/CI Lukas Straub
2021-02-07 15:55 ` [PATCH v4 1/6] avocado_qemu: Introduce pick_qemu_util to pick qemu utility binaries Lukas Straub
2021-02-07 15:55 ` [PATCH v4 2/6] boot_linux.py: Use pick_qemu_util Lukas Straub
2021-02-07 15:55 ` [PATCH v4 3/6] colo: Introduce resource agent Lukas Straub
2021-02-07 15:55 ` [PATCH v4 4/6] colo: Introduce high-level test suite Lukas Straub
2021-02-07 15:55 ` [PATCH v4 5/6] configure,Makefile: Install colo resource-agent Lukas Straub
2021-02-07 15:55 ` [PATCH v4 6/6] MAINTAINERS: Add myself as maintainer for COLO resource agent Lukas Straub

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.