qemu-devel.nongnu.org archive mirror
 help / color / mirror / Atom feed
* [PATCH 0/4] colo: Introduce resource agent and high-level test
@ 2019-11-21 17:49 Lukas Straub
  2019-11-21 17:49 ` [PATCH 1/4] block/quorum.c: stable children names Lukas Straub
                   ` (4 more replies)
  0 siblings, 5 replies; 13+ messages in thread
From: Lukas Straub @ 2019-11-21 17:49 UTC (permalink / raw)
  To: qemu-devel
  Cc: Zhang, Chen, Jason Wang, Alberto Garcia, Dr. David Alan Gilbert

Hello Everyone,
These patches introduce a resource agent for use with the Pacemaker CRM and a
high-level test utilizing it for testing qemu COLO.

The resource agent manages qemu COLO including continuous replication.

Currently the second test case (where the peer qemu is frozen) fails on primary
failover, because qemu hangs while removing the replication related block nodes.
Note that this also happens in real world test when cutting power to the peer
host, so this needs to be fixed.

Based-on: <cover.1571925699.git.lukasstraub2@web.de>
([PATCH v7 0/4] colo: Add support for continuous replication)

Lukas Straub (4):
  block/quorum.c: stable children names
  colo: Introduce resource agent
  colo: Introduce high-level test
  MAINTAINERS: Add myself as maintainer for COLO resource agent

 MAINTAINERS                            |    6 +
 block/quorum.c                         |    6 +
 scripts/colo-resource-agent/colo       | 1026 ++++++++++++++++++++++++
 scripts/colo-resource-agent/crm_master |   44 +
 tests/acceptance/colo.py               |  444 ++++++++++
 5 files changed, 1526 insertions(+)
 create mode 100755 scripts/colo-resource-agent/colo
 create mode 100755 scripts/colo-resource-agent/crm_master
 create mode 100644 tests/acceptance/colo.py

--
2.20.1


^ permalink raw reply	[flat|nested] 13+ messages in thread

* [PATCH 1/4] block/quorum.c: stable children names
  2019-11-21 17:49 [PATCH 0/4] colo: Introduce resource agent and high-level test Lukas Straub
@ 2019-11-21 17:49 ` Lukas Straub
  2019-11-21 18:04   ` Eric Blake
  2019-11-21 17:49 ` [PATCH 2/4] colo: Introduce resource agent Lukas Straub
                   ` (3 subsequent siblings)
  4 siblings, 1 reply; 13+ messages in thread
From: Lukas Straub @ 2019-11-21 17:49 UTC (permalink / raw)
  To: qemu-devel
  Cc: Zhang, Chen, Jason Wang, Alberto Garcia, Dr. David Alan Gilbert

If we remove the child with the highest index from the quorum,
decrement s->next_child_index. This way we get stable children
names as long as we only remove the last child.

Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
 block/quorum.c | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/block/quorum.c b/block/quorum.c
index df68adcfaa..6100d4108a 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -1054,6 +1054,12 @@ static void quorum_del_child(BlockDriverState *bs, BdrvChild *child,
     /* We know now that num_children > threshold, so blkverify must be false */
     assert(!s->is_blkverify);

+    unsigned child_id;
+    sscanf(child->name, "children.%u", &child_id);
+    if (child_id == s->next_child_index - 1) {
+        s->next_child_index--;
+    }
+
     bdrv_drained_begin(bs);

     /* We can safely remove this child now */
--
2.20.1



^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [PATCH 2/4] colo: Introduce resource agent
  2019-11-21 17:49 [PATCH 0/4] colo: Introduce resource agent and high-level test Lukas Straub
  2019-11-21 17:49 ` [PATCH 1/4] block/quorum.c: stable children names Lukas Straub
@ 2019-11-21 17:49 ` Lukas Straub
  2019-11-21 17:49 ` [PATCH 3/4] colo: Introduce high-level test Lukas Straub
                   ` (2 subsequent siblings)
  4 siblings, 0 replies; 13+ messages in thread
From: Lukas Straub @ 2019-11-21 17:49 UTC (permalink / raw)
  To: qemu-devel
  Cc: Zhang, Chen, Jason Wang, Alberto Garcia, Dr. David Alan Gilbert

Introduce a resource agent which can be used in a
Pacemaker cluster to manage qemu COLO.

Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
 scripts/colo-resource-agent/colo | 1026 ++++++++++++++++++++++++++++++
 1 file changed, 1026 insertions(+)
 create mode 100755 scripts/colo-resource-agent/colo

diff --git a/scripts/colo-resource-agent/colo b/scripts/colo-resource-agent/colo
new file mode 100755
index 0000000000..5fd9cfc0b5
--- /dev/null
+++ b/scripts/colo-resource-agent/colo
@@ -0,0 +1,1026 @@
+#!/usr/bin/env python
+
+# Resource agent for qemu COLO for use with Pacemaker CRM
+#
+# Copyright (c) Lukas Straub <lukasstraub2@web.de>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later.  See the COPYING file in the top-level directory.
+
+from __future__ import print_function
+import subprocess
+import sys
+import os
+import os.path
+import signal
+import socket
+import json
+import re
+import time
+import logging
+import logging.handlers
+
+# Constants
+OCF_SUCCESS = 0
+OCF_ERR_GENERIC = 1
+OCF_ERR_ARGS = 2
+OCF_ERR_UNIMPLEMENTED = 3
+OCF_ERR_PERM = 4
+OCF_ERR_INSTALLED = 5
+OCF_ERR_CONFIGURED = 6
+OCF_NOT_RUNNING = 7
+OCF_RUNNING_MASTER = 8
+OCF_FAILED_MASTER = 9
+
+# Get environment variables
+OCF_RESKEY_CRM_meta_notify_type \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_type")
+OCF_RESKEY_CRM_meta_notify_operation \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_operation")
+OCF_RESKEY_CRM_meta_notify_start_uname \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_start_uname", "")
+OCF_RESKEY_CRM_meta_notify_stop_uname \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_stop_uname", "")
+OCF_RESKEY_CRM_meta_notify_active_uname \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_active_uname", "")
+OCF_RESKEY_CRM_meta_notify_promote_uname \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_promote_uname", "")
+OCF_RESKEY_CRM_meta_notify_demote_uname \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_demote_uname", "")
+OCF_RESKEY_CRM_meta_notify_master_uname \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_master_uname", "")
+OCF_RESKEY_CRM_meta_notify_slave_uname \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_slave_uname", "")
+
+HA_RSCTMP = os.getenv("HA_RSCTMP", "/run/resource-agents")
+HA_LOGFACILITY = os.getenv("HA_LOGFACILITY")
+HA_LOGFILE = os.getenv("HA_LOGFILE")
+HA_DEBUGLOG = os.getenv("HA_DEBUGLOG")
+OCF_RESOURCE_INSTANCE = os.getenv("OCF_RESOURCE_INSTANCE", "default-instance")
+OCF_RESKEY_CRM_meta_timeout \
+    = int(os.getenv("OCF_RESKEY_CRM_meta_timeout", "60000"))
+OCF_RESKEY_CRM_meta_interval \
+    = int(os.getenv("OCF_RESKEY_CRM_meta_interval", "1"))
+OCF_RESKEY_CRM_meta_clone_max \
+    = int(os.getenv("OCF_RESKEY_CRM_meta_clone_max", "1"))
+OCF_RESKEY_CRM_meta_clone_node_max \
+    = int(os.getenv("OCF_RESKEY_CRM_meta_clone_node_max", "1"))
+OCF_RESKEY_CRM_meta_master_max \
+    = int(os.getenv("OCF_RESKEY_CRM_meta_master_max", "1"))
+OCF_RESKEY_CRM_meta_master_node_max \
+    = int(os.getenv("OCF_RESKEY_CRM_meta_master_node_max", "1"))
+OCF_RESKEY_CRM_meta_notify \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify")
+OCF_RESKEY_CRM_meta_globally_unique \
+    = os.getenv("OCF_RESKEY_CRM_meta_globally_unique")
+
+HOSTNAME = os.getenv("OCF_RESKEY_CRM_meta_on_node", socket.gethostname())
+
+OCF_ACTION = os.getenv("__OCF_ACTION")
+if not OCF_ACTION and len(sys.argv) == 2:
+    OCF_ACTION = sys.argv[1]
+
+# Resource parameters
+OCF_RESKEY_binary_default = "qemu-system-x86_64"
+OCF_RESKEY_log_dir_default = HA_RSCTMP
+OCF_RESKEY_options_default = ""
+OCF_RESKEY_disk_size_default = ""
+OCF_RESKEY_active_hidden_dir_default = ""
+OCF_RESKEY_listen_address_default = "0.0.0.0"
+OCF_RESKEY_base_port_default = "9000"
+OCF_RESKEY_checkpoint_interval_default = "20000"
+OCF_RESKEY_debug_default = "false"
+
+OCF_RESKEY_binary = os.getenv("OCF_RESKEY_binary", OCF_RESKEY_binary_default)
+OCF_RESKEY_log_dir = os.getenv("OCF_RESKEY_log_dir", OCF_RESKEY_log_dir_default)
+OCF_RESKEY_options = os.getenv("OCF_RESKEY_options", OCF_RESKEY_options_default)
+OCF_RESKEY_disk_size \
+    = os.getenv("OCF_RESKEY_disk_size", OCF_RESKEY_disk_size_default)
+OCF_RESKEY_active_hidden_dir \
+    = os.getenv("OCF_RESKEY_active_hidden_dir", \
+                OCF_RESKEY_active_hidden_dir_default)
+OCF_RESKEY_listen_address \
+    = os.getenv("OCF_RESKEY_listen_address", OCF_RESKEY_listen_address_default)
+OCF_RESKEY_base_port \
+    = os.getenv("OCF_RESKEY_base_port", OCF_RESKEY_base_port_default)
+OCF_RESKEY_checkpoint_interval \
+    = os.getenv("OCF_RESKEY_checkpoint_interval", \
+                OCF_RESKEY_checkpoint_interval_default)
+OCF_RESKEY_debug = os.getenv("OCF_RESKEY_debug", OCF_RESKEY_debug_default)
+
+ACTIVE_IMAGE = os.path.join(OCF_RESKEY_active_hidden_dir, \
+                            OCF_RESOURCE_INSTANCE + "-active.qcow2")
+HIDDEN_IMAGE = os.path.join(OCF_RESKEY_active_hidden_dir, \
+                            OCF_RESOURCE_INSTANCE + "-hidden.qcow2")
+
+QMP_SOCK = os.path.join(HA_RSCTMP, OCF_RESOURCE_INSTANCE + "-qmp.sock")
+COMP_SOCK = os.path.join(HA_RSCTMP, OCF_RESOURCE_INSTANCE + "-compare.sock")
+COMP_OUT_SOCK = os.path.join(HA_RSCTMP, OCF_RESOURCE_INSTANCE \
+                                        + "-comp_out.sock")
+
+PID_FILE = os.path.join(HA_RSCTMP, OCF_RESOURCE_INSTANCE + "-qemu.pid")
+
+QMP_LOG = os.path.join(OCF_RESKEY_log_dir, OCF_RESOURCE_INSTANCE + "-qmp.log")
+QEMU_LOG = os.path.join(OCF_RESKEY_log_dir, OCF_RESOURCE_INSTANCE + "-qemu.log")
+
+# Exception only raised by ourself
+class Error(Exception):
+    pass
+
+def setup_constants():
+    # This function is called after the parameters where validated
+    global MIGRATE_PORT, MIROR_PORT, COMPARE_IN_PORT, NBD_PORT
+    MIGRATE_PORT = int(OCF_RESKEY_base_port)
+    MIROR_PORT = int(OCF_RESKEY_base_port) + 1
+    COMPARE_IN_PORT = int(OCF_RESKEY_base_port) + 2
+    NBD_PORT = int(OCF_RESKEY_base_port) + 3
+
+    global QEMU_PRIMARY_CMDLINE
+    QEMU_PRIMARY_CMDLINE = ("'%(OCF_RESKEY_binary)s' %(OCF_RESKEY_options)s"
+        " -chardev socket,id=comp0,path='%(COMP_SOCK)s',server,nowait"
+        " -chardev socket,id=comp0-0,path='%(COMP_SOCK)s'"
+        " -chardev socket,id=comp_out,path='%(COMP_OUT_SOCK)s',server,nowait"
+        " -chardev socket,id=comp_out0,path='%(COMP_OUT_SOCK)s'"
+        " -drive if=none,node-name=colo-disk0,driver=quorum,read-pattern=fifo,"
+        "vote-threshold=1,children.0=parent0"
+        " -qmp unix:'%(QMP_SOCK)s',server,nowait"
+        " -daemonize -D '%(QEMU_LOG)s' -pidfile '%(PID_FILE)s'") % globals()
+
+    global QEMU_SECONDARY_CMDLINE
+    QEMU_SECONDARY_CMDLINE = ("'%(OCF_RESKEY_binary)s' %(OCF_RESKEY_options)s"
+        " -chardev socket,id=red0,host='%(OCF_RESKEY_listen_address)s',"
+        "port=%(MIROR_PORT)s,server,nowait"
+        " -chardev socket,id=red1,host='%(OCF_RESKEY_listen_address)s',"
+        "port=%(COMPARE_IN_PORT)s,server,nowait"
+        " -object filter-redirector,id=f1,netdev=hn0,queue=tx,indev=red0"
+        " -object filter-redirector,id=f2,netdev=hn0,queue=rx,outdev=red1"
+        " -object filter-rewriter,id=rew0,netdev=hn0,queue=all"
+        " -drive if=none,node-name=childs0,top-id=colo-disk0,"
+        "driver=replication,mode=secondary,file.driver=qcow2,"
+        "file.file.filename='%(ACTIVE_IMAGE)s',file.backing.driver=qcow2,"
+        "file.backing.file.filename='%(HIDDEN_IMAGE)s',"
+        "file.backing.backing=parent0"
+        " -drive if=none,node-name=colo-disk0,driver=quorum,read-pattern=fifo,"
+        "vote-threshold=1,children.0=childs0"
+        " -incoming tcp:'%(OCF_RESKEY_listen_address)s':%(MIGRATE_PORT)s"
+        " -qmp unix:'%(QMP_SOCK)s',server,nowait"
+        " -daemonize -D '%(QEMU_LOG)s' -pidfile '%(PID_FILE)s'") % globals()
+
+def qemu_colo_meta_data():
+    print("""\
+<?xml version="1.0"?>
+<!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd">
+<resource-agent name="colo">
+
+    <version>0.1</version>
+    <longdesc lang="en">
+Resource agent for qemu COLO. (https://wiki.qemu.org/Features/COLO)
+
+After defining the master/slave instance, the master score has to be
+manually set to show which node has up-to-date data. So you copy your
+image to one host (and create empty images the other host(s)) and then
+run "crm_master -r name_of_your_primitive -v 10" on that host.
+
+Also, you have to set 'notify=true' in the metadata attributes when
+defining the master/slave instance.
+    </longdesc>
+    <shortdesc lang="en">Qemu COLO</shortdesc>
+
+    <parameters>
+
+    <parameter name="binary" unique="0" required="0">
+        <longdesc lang="en">Qemu binary to use</longdesc>
+        <shortdesc lang="en">Qemu binary</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_binary_default + """\"/>
+    </parameter>
+
+    <parameter name="log_dir" unique="0" required="0">
+        <longdesc lang="en">Directory to place logs in</longdesc>
+        <shortdesc lang="en">Log directory</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_log_dir_default + """\"/>
+    </parameter>
+
+    <parameter name="options" unique="0" required="1">
+        <longdesc lang="en">
+Options to pass to qemu. These will be passed alongside COLO specific
+options, so you need to follow these conventions: The netdev should have
+id=hn0 and the disk controller drive=colo-disk0. The image node should
+have id/node-name=parent0, but should not be connected to the guest.
+Example:
+-vnc :0 -enable-kvm -cpu qemu64,+kvmclock -m 512 -netdev bridge,id=hn0
+-device virtio-net,netdev=hn0 -device virtio-blk,drive=colo-disk0
+-drive if=none,id=parent0,format=qcow2,file=/mnt/vms/vm01.qcow2
+        </longdesc>
+        <shortdesc lang="en">Options to pass to qemu.</shortdesc>
+    </parameter>
+
+    <parameter name="disk_size" unique="0" required="1">
+        <longdesc lang="en">Disk size of the image</longdesc>
+        <shortdesc lang="en">Disk size of the image</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_disk_size_default + """\"/>
+    </parameter>
+
+    <parameter name="active_hidden_dir" unique="0" required="1">
+        <longdesc lang="en">
+Directory where the active and hidden images will be stored. It is
+recommended to put this on a ramdisk.
+        </longdesc>
+        <shortdesc lang="en">Path to active and hidden images</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_active_hidden_dir_default + """\"/>
+    </parameter>
+
+    <parameter name="listen_address" unique="0" required="0">
+        <longdesc lang="en">Address to listen on.</longdesc>
+        <shortdesc lang="en">Listen address</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_listen_address_default + """\"/>
+    </parameter>
+
+    <parameter name="base_port" unique="1" required="0">
+        <longdesc lang="en">
+4 tcp ports that are unique for each instance. (base_port to base_port + 3)
+        </longdesc>
+        <shortdesc lang="en">Ports to use</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_base_port_default + """\"/>
+    </parameter>
+
+    <parameter name="checkpoint_interval" unique="0" required="0">
+        <longdesc lang="en">
+Interval for regular checkpoints in milliseconds.
+        </longdesc>
+        <shortdesc lang="en">Interval for regular checkpoints</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_checkpoint_interval_default + """\"/>
+    </parameter>
+
+    <parameter name="debug" unique="0" required="0">
+        <longdesc lang="en">Enable debuging</longdesc>
+        <shortdesc lang="en">Enable debuging</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_debug_default + """\"/>
+    </parameter>
+
+    </parameters>
+
+    <actions>
+        <action name="start"        timeout="30s" />
+        <action name="stop"         timeout="20s" />
+        <action name="monitor"      timeout="40s" \
+            interval="12s" depth="0" />
+        <action name="monitor"      timeout="40s" \
+            interval="10s" depth="0" role="Slave" />
+        <action name="monitor"      timeout="40s" \
+            interval="11s" depth="0" role="Master" />
+        <action name="notify"       timeout="120s" />
+        <action name="promote"      timeout="120s" />
+        <action name="demote"       timeout="120s" />
+        <action name="meta-data"    timeout="5s" />
+        <action name="validate-all" timeout="20s" />
+    </actions>
+
+</resource-agent>
+""")
+
+def logs_open():
+    global log
+    log = logging.getLogger(OCF_RESOURCE_INSTANCE)
+    log.setLevel(logging.DEBUG)
+    formater = logging.Formatter("(%(name)s) %(levelname)s: %(message)s")
+
+    if sys.stdout.isatty():
+        handler = logging.StreamHandler(stream=sys.stderr)
+        handler.setFormatter(formater)
+        log.addHandler(handler)
+
+    if HA_LOGFACILITY:
+        handler = logging.handlers.SysLogHandler("/dev/log")
+        handler.setFormatter(formater)
+        log.addHandler(handler)
+
+    if HA_LOGFILE:
+        handler = logging.FileHandler(HA_LOGFILE)
+        handler.setFormatter(formater)
+        log.addHandler(handler)
+
+    if HA_DEBUGLOG and HA_DEBUGLOG != HA_LOGFILE:
+        handler = logging.FileHandler(HA_DEBUGLOG)
+        handler.setFormatter(formater)
+        log.addHandler(handler)
+
+    global qmp_log
+    qmp_log = logging.getLogger("qmp_log")
+    qmp_log.setLevel(logging.DEBUG)
+    formater = logging.Formatter("%(message)s")
+
+    if is_true(OCF_RESKEY_debug):
+        handler = logging.handlers.WatchedFileHandler(QMP_LOG)
+        handler.setFormatter(formater)
+        qmp_log.addHandler(handler)
+    else:
+        handler = logging.NullHandler()
+        qmp_log.addHandler(handler)
+
+def rotate_logfile(logfile, numlogs):
+    numlogs -= 1
+    for n in range(numlogs, -1, -1):
+        file = logfile
+        if n != 0:
+            file = "%s.%s" % (file, n)
+        if os.path.exists(file):
+            if n == numlogs:
+                os.remove(file)
+            else:
+                newname = "%s.%s" % (logfile, n + 1)
+                os.rename(file, newname)
+
+def is_writable(file):
+    return os.access(file, os.W_OK)
+
+def is_executable_file(file):
+    return os.path.isfile(file) and os.access(file, os.X_OK)
+
+def is_true(var):
+    return re.match("yes|true|1|YES|TRUE|True|ja|on|ON", str(var)) != None
+
+# Check if the binary exists and is executable
+def check_binary(binary):
+    if is_executable_file(binary):
+        return True
+    PATH = os.getenv("PATH", os.defpath)
+    for dir in PATH.split(os.pathsep):
+        if is_executable_file(os.path.join(dir, binary)):
+            return True
+    log.error("binary \"%s\" doesn't exist or not executable" % binary)
+    return False
+
+def run_command(commandline):
+    proc = subprocess.Popen(commandline, shell=True, stdout=subprocess.PIPE,\
+                          stderr=subprocess.STDOUT, universal_newlines=True)
+    stdout, stderr = proc.communicate()
+    if proc.returncode != 0:
+        log.error("command \"%s\" failed with code %s:\n%s" \
+                    % (commandline, proc.returncode, stdout))
+        raise Error()
+
+# Functions for setting and getting the master score to tell Pacemaker which
+# host has the most recent data
+def set_master_score(score):
+    if score <= 0:
+        os.system("crm_master -q -l forever -D")
+    else:
+        os.system("crm_master -q -l forever -v %s" % score)
+
+def set_remote_master_score(remote, score):
+    if score <= 0:
+        os.system("crm_master -q -l forever -N '%s' -D" % remote)
+    else:
+        os.system("crm_master -q -l forever -N '%s' -v %s" % (remote, score))
+
+def get_master_score():
+    proc = subprocess.Popen("crm_master -q -G", shell=True, \
+                            stdout=subprocess.PIPE, universal_newlines=True)
+    stdout, stderr = proc.communicate()
+    if proc.returncode != 0:
+        return 0
+    else:
+        return int(str.strip(stdout))
+
+def get_remote_master_score(remote):
+    proc = subprocess.Popen("crm_master -q -N '%s' -G" % remote, shell=True, \
+                            stdout=subprocess.PIPE, universal_newlines=True)
+    stdout, stderr = proc.communicate()
+    if proc.returncode != 0:
+        return 0
+    else:
+        return int(str.strip(stdout))
+
+def recv_line(fd):
+    line = ""
+    while True:
+        tmp = fd.recv(1)
+        if tmp == "\n":
+            break
+        line += tmp
+    return line
+
+# Filter out events
+def read_answer(fd):
+    while True:
+        line = recv_line(fd)
+        qmp_log.debug(line)
+
+        answer = json.loads(line)
+        # Ignore everything else
+        if "return" in answer or "error" in answer:
+            break
+    return answer
+
+# Execute one or more qmp commands
+def qmp_execute(fd, commands, ignore_error = False):
+    for command in commands:
+        if not command:
+            continue
+
+        try:
+            to_send = json.dumps(command)
+            fd.sendall(to_send + "\n")
+            qmp_log.debug(to_send)
+
+            answer = read_answer(fd)
+        except Exception as e:
+            log.error("while executing qmp command: %s\n%s" \
+                        % (json.dumps(command), e))
+            raise Error()
+
+        if not ignore_error and ("error" in answer):
+            log.error("qmp command returned error:\n%s\n%s" \
+                        % (json.dumps(command), json.dumps(answer)))
+            raise Error()
+
+    return answer
+
+# Open qemu qmp connection
+def qmp_open(fail_fast = False):
+    # Timeout for commands = our timeout minus 10s
+    timeout = max(1, (OCF_RESKEY_CRM_meta_timeout/1000)-10)
+
+    try:
+        fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        if fail_fast:
+            fd.settimeout(min(10, (OCF_RESKEY_CRM_meta_timeout/1000)))
+        else:
+            fd.settimeout(timeout)
+        fd.connect(QMP_SOCK)
+
+        answer = qmp_execute(fd, [{"execute": "qmp_capabilities"}], True)
+    except Exception as e:
+        log.error("while connecting to qmp socket: %s" % e)
+        raise Error()
+
+    if "error" in answer:
+        log.error("while connecting to qmp socket: %s" % json.dumps(answer))
+        raise Error()
+
+    fd.settimeout(timeout)
+    return fd
+
+# Check qemu health and colo role
+def qmp_check_state(fd):
+    answer = qmp_execute(fd, [{"execute": "query-status"}])
+    vm_status = answer["return"]
+
+    answer = qmp_execute(fd, [{"execute": "query-colo-status"}])
+    colo_status = answer["return"]
+
+    if vm_status["status"] == "inmigrate":
+        role = OCF_SUCCESS
+        replication = OCF_NOT_RUNNING
+
+    elif (vm_status["status"] == "running" \
+          or vm_status["status"] == "colo" \
+          or vm_status["status"] == "finish-migrate") \
+         and colo_status["mode"] == "none" \
+         and (colo_status["reason"] == "request" \
+              or colo_status["reason"] == "none"):
+        role = OCF_RUNNING_MASTER
+        replication = OCF_NOT_RUNNING
+
+    elif (vm_status["status"] == "running" \
+          or vm_status["status"] == "colo" \
+          or vm_status["status"] == "finish-migrate") \
+         and colo_status["mode"] == "secondary":
+        role = OCF_SUCCESS
+        replication = OCF_SUCCESS
+
+    elif (vm_status["status"] == "running" \
+          or vm_status["status"] == "colo" \
+          or vm_status["status"] == "finish-migrate") \
+         and colo_status["mode"] == "primary":
+        role = OCF_RUNNING_MASTER
+        replication = OCF_SUCCESS
+
+    else:
+        log.error("unknown qemu status: vm status: %s colo mode: %s" \
+                    % (vm_status["status"], colo_status["mode"]))
+        role = OCF_ERR_GENERIC
+        replication = OCF_ERR_GENERIC
+
+    return role, replication
+
+# Get the host of the nbd node
+def qmp_get_nbd_remote(fd):
+    block_nodes = qmp_execute(fd, [{"execute": "query-named-block-nodes"}])
+    for node in block_nodes["return"]:
+        if node["node-name"] == "nbd0":
+            url = str(node["image"]["filename"])
+            return str.split(url, "//")[1].split("/")[0].split(":")[0]
+    return None
+
+# Check if we are currently resyncing
+def qmp_check_resync(fd):
+    answer = qmp_execute(fd, [{"execute": "query-block-jobs"}])
+    for job in answer["return"]:
+        if job["device"] == "resync":
+            return job
+    return None
+
+def qmp_start_resync(fd, remote):
+    qmp_execute(fd, [
+        {"execute": "blockdev-add", "arguments": {"driver": "nbd", "node-name": "nbd0", "server": {"type": "inet", "host": str(remote), "port": str(NBD_PORT)}, "export": "parent0"}},
+        {"execute": "blockdev-mirror", "arguments": {"device": "colo-disk0", "job-id": "resync", "target": "nbd0", "sync": "full", "auto-dismiss": False}}
+        ])
+
+def qmp_cancel_resync(fd):
+    if qmp_check_resync(fd)["status"] != "concluded":
+        qmp_execute(fd, [{"execute": "block-job-cancel", "arguments": {"device": "resync", "force": True}}], True)
+        # Wait for the block-job to finish
+        while qmp_check_resync(fd)["status"] != "concluded":
+            time.sleep(1)
+
+    qmp_execute(fd, [
+        {"execute": "block-job-dismiss", "arguments": {"id": "resync"}},
+        {"execute": "blockdev-del", "arguments": {"node-name": "nbd0"}}
+        ])
+
+def qmp_start_colo(fd, remote):
+    # Check if we have a filter-rewriter
+    answer = qmp_execute(fd, [{"execute": "qom-list", "arguments": {"path": "/objects/rew0"}}], True)
+    if "error" in answer:
+        if answer["error"]["class"] == "DeviceNotFound":
+            have_filter_rewriter = False
+        else:
+            log.error("while checking for filter-rewriter:\n%s" \
+                        % json.dumps(answer))
+            raise Error()
+    else:
+        have_filter_rewriter = True
+
+    # Pause VM and cancel resync
+    qmp_execute(fd, [
+        {"execute": "stop"},
+        {"execute": "block-job-cancel", "arguments": {"device": "resync"}}
+        ])
+
+    # Wait for the block-job to finish
+    while qmp_check_resync(fd)["status"] != "concluded":
+        time.sleep(1)
+
+    # Add the replication node
+    qmp_execute(fd, [
+        {"execute": "block-job-dismiss", "arguments": {"id": "resync"}},
+        {"execute": "blockdev-add", "arguments": {"driver": "replication", "node-name": "replication0", "mode": "primary", "file": "nbd0"}},
+        {"execute": "x-blockdev-change", "arguments": {"parent": "colo-disk0", "node": "replication0"}}
+        ])
+
+    # Connect mirror and compare_in to secondary
+    qmp_execute(fd, [
+        {"execute": "chardev-add", "arguments": {"id": "mirror0", "backend": {"type": "socket", "data": {"addr": {"type": "inet", "data": {"host": str(remote), "port": str(MIROR_PORT)}}, "server": False, "reconnect": 1}}}},
+        {"execute": "chardev-add", "arguments": {"id": "compare1", "backend": {"type": "socket", "data": {"addr": {"type": "inet", "data": {"host": str(remote), "port": str(COMPARE_IN_PORT)}}, "server": False, "reconnect": 1}}}}
+        ])
+
+    # Add the COLO filters
+    if have_filter_rewriter:
+        qmp_execute(fd, [
+            {"execute": "object-add", "arguments": {"qom-type": "filter-mirror", "id": "m0", "props": {"insert": "before", "position": "id=rew0", "netdev": "hn0", "queue": "tx", "outdev": "mirror0"}}},
+            {"execute": "object-add", "arguments": {"qom-type": "filter-redirector", "id": "redire0", "props": {"insert": "before", "position": "id=rew0", "netdev": "hn0", "queue": "rx", "indev": "comp_out"}}},
+            {"execute": "object-add", "arguments": {"qom-type": "filter-redirector", "id": "redire1", "props": {"insert": "before", "position": "id=rew0", "netdev": "hn0", "queue": "rx", "outdev": "comp0"}}},
+            {"execute": "object-add", "arguments": {"qom-type": "iothread", "id": "iothread1"}},
+            {"execute": "object-add", "arguments": {"qom-type": "colo-compare", "id": "comp0", "props": {"primary_in": "comp0-0", "secondary_in": "compare1", "outdev": "comp_out0", "iothread": "iothread1"}}}
+            ])
+    else:
+        qmp_execute(fd, [
+            {"execute": "object-add", "arguments": {"qom-type": "filter-mirror", "id": "m0", "props": {"netdev": "hn0", "queue": "tx", "outdev": "mirror0"}}},
+            {"execute": "object-add", "arguments": {"qom-type": "filter-redirector", "id": "redire0", "props": {"netdev": "hn0", "queue": "rx", "indev": "comp_out"}}},
+            {"execute": "object-add", "arguments": {"qom-type": "filter-redirector", "id": "redire1", "props": {"netdev": "hn0", "queue": "rx", "outdev": "comp0"}}},
+            {"execute": "object-add", "arguments": {"qom-type": "iothread", "id": "iothread1"}},
+            {"execute": "object-add", "arguments": {"qom-type": "colo-compare", "id": "comp0", "props": {"primary_in": "comp0-0", "secondary_in": "compare1", "outdev": "comp_out0", "iothread": "iothread1"}}}
+            ])
+
+    # Start COLO
+    qmp_execute(fd, [
+        {"execute": "migrate-set-capabilities", "arguments": {"capabilities": [{"capability": "x-colo", "state": True }] }},
+        {"execute": "migrate-set-parameters" , "arguments": {"x-checkpoint-delay": int(OCF_RESKEY_checkpoint_interval)}},
+        {"execute": "migrate", "arguments": {"uri": "tcp:%s:%s" % (remote, MIGRATE_PORT)}}
+        ])
+
+    # Wait for COLO to start
+    while qmp_execute(fd, [{"execute": "query-status"}])["return"]["status"] \
+            == "paused":
+        time.sleep(1)
+
+def qmp_primary_failover(fd):
+    qmp_execute(fd, [
+        {"execute": "x-blockdev-change", "arguments": {"parent": "colo-disk0", "child": "children.1"}},
+        {"execute": "blockdev-del", "arguments": {"node-name": "replication0"}},
+        {"execute": "blockdev-del", "arguments": {"node-name": "nbd0"}},
+        {"execute": "object-del", "arguments": {"id": "comp0"}},
+        {"execute": "object-del", "arguments": {"id": "iothread1"}},
+        {"execute": "object-del", "arguments": {"id": "m0"}},
+        {"execute": "object-del", "arguments": {"id": "redire0"}},
+        {"execute": "object-del", "arguments": {"id": "redire1"}},
+        {"execute": "chardev-remove", "arguments": {"id": "mirror0"}},
+        {"execute": "chardev-remove", "arguments": {"id": "compare1"}},
+        {"execute": "x-colo-lost-heartbeat"}
+        ])
+
+def qmp_secondary_failover(fd):
+    # Stop the NBD server, and resume
+    qmp_execute(fd, [
+        {"execute": "nbd-server-stop"},
+        {"execute": "x-colo-lost-heartbeat"}
+        ])
+
+    # Prepare for continuing replication when we have a new secondary
+    qmp_execute(fd, [
+        {"execute": "object-del", "arguments": {"id": "f2"}},
+        {"execute": "object-del", "arguments": {"id": "f1"}},
+        {"execute": "chardev-remove", "arguments": {"id": "red1"}},
+        {"execute": "chardev-remove", "arguments": {"id": "red0"}},
+        {"execute": "chardev-add", "arguments": {"id": "comp0", "backend": {"type": "socket", "data": {"addr": {"type": "unix", "data": {"path": str(COMP_SOCK)}}, "server": True}}}},
+        {"execute": "chardev-add", "arguments": {"id": "comp0-0", "backend": {"type": "socket", "data": {"addr": {"type": "unix", "data": {"path": str(COMP_SOCK)}}, "server": False}}}},
+        {"execute": "chardev-add", "arguments": {"id": "comp_out", "backend": {"type": "socket", "data": {"addr": {"type": "unix", "data": {"path": str(COMP_OUT_SOCK)}}, "server": True}}}},
+        {"execute": "chardev-add", "arguments": {"id": "comp_out0", "backend": {"type": "socket", "data": {"addr": {"type": "unix", "data": {"path": str(COMP_OUT_SOCK)}}, "server": False}}}}
+        ])
+
+# Sanity checks: check parameters, files, binaries, etc.
+def qemu_colo_validate_all():
+    # Check resource parameters
+    if not str.isdigit(OCF_RESKEY_base_port):
+        log.error("base_port needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not str.isdigit(OCF_RESKEY_checkpoint_interval):
+        log.error("checkpoint_interval needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not OCF_RESKEY_active_hidden_dir:
+        log.error("active_hidden_dir needs to be specified")
+        return OCF_ERR_CONFIGURED
+
+    if not OCF_RESKEY_disk_size:
+        log.error("disk_size needs to be specified")
+        return OCF_ERR_CONFIGURED
+
+    # Check resource meta configuration
+    if OCF_ACTION != "stop":
+        if OCF_RESKEY_CRM_meta_master_max != 1:
+            log.error("only one master allowed")
+            return OCF_ERR_CONFIGURED
+
+        if OCF_RESKEY_CRM_meta_clone_max > 2:
+            log.error("maximum 2 clones allowed")
+            return OCF_ERR_CONFIGURED
+
+        if OCF_RESKEY_CRM_meta_master_node_max != 1:
+            log.error("only one master per node allowed")
+            return OCF_ERR_CONFIGURED
+
+        if OCF_RESKEY_CRM_meta_clone_node_max != 1:
+            log.error("only one clone per node allowed")
+            return OCF_ERR_CONFIGURED
+
+    # Check if notify is enabled
+    if OCF_ACTION != "stop" and OCF_ACTION != "monitor":
+        if not is_true(OCF_RESKEY_CRM_meta_notify) \
+           and not OCF_RESKEY_CRM_meta_notify_start_uname:
+            log.error("notify needs to be enabled")
+            return OCF_ERR_CONFIGURED
+
+    # Check that globally-unique is disabled
+    if is_true(OCF_RESKEY_CRM_meta_globally_unique):
+        log.error("globally-unique needs to be disabled")
+        return OCF_ERR_CONFIGURED
+
+    # Check binaries
+    if not check_binary(OCF_RESKEY_binary):
+        return OCF_ERR_INSTALLED
+
+    if not check_binary("qemu-img"):
+        return OCF_ERR_INSTALLED
+
+    # Check paths and files
+    if not is_writable(OCF_RESKEY_active_hidden_dir) \
+        or not os.path.isdir(OCF_RESKEY_active_hidden_dir):
+        log.error("active and hidden image directory missing or not writable")
+        return OCF_ERR_PERM
+
+    return OCF_SUCCESS
+
+# Check if qemu is running
+def check_pid():
+    if not os.path.exists(PID_FILE):
+        return OCF_NOT_RUNNING, None
+
+    fd = open(PID_FILE, "r")
+    pid = int(str.strip(fd.readline()))
+    fd.close()
+    try:
+        os.kill(pid, 0)
+    except OSError:
+        log.info("qemu is not running")
+        return OCF_NOT_RUNNING, pid
+    else:
+        return OCF_SUCCESS, pid
+
+def qemu_colo_monitor(fail_fast = False):
+    status, pid = check_pid()
+    if status != OCF_SUCCESS:
+        return status, OCF_ERR_GENERIC
+
+    fd = qmp_open(fail_fast)
+
+    role, replication = qmp_check_state(fd)
+    if role != OCF_SUCCESS and role != OCF_RUNNING_MASTER:
+        return role, replication
+
+    if not fail_fast and OCF_RESKEY_CRM_meta_interval != 0:
+        # This isn't a probe monitor
+        block_job = qmp_check_resync(fd)
+        if block_job != None:
+            if "error" in block_job:
+                log.error("resync error: %s" % block_job["error"])
+                qmp_cancel_resync(fd)
+                # TODO: notify pacemaker about peer failure
+            elif block_job["ready"] == True:
+                log.info("resync done, starting colo")
+                peer = qmp_get_nbd_remote(fd)
+                qmp_start_colo(fd, peer)
+                # COLO started, our secondary now can be promoted if the
+                # primary fails
+                set_remote_master_score(peer, 100)
+            else:
+                pct_done = (float(block_job["offset"]) \
+                            / float(block_job["len"])) * 100
+                log.info("resync %.2f%% done" % pct_done)
+
+    fd.close()
+
+    return role, replication
+
+def qemu_colo_start():
+    if check_pid()[0] == OCF_SUCCESS:
+        log.info("qemu is already running")
+        return OCF_SUCCESS
+
+    run_command("qemu-img create -q -f qcow2 %s %s" \
+                % (ACTIVE_IMAGE, OCF_RESKEY_disk_size))
+    run_command("qemu-img create -q -f qcow2 %s %s" \
+                % (HIDDEN_IMAGE, OCF_RESKEY_disk_size))
+
+    rotate_logfile(QMP_LOG, 4)
+    rotate_logfile(QEMU_LOG, 4)
+    run_command(QEMU_SECONDARY_CMDLINE)
+
+    fd = qmp_open()
+    qmp_execute(fd, [
+        {"execute": "nbd-server-start", "arguments": {"addr": {"type": "inet", "data": {"host": str(OCF_RESKEY_listen_address), "port": str(NBD_PORT)}}}},
+        {"execute": "nbd-server-add", "arguments": {"device": "parent0", "writable": True}}
+        ])
+    fd.close()
+
+    return OCF_SUCCESS
+
+def env_do_shutdown_guest():
+    return OCF_RESKEY_CRM_meta_notify_active_uname \
+           and OCF_RESKEY_CRM_meta_notify_stop_uname \
+           and str.strip(OCF_RESKEY_CRM_meta_notify_active_uname) \
+               == str.strip(OCF_RESKEY_CRM_meta_notify_stop_uname)
+
+def env_find_secondary():
+    # slave(s) =
+    # OCF_RESKEY_CRM_meta_notify_slave_uname
+    # - OCF_RESKEY_CRM_meta_notify_stop_uname
+    # + OCF_RESKEY_CRM_meta_notify_start_uname
+    # Filter out hosts that are stopping and ourselves
+    for host in str.split(OCF_RESKEY_CRM_meta_notify_slave_uname, " "):
+        if host:
+            for stopping_host \
+                in str.split(OCF_RESKEY_CRM_meta_notify_stop_uname, " "):
+                if host == stopping_host:
+                    break
+            else:
+                if host != HOSTNAME:
+                    # we found a valid secondary
+                    return host
+
+    for host in str.split(OCF_RESKEY_CRM_meta_notify_start_uname, " "):
+        if host != HOSTNAME:
+            # we found a valid secondary
+            return host
+
+    # we found no secondary
+    return None
+
+def _qemu_colo_stop(monstatus, shutdown_guest):
+    # stop action must do everything possible to stop the resource
+    try:
+        now = time.time()
+        timeout = now + (OCF_RESKEY_CRM_meta_timeout/1000)-10
+        force_stop = False
+
+        if monstatus == OCF_NOT_RUNNING:
+            log.info("resource is already stopped")
+            return OCF_SUCCESS
+        elif monstatus == OCF_RUNNING_MASTER or monstatus == OCF_SUCCESS:
+            force_stop = False
+        else:
+            force_stop = True
+
+        if not force_stop:
+            fd = qmp_open(True)
+            if shutdown_guest:
+                if monstatus == OCF_RUNNING_MASTER:
+                    qmp_execute(fd, [{"execute": "system_powerdown"}])
+            else:
+                qmp_execute(fd, [{"execute": "quit"}])
+            fd.close()
+
+            # wait for qemu to stop
+            while time.time() < timeout:
+                status, pid = check_pid()
+                if status == OCF_NOT_RUNNING:
+                    # qemu stopped
+                    return OCF_SUCCESS
+                elif status == OCF_SUCCESS:
+                    # wait
+                    time.sleep(1)
+                else:
+                    # something went wrong, force stop instead
+                    break
+
+            log.warning("clean stop timeout reached")
+    except Exception as e:
+        log.warning("error while stopping: \"%s\"" % e.message)
+
+    log.info("force stopping qemu")
+
+    status, pid = check_pid()
+    if status == OCF_NOT_RUNNING:
+        return OCF_SUCCESS
+    try:
+        os.kill(pid, signal.SIGTERM)
+        time.sleep(2)
+        os.kill(pid, signal.SIGKILL)
+    except Exception:
+        pass
+
+    while check_pid()[0] != OCF_NOT_RUNNING:
+        time.sleep(1)
+
+    return OCF_SUCCESS
+
+def qemu_colo_stop():
+    shutdown_guest = env_do_shutdown_guest()
+    try:
+        role, replication = qemu_colo_monitor(True)
+    except Exception:
+        role, replication = OCF_ERR_GENERIC, OCF_ERR_GENERIC
+
+    status = _qemu_colo_stop(role, shutdown_guest)
+
+    if HOSTNAME == str.strip(OCF_RESKEY_CRM_meta_notify_master_uname):
+        peer = env_find_secondary()
+        if peer and (get_remote_master_score(peer) > 10) \
+           and not shutdown_guest:
+            # We where a healthy primary and had a healthy secondary. We where
+            # stopped so outdate ourselves, as the secondary will take over.
+            set_master_score(0)
+        else:
+            if role == OCF_RUNNING_MASTER:
+                # We where a healthy primary but had no healty secondary or it
+                # was stopped as well. So we have up-to-date data.
+                set_master_score(10)
+            else:
+                # We where a unhealthy primary but also had no healty secondary.
+                # So we still should have up-to-date data.
+                set_master_score(5)
+    else:
+        if get_master_score() > 10:
+            if role == OCF_SUCCESS:
+                if shutdown_guest:
+                    # We where a healthy secondary and (probably) had a healthy
+                    # primary and both where stopped. So we have up-to-date data
+                    # too.
+                    set_master_score(10)
+                else:
+                    # We where a healthy secondary and (probably) had a healthy
+                    # primary still running. So we are now out of date.
+                    set_master_score(0)
+            else:
+                # We where a unhealthy secondary. So we are now out of date.
+                set_master_score(0)
+
+    return status
+
+def qemu_colo_notify():
+    action = "%s-%s" % (OCF_RESKEY_CRM_meta_notify_type, \
+                        OCF_RESKEY_CRM_meta_notify_operation)
+
+    if action == "post-start":
+        if HOSTNAME == str.strip(OCF_RESKEY_CRM_meta_notify_master_uname):
+            peer = str.strip(OCF_RESKEY_CRM_meta_notify_start_uname)
+            fd = qmp_open()
+            qmp_start_resync(fd, peer)
+            fd.close()
+            # The secondary has inconsistent data until resync is finished
+            set_remote_master_score(peer, 0)
+
+    elif action == "pre-stop":
+        if not env_do_shutdown_guest() \
+           and HOSTNAME == str.strip(OCF_RESKEY_CRM_meta_notify_master_uname):
+            fd = qmp_open()
+            peer = qmp_get_nbd_remote(fd)
+            if peer == str.strip(OCF_RESKEY_CRM_meta_notify_stop_uname):
+                if qmp_check_resync(fd) != None:
+                    qmp_cancel_resync(fd)
+                elif peer and get_remote_master_score(peer) > 10:
+                    qmp_primary_failover(fd)
+            fd.close()
+
+    return OCF_SUCCESS
+
+def qemu_colo_promote():
+    role, replication = qemu_colo_monitor()
+
+    if role == OCF_SUCCESS and replication == OCF_NOT_RUNNING:
+        status = _qemu_colo_stop(role, False)
+        if status != OCF_SUCCESS:
+            return status
+
+        rotate_logfile(QMP_LOG, 4)
+        rotate_logfile(QEMU_LOG, 4)
+        run_command(QEMU_PRIMARY_CMDLINE)
+        set_master_score(101)
+
+        peer = env_find_secondary()
+        if peer:
+            fd = qmp_open()
+            qmp_start_resync(fd, peer)
+            # The secondary has inconsistent data until resync is finished
+            set_remote_master_score(peer, 0)
+            fd.close()
+        return OCF_SUCCESS
+    elif role == OCF_SUCCESS and replication == OCF_SUCCESS:
+        fd = qmp_open()
+        qmp_secondary_failover(fd)
+        set_master_score(101)
+
+        peer = env_find_secondary()
+        if peer:
+            qmp_start_resync(fd, peer)
+            # The secondary has inconsistent data until resync is finished
+            set_remote_master_score(peer, 0)
+        fd.close()
+        return OCF_SUCCESS
+    else:
+        return OCF_ERR_GENERIC
+
+def qemu_colo_demote():
+    status = qemu_colo_stop()
+    if status != OCF_SUCCESS:
+        return status
+    return qemu_colo_start()
+
+
+if OCF_ACTION == "meta-data":
+    qemu_colo_meta_data()
+    exit(OCF_SUCCESS)
+
+logs_open()
+
+status = qemu_colo_validate_all()
+# Exit here if our sanity checks fail, but try to continue if we need to stop
+if status != OCF_SUCCESS and OCF_ACTION != "stop":
+    exit(status)
+
+setup_constants()
+
+os.system("echo 'Action: %s' >> /tmp/ra-env.log" % OCF_ACTION)
+os.system("env >> /tmp/ra-env.log")
+
+try:
+    if OCF_ACTION == "start":
+        status = qemu_colo_start()
+    elif OCF_ACTION == "stop":
+        status = qemu_colo_stop()
+    elif OCF_ACTION == "monitor":
+        status = qemu_colo_monitor()[0]
+    elif OCF_ACTION == "notify":
+        status = qemu_colo_notify()
+    elif OCF_ACTION == "promote":
+        status = qemu_colo_promote()
+    elif OCF_ACTION == "demote":
+        status = qemu_colo_demote()
+    elif OCF_ACTION == "validate-all":
+        status = qemu_colo_validate_all()
+    else:
+        status = OCF_ERR_UNIMPLEMENTED
+except Error:
+    exit(OCF_ERR_GENERIC)
+else:
+    exit(status)
--
2.20.1



^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [PATCH 3/4] colo: Introduce high-level test
  2019-11-21 17:49 [PATCH 0/4] colo: Introduce resource agent and high-level test Lukas Straub
  2019-11-21 17:49 ` [PATCH 1/4] block/quorum.c: stable children names Lukas Straub
  2019-11-21 17:49 ` [PATCH 2/4] colo: Introduce resource agent Lukas Straub
@ 2019-11-21 17:49 ` Lukas Straub
  2019-11-21 17:49 ` [PATCH 4/4] MAINTAINERS: Add myself as maintainer for COLO resource agent Lukas Straub
  2019-11-22  9:46 ` [PATCH 0/4] colo: Introduce resource agent and high-level test Dr. David Alan Gilbert
  4 siblings, 0 replies; 13+ messages in thread
From: Lukas Straub @ 2019-11-21 17:49 UTC (permalink / raw)
  To: qemu-devel
  Cc: Zhang, Chen, Jason Wang, Alberto Garcia, Dr. David Alan Gilbert

Add high-level test relying on the colo resource-agent to test
all failover cases while checking guest network connectivity

Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
 scripts/colo-resource-agent/crm_master |  44 +++
 tests/acceptance/colo.py               | 444 +++++++++++++++++++++++++
 2 files changed, 488 insertions(+)
 create mode 100755 scripts/colo-resource-agent/crm_master
 create mode 100644 tests/acceptance/colo.py

diff --git a/scripts/colo-resource-agent/crm_master b/scripts/colo-resource-agent/crm_master
new file mode 100755
index 0000000000..00c386b949
--- /dev/null
+++ b/scripts/colo-resource-agent/crm_master
@@ -0,0 +1,44 @@
+#!/bin/bash
+
+# Fake crm_master for COLO testing
+#
+# Copyright (c) Lukas Straub <lukasstraub2@web.de>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later.  See the COPYING file in the top-level directory.
+
+TMPDIR="$HA_RSCTMP"
+score=0
+query=0
+
+OPTIND=1
+while getopts 'Qql:Dv:N:G' opt; do
+    case "$opt" in
+        Q|q)
+            # Noop
+        ;;
+        "l")
+            # Noop
+        ;;
+        "D")
+            score=0
+        ;;
+        "v")
+            score=$OPTARG
+        ;;
+        "N")
+            TMPDIR="$COLO_SMOKE_REMOTE_TMP"
+        ;;
+        "G")
+            query=1
+        ;;
+    esac
+done
+
+if (( query )); then
+    cat "${TMPDIR}/master_score" || exit 1
+else
+    echo $score > "${TMPDIR}/master_score" || exit 1
+fi
+
+exit 0
diff --git a/tests/acceptance/colo.py b/tests/acceptance/colo.py
new file mode 100644
index 0000000000..94a6adabdd
--- /dev/null
+++ b/tests/acceptance/colo.py
@@ -0,0 +1,444 @@
+#!/usr/bin/env python
+
+# High-level test for qemu COLO testing all failover cases while checking
+# guest network connectivity
+#
+# Copyright (c) Lukas Straub <lukasstraub2@web.de>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later.  See the COPYING file in the top-level directory.
+
+import select
+import sys
+import subprocess
+import shutil
+import os
+import signal
+import os.path
+import json
+import time
+import tempfile
+
+from avocado import Test
+from avocado.utils.archive import gzip_uncompress
+from avocado.utils import network
+from avocado_qemu import pick_default_qemu_bin, SRC_ROOT_DIR
+
+class ColoTest(Test):
+    timeout = 120
+
+    # Constants
+    OCF_SUCCESS = 0
+    OCF_ERR_GENERIC = 1
+    OCF_ERR_ARGS = 2
+    OCF_ERR_UNIMPLEMENTED = 3
+    OCF_ERR_PERM = 4
+    OCF_ERR_INSTALLED = 5
+    OCF_ERR_CONFIGURED = 6
+    OCF_NOT_RUNNING = 7
+    OCF_RUNNING_MASTER = 8
+    OCF_FAILED_MASTER = 9
+
+    HOSTA = 10
+    HOSTB = 11
+
+    QEMU_OPTIONS = (" -enable-kvm -cpu qemu64,+kvmclock -m 256"
+                    " -device virtio-net,netdev=hn0"
+                    " -device virtio-blk,drive=colo-disk0")
+    COLO_RA = "scripts/colo-resource-agent/colo"
+    FAKEPATH = ".:scripts/colo-resource-agent"
+
+    bridge_proc = None
+    ssh_proc = None
+
+    def setUp(self):
+        # Qemu binary
+        default_qemu_bin = pick_default_qemu_bin()
+        self.QEMU_BINARY = self.params.get('qemu_bin', default=default_qemu_bin)
+
+        # Find free port range
+        base_port = 1024
+        while True:
+            base_port = network.find_free_port(start_port=base_port, \
+                                               address="127.0.0.1")
+            if base_port == None:
+                self.cancel("Failed to find a free port")
+            for n in range(base_port, base_port +6):
+                if not network.is_port_free(n, "127.0.0.1"):
+                    base_port = n +1
+                    break
+            else:
+                # for loop above didn't break
+                break
+
+        self.BRIDGE_HOSTA_PORT = base_port
+        self.BRIDGE_HOSTB_PORT = base_port + 1
+        self.SSH_PORT = base_port + 2
+        self.COLO_BASE_PORT = base_port + 3
+
+        # Temporary directories
+        self.TMPDIR = tempfile.mkdtemp()
+        self.TMPA = os.path.join(self.TMPDIR, "hosta")
+        self.TMPB = os.path.join(self.TMPDIR, "hostb")
+        os.makedirs(self.TMPA)
+        os.makedirs(self.TMPB)
+
+        # Disk images
+        self.HOSTA_IMAGE = os.path.join(self.TMPA, "image.raw")
+        self.HOSTB_IMAGE = os.path.join(self.TMPB, "image.raw")
+
+        image_url = ("https://downloads.openwrt.org/releases/18.06.5/targets/"
+                     "x86/64/openwrt-18.06.5-x86-64-combined-ext4.img.gz")
+        image_hash = ("55589a3a9b943218b1734d196bcaa92a"
+                      "3cfad91c07fa6891474b4291ce1b8ec2")
+        self.IMAGE_SIZE = "285736960b"
+        download = self.fetch_asset(image_url, asset_hash=image_hash, \
+                                    algorithm="sha256")
+        gzip_uncompress(download, self.HOSTA_IMAGE)
+        shutil.copyfile(self.HOSTA_IMAGE, self.HOSTB_IMAGE)
+
+        self.log.info("Will put logs in \"%s\"" % self.outputdir)
+        self.RA_LOG = os.path.join(self.outputdir, "resource-agent.log")
+        self.HOSTA_LOGDIR = os.path.join(self.outputdir, "hosta")
+        self.HOSTB_LOGDIR = os.path.join(self.outputdir, "hostb")
+        os.makedirs(self.HOSTA_LOGDIR)
+        os.makedirs(self.HOSTB_LOGDIR)
+
+        # Network bridge
+        self.BRIDGE_PIDFILE = os.path.join(self.TMPDIR, "bridge.pid")
+        pid = self.read_pidfile(self.BRIDGE_PIDFILE)
+        if not (pid and self.check_pid(pid)):
+            self.run_command(("%s -M none -daemonize -pidfile '%s'"
+                " -netdev socket,id=hosta,listen=127.0.0.1:%s"
+                " -netdev hubport,id=porta,hubid=0,netdev=hosta"
+                " -netdev socket,id=hostb,listen=127.0.0.1:%s"
+                " -netdev hubport,id=portb,hubid=0,netdev=hostb"
+                " -netdev user,net=192.168.1.1/24,host=192.168.1.2,"
+                "hostfwd=tcp:127.0.0.1:%s-192.168.1.1:22,id=host"
+                " -netdev hubport,id=hostport,hubid=0,netdev=host")
+                % (self.QEMU_BINARY, self.BRIDGE_PIDFILE,
+                   self.BRIDGE_HOSTA_PORT, self.BRIDGE_HOSTB_PORT,
+                   self.SSH_PORT), 0)
+
+    def tearDown(self):
+        try:
+            pid = self.read_pidfile(self.BRIDGE_PIDFILE)
+            if pid and self.check_pid(pid):
+                os.kill(pid, signal.SIGKILL)
+        except Exception():
+            pass
+        try:
+            self.ra_stop(self.HOSTA)
+        except Exception():
+            pass
+        try:
+            self.ra_stop(self.HOSTB)
+        except Exception():
+            pass
+        try:
+            if self.ssh_proc:
+                self.ssh_proc.terminate()
+        except Exception():
+            pass
+
+        shutil.rmtree(self.TMPDIR)
+
+    def run_command(self, cmdline, expected_status, env=None, error_fail=True):
+        proc = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE, \
+                                stderr=subprocess.STDOUT, \
+                                universal_newlines=True, env=env)
+        stdout, stderr = proc.communicate()
+        if proc.returncode != expected_status:
+            message = "command \"%s\" failed with code %s:\n%s" \
+                           % (cmdline, proc.returncode, stdout)
+            if error_fail:
+                self.log.error(message)
+                self.fail("command \"%s\" failed" % cmdline)
+            else:
+                self.log.info(message)
+
+        return proc.returncode
+
+    def cat_line(self, path):
+        line=""
+        try:
+            fd = open(path, "r")
+            line = str.strip(fd.readline())
+            fd.close()
+        except:
+            pass
+        return line
+
+    def read_pidfile(self, pidfile):
+        try:
+            pid = int(self.cat_line(pidfile))
+        except ValueError:
+            return None
+        else:
+            return pid
+
+    def check_pid(self, pid):
+        try:
+            os.kill(pid, 0)
+        except OSError:
+            return False
+        else:
+            return True
+
+    def ssh_ping(self, proc):
+        proc.stdin.write("ping\n")
+        if not select.select([proc.stdout], [], [], 30)[0]:
+            raise self.fail("ssh ping timeout reached")
+        if proc.stdout.readline() != "ping\n":
+            raise self.fail("unexpected ssh ping answer")
+
+    def ssh_open(self):
+        commandline = ("ssh -o \"UserKnownHostsFile /dev/null\""
+                       " -o \"StrictHostKeyChecking no\""
+                       " -p%s root@127.0.0.1") % self.SSH_PORT
+
+        self.log.info("Connecting via ssh")
+        for i in range(10):
+            if self.run_command(commandline + " exit", 0, error_fail=False) \
+                == 0:
+                proc = subprocess.Popen(commandline + " cat", shell=True, \
+                                            stdin=subprocess.PIPE, \
+                                            stdout=subprocess.PIPE, \
+                                            stderr=0, \
+                                            universal_newlines=True,
+                                            bufsize=1)
+                self.ssh_ping(proc)
+                return proc
+            else:
+                time.sleep(5)
+        self.fail("ssh connect timeout reached")
+
+    def ssh_close(self, proc):
+        proc.terminate()
+
+    def setup_base_env(self, host):
+        PATH = os.getenv("PATH", "")
+        env = { "PATH": "%s:%s" % (self.FAKEPATH, PATH),
+                "HA_LOGFILE": self.RA_LOG,
+                "OCF_RESOURCE_INSTANCE": "colo-test",
+                "OCF_RESKEY_CRM_meta_clone_max": "2",
+                "OCF_RESKEY_CRM_meta_notify": "true",
+                "OCF_RESKEY_CRM_meta_timeout": "30000",
+                "OCF_RESKEY_binary": self.QEMU_BINARY,
+                "OCF_RESKEY_disk_size": str(self.IMAGE_SIZE),
+                "OCF_RESKEY_checkpoint_interval": "1000",
+                "OCF_RESKEY_base_port": str(self.COLO_BASE_PORT),
+                "OCF_RESKEY_debug": "true"}
+
+        if host == self.HOSTA:
+            env.update({"OCF_RESKEY_options":
+                            ("%s -netdev socket,id=hn0,connect=127.0.0.1:%s"
+                             " -drive if=none,id=parent0,format=raw,file='%s'")
+                            % (self.QEMU_OPTIONS, self.BRIDGE_HOSTA_PORT,
+                                self.HOSTA_IMAGE),
+                        "OCF_RESKEY_active_hidden_dir": self.TMPA,
+                        "OCF_RESKEY_listen_address": "127.0.0.1",
+                        "OCF_RESKEY_log_dir": self.HOSTA_LOGDIR,
+                        "OCF_RESKEY_CRM_meta_on_node": "127.0.0.1",
+                        "HA_RSCTMP": self.TMPA,
+                        "COLO_SMOKE_REMOTE_TMP": self.TMPB})
+        else:
+            env.update({"OCF_RESKEY_options":
+                            ("%s -netdev socket,id=hn0,connect=127.0.0.1:%s"
+                             " -drive if=none,id=parent0,format=raw,file='%s'")
+                            % (self.QEMU_OPTIONS, self.BRIDGE_HOSTB_PORT,
+                                self.HOSTB_IMAGE),
+                        "OCF_RESKEY_active_hidden_dir": self.TMPB,
+                        "OCF_RESKEY_listen_address": "127.0.0.2",
+                        "OCF_RESKEY_log_dir": self.HOSTB_LOGDIR,
+                        "OCF_RESKEY_CRM_meta_on_node": "127.0.0.2",
+                        "HA_RSCTMP": self.TMPB,
+                        "COLO_SMOKE_REMOTE_TMP": self.TMPA})
+        return env
+
+    def ra_start(self, host):
+        env = self.setup_base_env(host)
+        self.run_command(self.COLO_RA + " start", self.OCF_SUCCESS, env)
+
+    def ra_stop(self, host):
+        env = self.setup_base_env(host)
+        self.run_command(self.COLO_RA + " stop", self.OCF_SUCCESS, env)
+
+    def ra_monitor(self, host, expected_status):
+        env = self.setup_base_env(host)
+        self.run_command(self.COLO_RA + " monitor", expected_status, env)
+
+    def ra_promote(self, host):
+        env = self.setup_base_env(host)
+        self.run_command(self.COLO_RA + " promote", self.OCF_SUCCESS, env)
+
+    def ra_notify_start(self, host):
+        env = self.setup_base_env(host)
+
+        env.update({"OCF_RESKEY_CRM_meta_notify_type": "post",
+                    "OCF_RESKEY_CRM_meta_notify_operation": "start"})
+
+        if host == self.HOSTA:
+            env.update({"OCF_RESKEY_CRM_meta_notify_master_uname": "127.0.0.1",
+                        "OCF_RESKEY_CRM_meta_notify_start_uname": "127.0.0.2"})
+        else:
+            env.update({"OCF_RESKEY_CRM_meta_notify_master_uname": "127.0.0.2",
+                        "OCF_RESKEY_CRM_meta_notify_start_uname": "127.0.0.1"})
+
+        self.run_command(self.COLO_RA + " notify", self.OCF_SUCCESS, env)
+
+    def ra_notify_stop(self, host):
+        env = self.setup_base_env(host)
+
+        env.update({"OCF_RESKEY_CRM_meta_notify_type": "pre",
+                    "OCF_RESKEY_CRM_meta_notify_operation": "stop"})
+
+        if host == self.HOSTA:
+            env.update({"OCF_RESKEY_CRM_meta_notify_master_uname": "127.0.0.1",
+                        "OCF_RESKEY_CRM_meta_notify_stop_uname": "127.0.0.2"})
+        else:
+            env.update({"OCF_RESKEY_CRM_meta_notify_master_uname": "127.0.0.2",
+                        "OCF_RESKEY_CRM_meta_notify_stop_uname": "127.0.0.1"})
+
+        self.run_command(self.COLO_RA + " notify", self.OCF_SUCCESS, env)
+
+    def kill_qemu_pre(self, host, hang_qemu=False):
+        if host == self.HOSTA:
+            pid = self.read_pidfile(os.path.join(self.TMPA, \
+                                                        "colo-test-qemu.pid"))
+        else:
+            pid = self.read_pidfile(os.path.join(self.TMPB, \
+                                                        "colo-test-qemu.pid"))
+
+        if pid and self.check_pid(pid):
+            if hang_qemu:
+                os.kill(pid, signal.SIGSTOP)
+            else:
+                os.kill(pid, signal.SIGKILL)
+                while self.check_pid(pid):
+                    time.sleep(1)
+
+    def kill_qemu_post(self, host, hang_qemu=False):
+        if host == self.HOSTA:
+            pid = self.read_pidfile(os.path.join(self.TMPA, \
+                                                        "colo-test-qemu.pid"))
+        else:
+            pid = self.read_pidfile(os.path.join(self.TMPB, \
+                                                        "colo-test-qemu.pid"))
+
+        if hang_qemu and pid and self.check_pid(pid):
+            os.kill(pid, signal.SIGKILL)
+            while self.check_pid(pid):
+                time.sleep(1)
+
+    def get_master_score(self, host):
+        if host == self.HOSTA:
+            return int(self.cat_line(os.path.join(self.TMPA, "master_score")))
+        else:
+            return int(self.cat_line(os.path.join(self.TMPB, "master_score")))
+
+    def _test_colo(self, hang_qemu=False, loop=False, do_ssh_ping=True):
+        self.ra_stop(self.HOSTA)
+        self.ra_stop(self.HOSTB)
+
+        self.log.info("Startup")
+        self.ra_start(self.HOSTA)
+        self.ra_start(self.HOSTB)
+
+        self.ra_monitor(self.HOSTA, self.OCF_SUCCESS)
+        self.ra_monitor(self.HOSTB, self.OCF_SUCCESS)
+
+        self.log.info("Promoting")
+        self.ra_promote(self.HOSTA)
+        self.ra_notify_start(self.HOSTA)
+
+        while self.get_master_score(self.HOSTB) != 100:
+            self.ra_monitor(self.HOSTA, self.OCF_RUNNING_MASTER)
+            self.ra_monitor(self.HOSTB, self.OCF_SUCCESS)
+            time.sleep(1)
+
+        if do_ssh_ping:
+            self.ssh_proc = self.ssh_open()
+
+        primary = self.HOSTA
+        secondary = self.HOSTB
+
+        while True:
+            self.log.info("Secondary failover")
+            self.kill_qemu_pre(primary, hang_qemu)
+            self.ra_notify_stop(secondary)
+            self.ra_monitor(secondary, self.OCF_SUCCESS)
+            self.ra_promote(secondary)
+            self.ra_monitor(secondary, self.OCF_RUNNING_MASTER)
+            self.kill_qemu_post(primary, hang_qemu)
+            if do_ssh_ping:
+                self.ssh_ping(self.ssh_proc)
+            tmp = primary
+            primary = secondary
+            secondary = tmp
+
+            self.log.info("Secondary continue replication")
+            self.ra_start(secondary)
+            self.ra_notify_start(primary)
+            if do_ssh_ping:
+                self.ssh_ping(self.ssh_proc)
+
+            # Wait for resync
+            while self.get_master_score(secondary) != 100:
+                self.ra_monitor(primary, self.OCF_RUNNING_MASTER)
+                self.ra_monitor(secondary, self.OCF_SUCCESS)
+                time.sleep(1)
+            if do_ssh_ping:
+                self.ssh_ping(self.ssh_proc)
+
+            self.log.info("Primary failover")
+            self.kill_qemu_pre(secondary, hang_qemu)
+            self.ra_monitor(primary, self.OCF_RUNNING_MASTER)
+            self.ra_notify_stop(primary)
+            self.ra_monitor(primary, self.OCF_RUNNING_MASTER)
+            self.kill_qemu_post(secondary, hang_qemu)
+            if do_ssh_ping:
+                self.ssh_ping(self.ssh_proc)
+
+            self.log.info("Primary continue replication")
+            self.ra_start(secondary)
+            self.ra_notify_start(primary)
+            if do_ssh_ping:
+                self.ssh_ping(self.ssh_proc)
+
+            # Wait for resync
+            while self.get_master_score(secondary) != 100:
+                self.ra_monitor(primary, self.OCF_RUNNING_MASTER)
+                self.ra_monitor(secondary, self.OCF_SUCCESS)
+                time.sleep(1)
+            if do_ssh_ping:
+                self.ssh_ping(self.ssh_proc)
+
+            if not loop:
+                break
+
+        if do_ssh_ping:
+            self.ssh_close(self.ssh_proc)
+
+        self.ra_stop(self.HOSTA)
+        self.ra_stop(self.HOSTB)
+
+        self.ra_monitor(self.HOSTA, self.OCF_NOT_RUNNING)
+        self.ra_monitor(self.HOSTB, self.OCF_NOT_RUNNING)
+        self.log.info("all ok")
+
+    def test_colo_peer_crashing(self):
+        """
+        :avocado: tags=colo
+        :avocado: tags=arch:x86_64
+        """
+        self.log.info("Testing with peer qemu crashing")
+        self._test_colo()
+
+    def test_colo_peer_hanging(self):
+        """
+        :avocado: tags=colo
+        :avocado: tags=arch:x86_64
+        """
+        self.log.info("Testing with peer qemu hanging")
+        self._test_colo(hang_qemu=True)
--
2.20.1



^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [PATCH 4/4] MAINTAINERS: Add myself as maintainer for COLO resource agent
  2019-11-21 17:49 [PATCH 0/4] colo: Introduce resource agent and high-level test Lukas Straub
                   ` (2 preceding siblings ...)
  2019-11-21 17:49 ` [PATCH 3/4] colo: Introduce high-level test Lukas Straub
@ 2019-11-21 17:49 ` Lukas Straub
  2019-11-22  9:46 ` [PATCH 0/4] colo: Introduce resource agent and high-level test Dr. David Alan Gilbert
  4 siblings, 0 replies; 13+ messages in thread
From: Lukas Straub @ 2019-11-21 17:49 UTC (permalink / raw)
  To: qemu-devel
  Cc: Zhang, Chen, Jason Wang, Alberto Garcia, Dr. David Alan Gilbert

While I'm not going to have much time for this, I'll still
try to test and review patches.

Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
 MAINTAINERS | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/MAINTAINERS b/MAINTAINERS
index d6de200453..aad8356149 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -2239,6 +2239,12 @@ F: net/colo*
 F: net/filter-rewriter.c
 F: net/filter-mirror.c

+COLO resource agent and testing
+M: Lukas Straub <lukasstraub2@web.de>
+S: Odd fixes
+F: scripts/colo-resource-agent/*
+F: tests/acceptance/colo.py
+
 Record/replay
 M: Pavel Dovgalyuk <pavel.dovgaluk@ispras.ru>
 R: Paolo Bonzini <pbonzini@redhat.com>
--
2.20.1


^ permalink raw reply related	[flat|nested] 13+ messages in thread

* Re: [PATCH 1/4] block/quorum.c: stable children names
  2019-11-21 17:49 ` [PATCH 1/4] block/quorum.c: stable children names Lukas Straub
@ 2019-11-21 18:04   ` Eric Blake
  2019-11-21 18:34     ` Lukas Straub
  0 siblings, 1 reply; 13+ messages in thread
From: Eric Blake @ 2019-11-21 18:04 UTC (permalink / raw)
  To: Lukas Straub, qemu-devel
  Cc: Zhang, Chen, Jason Wang, Alberto Garcia, Dr. David Alan Gilbert

On 11/21/19 11:49 AM, Lukas Straub wrote:
> If we remove the child with the highest index from the quorum,
> decrement s->next_child_index. This way we get stable children
> names as long as we only remove the last child.
> 
> Signed-off-by: Lukas Straub <lukasstraub2@web.de>
> ---
>   block/quorum.c | 6 ++++++
>   1 file changed, 6 insertions(+)
> 
> diff --git a/block/quorum.c b/block/quorum.c
> index df68adcfaa..6100d4108a 100644
> --- a/block/quorum.c
> +++ b/block/quorum.c
> @@ -1054,6 +1054,12 @@ static void quorum_del_child(BlockDriverState *bs, BdrvChild *child,
>       /* We know now that num_children > threshold, so blkverify must be false */
>       assert(!s->is_blkverify);
> 
> +    unsigned child_id;
> +    sscanf(child->name, "children.%u", &child_id);

sscanf() cannot detect overflow. Do we trust our input enough to ignore 
this shortfall in the interface, or should we be using saner interfaces 
like qemu_strtoul()?  For that matter, why do we have to reparse 
something; is it not already available somewhere in numerical form?

> +    if (child_id == s->next_child_index - 1) {
> +        s->next_child_index--;
> +    }
> +
>       bdrv_drained_begin(bs);
> 
>       /* We can safely remove this child now */
> --
> 2.20.1
> 
> 

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3226
Virtualization:  qemu.org | libvirt.org



^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH 1/4] block/quorum.c: stable children names
  2019-11-21 18:04   ` Eric Blake
@ 2019-11-21 18:34     ` Lukas Straub
  2019-11-26 14:21       ` Alberto Garcia
  0 siblings, 1 reply; 13+ messages in thread
From: Lukas Straub @ 2019-11-21 18:34 UTC (permalink / raw)
  To: Eric Blake
  Cc: Zhang, Chen, Jason Wang, Alberto Garcia, qemu-devel,
	Dr. David Alan Gilbert

On Thu, 21 Nov 2019 12:04:58 -0600
Eric Blake <eblake@redhat.com> wrote:

> On 11/21/19 11:49 AM, Lukas Straub wrote:
> > If we remove the child with the highest index from the quorum,
> > decrement s->next_child_index. This way we get stable children
> > names as long as we only remove the last child.
> >
> > Signed-off-by: Lukas Straub <lukasstraub2@web.de>
> > ---
> >   block/quorum.c | 6 ++++++
> >   1 file changed, 6 insertions(+)
> >
> > diff --git a/block/quorum.c b/block/quorum.c
> > index df68adcfaa..6100d4108a 100644
> > --- a/block/quorum.c
> > +++ b/block/quorum.c
> > @@ -1054,6 +1054,12 @@ static void quorum_del_child(BlockDriverState *bs, BdrvChild *child,
> >       /* We know now that num_children > threshold, so blkverify must be false */
> >       assert(!s->is_blkverify);
> >
> > +    unsigned child_id;
> > +    sscanf(child->name, "children.%u", &child_id);
>
> sscanf() cannot detect overflow. Do we trust our input enough to ignore
> this shortfall in the interface, or should we be using saner interfaces
> like qemu_strtoul()?  For that matter, why do we have to reparse
> something; is it not already available somewhere in numerical form?

Hi,
Yes, I wondered about that too, but found no other way. But the input
is trusted, AFAIK the only way to add child nodes is trough quorum_add_child
above and quorum_open and there already are adequate checks there.

> > +    if (child_id == s->next_child_index - 1) {
> > +        s->next_child_index--;
> > +    }
> > +
> >       bdrv_drained_begin(bs);
> >
> >       /* We can safely remove this child now */
> > --
> > 2.20.1
> >
> >
>



^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH 0/4] colo: Introduce resource agent and high-level test
  2019-11-21 17:49 [PATCH 0/4] colo: Introduce resource agent and high-level test Lukas Straub
                   ` (3 preceding siblings ...)
  2019-11-21 17:49 ` [PATCH 4/4] MAINTAINERS: Add myself as maintainer for COLO resource agent Lukas Straub
@ 2019-11-22  9:46 ` Dr. David Alan Gilbert
  2019-11-27 21:11   ` Lukas Straub
  4 siblings, 1 reply; 13+ messages in thread
From: Dr. David Alan Gilbert @ 2019-11-22  9:46 UTC (permalink / raw)
  To: Lukas Straub; +Cc: Zhang, Chen, Jason Wang, Alberto Garcia, qemu-devel

* Lukas Straub (lukasstraub2@web.de) wrote:
> Hello Everyone,
> These patches introduce a resource agent for use with the Pacemaker CRM and a
> high-level test utilizing it for testing qemu COLO.
> 
> The resource agent manages qemu COLO including continuous replication.
> 
> Currently the second test case (where the peer qemu is frozen) fails on primary
> failover, because qemu hangs while removing the replication related block nodes.
> Note that this also happens in real world test when cutting power to the peer
> host, so this needs to be fixed.

Do you understand why that happens? Is this it's trying to finish a
read/write to the dead partner?

Dave

> Based-on: <cover.1571925699.git.lukasstraub2@web.de>
> ([PATCH v7 0/4] colo: Add support for continuous replication)
> 
> Lukas Straub (4):
>   block/quorum.c: stable children names
>   colo: Introduce resource agent
>   colo: Introduce high-level test
>   MAINTAINERS: Add myself as maintainer for COLO resource agent
> 
>  MAINTAINERS                            |    6 +
>  block/quorum.c                         |    6 +
>  scripts/colo-resource-agent/colo       | 1026 ++++++++++++++++++++++++
>  scripts/colo-resource-agent/crm_master |   44 +
>  tests/acceptance/colo.py               |  444 ++++++++++
>  5 files changed, 1526 insertions(+)
>  create mode 100755 scripts/colo-resource-agent/colo
>  create mode 100755 scripts/colo-resource-agent/crm_master
>  create mode 100644 tests/acceptance/colo.py
> 
> --
> 2.20.1
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK



^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH 1/4] block/quorum.c: stable children names
  2019-11-21 18:34     ` Lukas Straub
@ 2019-11-26 14:21       ` Alberto Garcia
  2019-11-27 21:20         ` Lukas Straub
  0 siblings, 1 reply; 13+ messages in thread
From: Alberto Garcia @ 2019-11-26 14:21 UTC (permalink / raw)
  To: Lukas Straub, Eric Blake
  Cc: Zhang, Chen, Jason Wang, qemu-devel, Dr. David Alan Gilbert

On Thu 21 Nov 2019 07:34:45 PM CET, Lukas Straub wrote:
>> > diff --git a/block/quorum.c b/block/quorum.c
>> > index df68adcfaa..6100d4108a 100644
>> > --- a/block/quorum.c
>> > +++ b/block/quorum.c
>> > @@ -1054,6 +1054,12 @@ static void quorum_del_child(BlockDriverState *bs, BdrvChild *child,
>> >       /* We know now that num_children > threshold, so blkverify must be false */
>> >       assert(!s->is_blkverify);
>> >
>> > +    unsigned child_id;
>> > +    sscanf(child->name, "children.%u", &child_id);
>>
>> sscanf() cannot detect overflow. Do we trust our input enough to
>> ignore this shortfall in the interface, or should we be using saner
>> interfaces like qemu_strtoul()?  For that matter, why do we have to
>> reparse something; is it not already available somewhere in numerical
>> form?
>
> Yes, I wondered about that too, but found no other way. But the input
> is trusted, AFAIK the only way to add child nodes is trough
> quorum_add_child above and quorum_open and there already are adequate
> checks there.

I also don't see any other way to get that value, unless we change
BDRVQuorumState to store that information (e.g. instead of children
being a list of pointers BdrvChild ** it could be a list of {pointer,
index}, or something like that).

There's another (more convoluted) alternative if we don't want to parse
child->name. Since we only want to know if the child number equals
s->next_child_index - 1, we can do it the other way around:

   snprintf(str, 32, "children.%u", s->next_child_index - 1);

and then compare str and child->name.

Berto


^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH 0/4] colo: Introduce resource agent and high-level test
  2019-11-22  9:46 ` [PATCH 0/4] colo: Introduce resource agent and high-level test Dr. David Alan Gilbert
@ 2019-11-27 21:11   ` Lukas Straub
  2019-12-18  9:27     ` Lukas Straub
  0 siblings, 1 reply; 13+ messages in thread
From: Lukas Straub @ 2019-11-27 21:11 UTC (permalink / raw)
  To: Dr. David Alan Gilbert
  Cc: Kevin Wolf, Alberto Garcia, Jason Wang, qemu-devel, Max Reitz,
	Zhang, Chen

On Fri, 22 Nov 2019 09:46:46 +0000
"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:

> * Lukas Straub (lukasstraub2@web.de) wrote:
> > Hello Everyone,
> > These patches introduce a resource agent for use with the Pacemaker CRM and a
> > high-level test utilizing it for testing qemu COLO.
> >
> > The resource agent manages qemu COLO including continuous replication.
> >
> > Currently the second test case (where the peer qemu is frozen) fails on primary
> > failover, because qemu hangs while removing the replication related block nodes.
> > Note that this also happens in real world test when cutting power to the peer
> > host, so this needs to be fixed.
>
> Do you understand why that happens? Is this it's trying to finish a
> read/write to the dead partner?
>
> Dave

I haven't looked into it too closely yet, but it's often hanging in bdrv_flush()
while removing the replication blockdev and of course thats probably because the
nbd client waits for a reply. So I tried with the workaround below, which will
actively kill the TCP connection and with it the test passes, though I haven't
tested it in real world yet.

A proper solution to this would probably be a "force" parameter for blockdev-del,
which skips all flushing and aborts all inflight io. Or we could add a timeout
to the nbd client.

Regards,
Lukas Straub

diff --git a/scripts/colo-resource-agent/colo b/scripts/colo-resource-agent/colo
index 5fd9cfc0b5..62210af2a1 100755
--- a/scripts/colo-resource-agent/colo
+++ b/scripts/colo-resource-agent/colo
@@ -935,6 +935,7 @@ def qemu_colo_notify():
            and HOSTNAME == str.strip(OCF_RESKEY_CRM_meta_notify_master_uname):
             fd = qmp_open()
             peer = qmp_get_nbd_remote(fd)
+            os.system("sudo ss -K dst %s dport = %s" % (peer, NBD_PORT))
             if peer == str.strip(OCF_RESKEY_CRM_meta_notify_stop_uname):
                 if qmp_check_resync(fd) != None:
                     qmp_cancel_resync(fd)




^ permalink raw reply related	[flat|nested] 13+ messages in thread

* Re: [PATCH 1/4] block/quorum.c: stable children names
  2019-11-26 14:21       ` Alberto Garcia
@ 2019-11-27 21:20         ` Lukas Straub
  0 siblings, 0 replies; 13+ messages in thread
From: Lukas Straub @ 2019-11-27 21:20 UTC (permalink / raw)
  To: Alberto Garcia
  Cc: Zhang, Chen, Jason Wang, qemu-devel, Dr. David Alan Gilbert

On Tue, 26 Nov 2019 15:21:37 +0100
Alberto Garcia <berto@igalia.com> wrote:

> On Thu 21 Nov 2019 07:34:45 PM CET, Lukas Straub wrote:
> >> > diff --git a/block/quorum.c b/block/quorum.c
> >> > index df68adcfaa..6100d4108a 100644
> >> > --- a/block/quorum.c
> >> > +++ b/block/quorum.c
> >> > @@ -1054,6 +1054,12 @@ static void quorum_del_child(BlockDriverState *bs, BdrvChild *child,
> >> >       /* We know now that num_children > threshold, so blkverify must be false */
> >> >       assert(!s->is_blkverify);
> >> >
> >> > +    unsigned child_id;
> >> > +    sscanf(child->name, "children.%u", &child_id);
> >>
> >> sscanf() cannot detect overflow. Do we trust our input enough to
> >> ignore this shortfall in the interface, or should we be using saner
> >> interfaces like qemu_strtoul()?  For that matter, why do we have to
> >> reparse something; is it not already available somewhere in numerical
> >> form?
> >
> > Yes, I wondered about that too, but found no other way. But the input
> > is trusted, AFAIK the only way to add child nodes is trough
> > quorum_add_child above and quorum_open and there already are adequate
> > checks there.
>
> I also don't see any other way to get that value, unless we change
> BDRVQuorumState to store that information (e.g. instead of children
> being a list of pointers BdrvChild ** it could be a list of {pointer,
> index}, or something like that).
>
> There's another (more convoluted) alternative if we don't want to parse
> child->name. Since we only want to know if the child number equals
> s->next_child_index - 1, we can do it the other way around:
>
>    snprintf(str, 32, "children.%u", s->next_child_index - 1);
>
> and then compare str and child->name.
>
> Berto

Hi,
I will do it your way, then it's also more consistent with the name
creation in quorum_add and quorum_open.

Regards,
Lukas Straub


^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH 0/4] colo: Introduce resource agent and high-level test
  2019-11-27 21:11   ` Lukas Straub
@ 2019-12-18  9:27     ` Lukas Straub
  2019-12-18 19:46       ` Dr. David Alan Gilbert
  0 siblings, 1 reply; 13+ messages in thread
From: Lukas Straub @ 2019-12-18  9:27 UTC (permalink / raw)
  To: Dr. David Alan Gilbert
  Cc: Kevin Wolf, Alberto Garcia, Jason Wang, qemu-devel, Max Reitz,
	Zhang, Chen

On Wed, 27 Nov 2019 22:11:34 +0100
Lukas Straub <lukasstraub2@web.de> wrote:

> On Fri, 22 Nov 2019 09:46:46 +0000
> "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
>
> > * Lukas Straub (lukasstraub2@web.de) wrote:
> > > Hello Everyone,
> > > These patches introduce a resource agent for use with the Pacemaker CRM and a
> > > high-level test utilizing it for testing qemu COLO.
> > >
> > > The resource agent manages qemu COLO including continuous replication.
> > >
> > > Currently the second test case (where the peer qemu is frozen) fails on primary
> > > failover, because qemu hangs while removing the replication related block nodes.
> > > Note that this also happens in real world test when cutting power to the peer
> > > host, so this needs to be fixed.
> >
> > Do you understand why that happens? Is this it's trying to finish a
> > read/write to the dead partner?
> >
> > Dave
>
> I haven't looked into it too closely yet, but it's often hanging in bdrv_flush()
> while removing the replication blockdev and of course thats probably because the
> nbd client waits for a reply. So I tried with the workaround below, which will
> actively kill the TCP connection and with it the test passes, though I haven't
> tested it in real world yet.
>

In the real cluster, sometimes qemu even hangs while connecting to qmp (after remote
poweroff). But I currently don't have the time to look into it.

Still a failing test is better than no test. Could we mark this test as known-bad and
fix this issue later? How should I mark it as known-bad? By tag? Or warn in the log?

Regards,
Lukas Straub


^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH 0/4] colo: Introduce resource agent and high-level test
  2019-12-18  9:27     ` Lukas Straub
@ 2019-12-18 19:46       ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 13+ messages in thread
From: Dr. David Alan Gilbert @ 2019-12-18 19:46 UTC (permalink / raw)
  To: Lukas Straub, thuth
  Cc: Kevin Wolf, Alberto Garcia, Jason Wang, qemu-devel, Max Reitz,
	Zhang, Chen

* Lukas Straub (lukasstraub2@web.de) wrote:
> On Wed, 27 Nov 2019 22:11:34 +0100
> Lukas Straub <lukasstraub2@web.de> wrote:
> 
> > On Fri, 22 Nov 2019 09:46:46 +0000
> > "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> >
> > > * Lukas Straub (lukasstraub2@web.de) wrote:
> > > > Hello Everyone,
> > > > These patches introduce a resource agent for use with the Pacemaker CRM and a
> > > > high-level test utilizing it for testing qemu COLO.
> > > >
> > > > The resource agent manages qemu COLO including continuous replication.
> > > >
> > > > Currently the second test case (where the peer qemu is frozen) fails on primary
> > > > failover, because qemu hangs while removing the replication related block nodes.
> > > > Note that this also happens in real world test when cutting power to the peer
> > > > host, so this needs to be fixed.
> > >
> > > Do you understand why that happens? Is this it's trying to finish a
> > > read/write to the dead partner?
> > >
> > > Dave
> >
> > I haven't looked into it too closely yet, but it's often hanging in bdrv_flush()
> > while removing the replication blockdev and of course thats probably because the
> > nbd client waits for a reply. So I tried with the workaround below, which will
> > actively kill the TCP connection and with it the test passes, though I haven't
> > tested it in real world yet.
> >
> 
> In the real cluster, sometimes qemu even hangs while connecting to qmp (after remote
> poweroff). But I currently don't have the time to look into it.

That doesn't surprise me too much; QMP is mostly handled in the main
thread, as are a lot of other things; hanging in COLO has been my
assumption for a while because of that.  However, there's a way to fix
it.

A while ago, Peter Xu added a feature called 'out of band' to QMP; you
can open a QMP connection, set the OOB feature, and then commands that
are marked as OOB are executed off the main thread on  that connection.

At the moment we've just got the one real OOB command, 'migrate-recover'
which is used for recovering postcopy from a similar failure to the COLO
case.

To fix this you'd have to convert colo-lost-heartbeat to be an OOB
command; note it's not that trivial, because you have to make sure the
code that's run as part of the OOB command doesn't take any locks that
could block on something in the main thread; so it can set flags, start
new threads, perhaps call shutdown() on a socket; but it takes some
thinking about.


> Still a failing test is better than no test. Could we mark this test as known-bad and
> fix this issue later? How should I mark it as known-bad? By tag? Or warn in the log?

Not sure of that; cc'ing Maybe thuth knows?

Dave

> Regards,
> Lukas Straub
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK



^ permalink raw reply	[flat|nested] 13+ messages in thread

end of thread, other threads:[~2019-12-18 19:47 UTC | newest]

Thread overview: 13+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-11-21 17:49 [PATCH 0/4] colo: Introduce resource agent and high-level test Lukas Straub
2019-11-21 17:49 ` [PATCH 1/4] block/quorum.c: stable children names Lukas Straub
2019-11-21 18:04   ` Eric Blake
2019-11-21 18:34     ` Lukas Straub
2019-11-26 14:21       ` Alberto Garcia
2019-11-27 21:20         ` Lukas Straub
2019-11-21 17:49 ` [PATCH 2/4] colo: Introduce resource agent Lukas Straub
2019-11-21 17:49 ` [PATCH 3/4] colo: Introduce high-level test Lukas Straub
2019-11-21 17:49 ` [PATCH 4/4] MAINTAINERS: Add myself as maintainer for COLO resource agent Lukas Straub
2019-11-22  9:46 ` [PATCH 0/4] colo: Introduce resource agent and high-level test Dr. David Alan Gilbert
2019-11-27 21:11   ` Lukas Straub
2019-12-18  9:27     ` Lukas Straub
2019-12-18 19:46       ` Dr. David Alan Gilbert

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).