All of lore.kernel.org
 help / color / mirror / Atom feed
From: Lukas Straub <lukasstraub2@web.de>
To: qemu-devel <qemu-devel@nongnu.org>
Cc: "Zhang, Chen" <chen.zhang@intel.com>,
	Jason Wang <jasowang@redhat.com>,
	Alberto Garcia <berto@igalia.com>,
	"Dr. David Alan Gilbert" <dgilbert@redhat.com>
Subject: [PATCH 2/4] colo: Introduce resource agent
Date: Thu, 21 Nov 2019 18:49:33 +0100	[thread overview]
Message-ID: <4b51cd192df72d880841794199a10d7a834688d4.1574356137.git.lukasstraub2@web.de> (raw)
In-Reply-To: <cover.1574356137.git.lukasstraub2@web.de>

Introduce a resource agent which can be used in a
Pacemaker cluster to manage qemu COLO.

Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
 scripts/colo-resource-agent/colo | 1026 ++++++++++++++++++++++++++++++
 1 file changed, 1026 insertions(+)
 create mode 100755 scripts/colo-resource-agent/colo

diff --git a/scripts/colo-resource-agent/colo b/scripts/colo-resource-agent/colo
new file mode 100755
index 0000000000..5fd9cfc0b5
--- /dev/null
+++ b/scripts/colo-resource-agent/colo
@@ -0,0 +1,1026 @@
+#!/usr/bin/env python
+
+# Resource agent for qemu COLO for use with Pacemaker CRM
+#
+# Copyright (c) Lukas Straub <lukasstraub2@web.de>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later.  See the COPYING file in the top-level directory.
+
+from __future__ import print_function
+import subprocess
+import sys
+import os
+import os.path
+import signal
+import socket
+import json
+import re
+import time
+import logging
+import logging.handlers
+
+# Constants
+# Numeric result codes returned by the actions of this agent (for example
+# validate-all returns OCF_ERR_CONFIGURED, OCF_ERR_INSTALLED or
+# OCF_ERR_PERM, and the monitor reports OCF_NOT_RUNNING or
+# OCF_RUNNING_MASTER).
+OCF_SUCCESS = 0
+OCF_ERR_GENERIC = 1
+OCF_ERR_ARGS = 2
+OCF_ERR_UNIMPLEMENTED = 3
+OCF_ERR_PERM = 4
+OCF_ERR_INSTALLED = 5
+OCF_ERR_CONFIGURED = 6
+OCF_NOT_RUNNING = 7
+OCF_RUNNING_MASTER = 8
+OCF_FAILED_MASTER = 9
+
+# Get environment variables
+# Notification data. The *_uname values default to "" when unset, so code
+# that splits them on spaces does not have to check for None first.
+OCF_RESKEY_CRM_meta_notify_type \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_type")
+OCF_RESKEY_CRM_meta_notify_operation \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_operation")
+OCF_RESKEY_CRM_meta_notify_start_uname \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_start_uname", "")
+OCF_RESKEY_CRM_meta_notify_stop_uname \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_stop_uname", "")
+OCF_RESKEY_CRM_meta_notify_active_uname \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_active_uname", "")
+OCF_RESKEY_CRM_meta_notify_promote_uname \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_promote_uname", "")
+OCF_RESKEY_CRM_meta_notify_demote_uname \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_demote_uname", "")
+OCF_RESKEY_CRM_meta_notify_master_uname \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_master_uname", "")
+OCF_RESKEY_CRM_meta_notify_slave_uname \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify_slave_uname", "")
+
+# Runtime directory (sockets and pid file live here) and logging targets
+HA_RSCTMP = os.getenv("HA_RSCTMP", "/run/resource-agents")
+HA_LOGFACILITY = os.getenv("HA_LOGFACILITY")
+HA_LOGFILE = os.getenv("HA_LOGFILE")
+HA_DEBUGLOG = os.getenv("HA_DEBUGLOG")
+# The instance name is used as logger name and as prefix of all file names
+OCF_RESOURCE_INSTANCE = os.getenv("OCF_RESOURCE_INSTANCE", "default-instance")
+# Operation timeout; qmp_open() divides it by 1000 to get seconds
+OCF_RESKEY_CRM_meta_timeout \
+    = int(os.getenv("OCF_RESKEY_CRM_meta_timeout", "60000"))
+# An interval of 0 is treated as a probe by qemu_colo_monitor()
+OCF_RESKEY_CRM_meta_interval \
+    = int(os.getenv("OCF_RESKEY_CRM_meta_interval", "1"))
+# Clone/master limits, checked by qemu_colo_validate_all()
+OCF_RESKEY_CRM_meta_clone_max \
+    = int(os.getenv("OCF_RESKEY_CRM_meta_clone_max", "1"))
+OCF_RESKEY_CRM_meta_clone_node_max \
+    = int(os.getenv("OCF_RESKEY_CRM_meta_clone_node_max", "1"))
+OCF_RESKEY_CRM_meta_master_max \
+    = int(os.getenv("OCF_RESKEY_CRM_meta_master_max", "1"))
+OCF_RESKEY_CRM_meta_master_node_max \
+    = int(os.getenv("OCF_RESKEY_CRM_meta_master_node_max", "1"))
+OCF_RESKEY_CRM_meta_notify \
+    = os.getenv("OCF_RESKEY_CRM_meta_notify")
+OCF_RESKEY_CRM_meta_globally_unique \
+    = os.getenv("OCF_RESKEY_CRM_meta_globally_unique")
+
+# Name of this node; falls back to the local hostname
+HOSTNAME = os.getenv("OCF_RESKEY_CRM_meta_on_node", socket.gethostname())
+
+# Requested action: taken from the environment, otherwise from the single
+# command line argument
+OCF_ACTION = os.getenv("__OCF_ACTION")
+if not OCF_ACTION and len(sys.argv) == 2:
+    OCF_ACTION = sys.argv[1]
+
+# Resource parameters
+# Defaults; these are also printed in the meta-data XML. All values are
+# kept as strings, numeric ones are converted where they are used.
+OCF_RESKEY_binary_default = "qemu-system-x86_64"
+OCF_RESKEY_log_dir_default = HA_RSCTMP
+OCF_RESKEY_options_default = ""
+OCF_RESKEY_disk_size_default = ""
+OCF_RESKEY_active_hidden_dir_default = ""
+OCF_RESKEY_listen_address_default = "0.0.0.0"
+OCF_RESKEY_base_port_default = "9000"
+OCF_RESKEY_checkpoint_interval_default = "20000"
+OCF_RESKEY_debug_default = "false"
+
+# Effective values: environment first, default otherwise
+OCF_RESKEY_binary = os.getenv("OCF_RESKEY_binary", OCF_RESKEY_binary_default)
+OCF_RESKEY_log_dir = os.getenv("OCF_RESKEY_log_dir", OCF_RESKEY_log_dir_default)
+OCF_RESKEY_options = os.getenv("OCF_RESKEY_options", OCF_RESKEY_options_default)
+OCF_RESKEY_disk_size \
+    = os.getenv("OCF_RESKEY_disk_size", OCF_RESKEY_disk_size_default)
+OCF_RESKEY_active_hidden_dir \
+    = os.getenv("OCF_RESKEY_active_hidden_dir", \
+                OCF_RESKEY_active_hidden_dir_default)
+OCF_RESKEY_listen_address \
+    = os.getenv("OCF_RESKEY_listen_address", OCF_RESKEY_listen_address_default)
+OCF_RESKEY_base_port \
+    = os.getenv("OCF_RESKEY_base_port", OCF_RESKEY_base_port_default)
+OCF_RESKEY_checkpoint_interval \
+    = os.getenv("OCF_RESKEY_checkpoint_interval", \
+                OCF_RESKEY_checkpoint_interval_default)
+OCF_RESKEY_debug = os.getenv("OCF_RESKEY_debug", OCF_RESKEY_debug_default)
+
+# Per-instance image files used by the secondary qemu command line
+ACTIVE_IMAGE = os.path.join(OCF_RESKEY_active_hidden_dir, \
+                            OCF_RESOURCE_INSTANCE + "-active.qcow2")
+HIDDEN_IMAGE = os.path.join(OCF_RESKEY_active_hidden_dir, \
+                            OCF_RESOURCE_INSTANCE + "-hidden.qcow2")
+
+# Per-instance unix sockets: QMP monitor and the two colo-compare chardevs
+QMP_SOCK = os.path.join(HA_RSCTMP, OCF_RESOURCE_INSTANCE + "-qmp.sock")
+COMP_SOCK = os.path.join(HA_RSCTMP, OCF_RESOURCE_INSTANCE + "-compare.sock")
+COMP_OUT_SOCK = os.path.join(HA_RSCTMP, OCF_RESOURCE_INSTANCE \
+                                        + "-comp_out.sock")
+
+# Written by qemu (-pidfile) and read back by check_pid()
+PID_FILE = os.path.join(HA_RSCTMP, OCF_RESOURCE_INSTANCE + "-qemu.pid")
+
+# QMP_LOG is only written when the debug parameter is true
+QMP_LOG = os.path.join(OCF_RESKEY_log_dir, OCF_RESOURCE_INSTANCE + "-qmp.log")
+QEMU_LOG = os.path.join(OCF_RESKEY_log_dir, OCF_RESOURCE_INSTANCE + "-qemu.log")
+
+# Exception only raised by ourselves
+class Error(Exception):
+    """Failure that has already been logged by the code raising it."""
+
+def setup_constants():
+    """Define the port numbers and the qemu command lines as globals.
+
+    The four TCP ports are base_port .. base_port + 3. Both command lines
+    are built by %-formatting against globals(), so the ports have to be
+    assigned before the strings are formatted.
+    """
+    # This function is called after the parameters were validated
+    # (int() below would raise on a non-numeric base_port)
+    global MIGRATE_PORT, MIROR_PORT, COMPARE_IN_PORT, NBD_PORT
+    MIGRATE_PORT = int(OCF_RESKEY_base_port)
+    MIROR_PORT = int(OCF_RESKEY_base_port) + 1
+    COMPARE_IN_PORT = int(OCF_RESKEY_base_port) + 2
+    NBD_PORT = int(OCF_RESKEY_base_port) + 3
+
+    # Primary: user options plus the unix-socket chardevs for colo-compare,
+    # the quorum node colo-disk0 on top of parent0 and the QMP socket
+    global QEMU_PRIMARY_CMDLINE
+    QEMU_PRIMARY_CMDLINE = ("'%(OCF_RESKEY_binary)s' %(OCF_RESKEY_options)s"
+        " -chardev socket,id=comp0,path='%(COMP_SOCK)s',server,nowait"
+        " -chardev socket,id=comp0-0,path='%(COMP_SOCK)s'"
+        " -chardev socket,id=comp_out,path='%(COMP_OUT_SOCK)s',server,nowait"
+        " -chardev socket,id=comp_out0,path='%(COMP_OUT_SOCK)s'"
+        " -drive if=none,node-name=colo-disk0,driver=quorum,read-pattern=fifo,"
+        "vote-threshold=1,children.0=parent0"
+        " -qmp unix:'%(QMP_SOCK)s',server,nowait"
+        " -daemonize -D '%(QEMU_LOG)s' -pidfile '%(PID_FILE)s'") % globals()
+
+    # Secondary: listening redirector chardevs, the replication node backed
+    # by the active/hidden images, and -incoming on the migration port
+    global QEMU_SECONDARY_CMDLINE
+    QEMU_SECONDARY_CMDLINE = ("'%(OCF_RESKEY_binary)s' %(OCF_RESKEY_options)s"
+        " -chardev socket,id=red0,host='%(OCF_RESKEY_listen_address)s',"
+        "port=%(MIROR_PORT)s,server,nowait"
+        " -chardev socket,id=red1,host='%(OCF_RESKEY_listen_address)s',"
+        "port=%(COMPARE_IN_PORT)s,server,nowait"
+        " -object filter-redirector,id=f1,netdev=hn0,queue=tx,indev=red0"
+        " -object filter-redirector,id=f2,netdev=hn0,queue=rx,outdev=red1"
+        " -object filter-rewriter,id=rew0,netdev=hn0,queue=all"
+        " -drive if=none,node-name=childs0,top-id=colo-disk0,"
+        "driver=replication,mode=secondary,file.driver=qcow2,"
+        "file.file.filename='%(ACTIVE_IMAGE)s',file.backing.driver=qcow2,"
+        "file.backing.file.filename='%(HIDDEN_IMAGE)s',"
+        "file.backing.backing=parent0"
+        " -drive if=none,node-name=colo-disk0,driver=quorum,read-pattern=fifo,"
+        "vote-threshold=1,children.0=childs0"
+        " -incoming tcp:'%(OCF_RESKEY_listen_address)s':%(MIGRATE_PORT)s"
+        " -qmp unix:'%(QMP_SOCK)s',server,nowait"
+        " -daemonize -D '%(QEMU_LOG)s' -pidfile '%(PID_FILE)s'") % globals()
+
+def qemu_colo_meta_data():
+    """Print the resource agent meta-data XML to stdout.
+
+    The defaults shown in the XML are taken from the OCF_RESKEY_*_default
+    constants. Every <parameter> carries a <content> element so that the
+    output validates against the ra-api DTD named in the DOCTYPE.
+    """
+    print("""\
+<?xml version="1.0"?>
+<!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd">
+<resource-agent name="colo">
+
+    <version>0.1</version>
+    <longdesc lang="en">
+Resource agent for qemu COLO. (https://wiki.qemu.org/Features/COLO)
+
+After defining the master/slave instance, the master score has to be
+manually set to show which node has up-to-date data. So you copy your
+image to one host (and create empty images on the other host(s)) and then
+run "crm_master -r name_of_your_primitive -v 10" on that host.
+
+Also, you have to set 'notify=true' in the metadata attributes when
+defining the master/slave instance.
+    </longdesc>
+    <shortdesc lang="en">Qemu COLO</shortdesc>
+
+    <parameters>
+
+    <parameter name="binary" unique="0" required="0">
+        <longdesc lang="en">Qemu binary to use</longdesc>
+        <shortdesc lang="en">Qemu binary</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_binary_default + """\"/>
+    </parameter>
+
+    <parameter name="log_dir" unique="0" required="0">
+        <longdesc lang="en">Directory to place logs in</longdesc>
+        <shortdesc lang="en">Log directory</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_log_dir_default + """\"/>
+    </parameter>
+
+    <parameter name="options" unique="0" required="1">
+        <longdesc lang="en">
+Options to pass to qemu. These will be passed alongside COLO specific
+options, so you need to follow these conventions: The netdev should have
+id=hn0 and the disk controller drive=colo-disk0. The image node should
+have id/node-name=parent0, but should not be connected to the guest.
+Example:
+-vnc :0 -enable-kvm -cpu qemu64,+kvmclock -m 512 -netdev bridge,id=hn0
+-device virtio-net,netdev=hn0 -device virtio-blk,drive=colo-disk0
+-drive if=none,id=parent0,format=qcow2,file=/mnt/vms/vm01.qcow2
+        </longdesc>
+        <shortdesc lang="en">Options to pass to qemu.</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_options_default + """\"/>
+    </parameter>
+
+    <parameter name="disk_size" unique="0" required="1">
+        <longdesc lang="en">Disk size of the image</longdesc>
+        <shortdesc lang="en">Disk size of the image</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_disk_size_default + """\"/>
+    </parameter>
+
+    <parameter name="active_hidden_dir" unique="0" required="1">
+        <longdesc lang="en">
+Directory where the active and hidden images will be stored. It is
+recommended to put this on a ramdisk.
+        </longdesc>
+        <shortdesc lang="en">Path to active and hidden images</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_active_hidden_dir_default + """\"/>
+    </parameter>
+
+    <parameter name="listen_address" unique="0" required="0">
+        <longdesc lang="en">Address to listen on.</longdesc>
+        <shortdesc lang="en">Listen address</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_listen_address_default + """\"/>
+    </parameter>
+
+    <parameter name="base_port" unique="1" required="0">
+        <longdesc lang="en">
+4 tcp ports that are unique for each instance. (base_port to base_port + 3)
+        </longdesc>
+        <shortdesc lang="en">Ports to use</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_base_port_default + """\"/>
+    </parameter>
+
+    <parameter name="checkpoint_interval" unique="0" required="0">
+        <longdesc lang="en">
+Interval for regular checkpoints in milliseconds.
+        </longdesc>
+        <shortdesc lang="en">Interval for regular checkpoints</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_checkpoint_interval_default + """\"/>
+    </parameter>
+
+    <parameter name="debug" unique="0" required="0">
+        <longdesc lang="en">Enable debugging</longdesc>
+        <shortdesc lang="en">Enable debugging</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_debug_default + """\"/>
+    </parameter>
+
+    </parameters>
+
+    <actions>
+        <action name="start"        timeout="30s" />
+        <action name="stop"         timeout="20s" />
+        <action name="monitor"      timeout="40s" \
+            interval="12s" depth="0" />
+        <action name="monitor"      timeout="40s" \
+            interval="10s" depth="0" role="Slave" />
+        <action name="monitor"      timeout="40s" \
+            interval="11s" depth="0" role="Master" />
+        <action name="notify"       timeout="120s" />
+        <action name="promote"      timeout="120s" />
+        <action name="demote"       timeout="120s" />
+        <action name="meta-data"    timeout="5s" />
+        <action name="validate-all" timeout="20s" />
+    </actions>
+
+</resource-agent>
+""")
+
+def logs_open():
+    """Create the global loggers ``log`` and ``qmp_log``.
+
+    ``log`` writes to stderr when stdout is a terminal, to syslog when
+    HA_LOGFACILITY is set, and to HA_LOGFILE / HA_DEBUGLOG when those are
+    set (the debug log is skipped if it is the same file). ``qmp_log``
+    records the raw QMP traffic in QMP_LOG, but only when the debug
+    parameter is true; otherwise it discards everything.
+    """
+    global log, qmp_log
+
+    def attach(logger, handler, fmt):
+        handler.setFormatter(fmt)
+        logger.addHandler(handler)
+
+    log = logging.getLogger(OCF_RESOURCE_INSTANCE)
+    log.setLevel(logging.DEBUG)
+    main_format = logging.Formatter("(%(name)s) %(levelname)s: %(message)s")
+
+    if sys.stdout.isatty():
+        attach(log, logging.StreamHandler(stream=sys.stderr), main_format)
+
+    if HA_LOGFACILITY:
+        attach(log, logging.handlers.SysLogHandler("/dev/log"), main_format)
+
+    if HA_LOGFILE:
+        attach(log, logging.FileHandler(HA_LOGFILE), main_format)
+
+    if HA_DEBUGLOG and HA_DEBUGLOG != HA_LOGFILE:
+        attach(log, logging.FileHandler(HA_DEBUGLOG), main_format)
+
+    qmp_log = logging.getLogger("qmp_log")
+    qmp_log.setLevel(logging.DEBUG)
+    raw_format = logging.Formatter("%(message)s")
+
+    if is_true(OCF_RESKEY_debug):
+        attach(qmp_log, logging.handlers.WatchedFileHandler(QMP_LOG),
+               raw_format)
+    else:
+        qmp_log.addHandler(logging.NullHandler())
+
+def rotate_logfile(logfile, numlogs):
+    """Shift ``logfile`` and its numbered copies up by one.
+
+    ``logfile`` becomes ``logfile.1``, ``logfile.1`` becomes ``logfile.2``
+    and so on. The oldest copy, ``logfile.<numlogs - 1>`` (or ``logfile``
+    itself when numlogs is 1), is deleted. Missing files are skipped.
+    """
+    oldest = numlogs - 1
+    # Walk from the oldest copy down so no rename overwrites a newer file
+    for index in reversed(range(numlogs)):
+        current = logfile if index == 0 else "%s.%s" % (logfile, index)
+        if not os.path.exists(current):
+            continue
+        if index == oldest:
+            os.remove(current)
+        else:
+            os.rename(current, "%s.%s" % (logfile, index + 1))
+
+def is_writable(file):
+    """Return True if the current process may write to the path ``file``."""
+    writable = os.access(file, os.W_OK)
+    return writable
+
+def is_executable_file(file):
+    """Return True if ``file`` is a regular file we may execute."""
+    if not os.path.isfile(file):
+        return False
+    return os.access(file, os.X_OK)
+
+def is_true(var):
+    """Return True if ``var`` is one of the accepted "true" spellings.
+
+    Accepted (case-sensitive, surrounding whitespace ignored): yes, true,
+    1, YES, TRUE, True, ja, on, ON. The whole value has to match; a value
+    that merely starts with one of these words ("10", "online") is false.
+    Anything else, including None, is false.
+    """
+    true_values = ("yes", "true", "1", "YES", "TRUE", "True", "ja", "on", "ON")
+    return str(var).strip() in true_values
+
+# Check if the binary exists and is executable
+def check_binary(binary):
+    """Return True if ``binary`` can be executed.
+
+    ``binary`` is tried as given first and then relative to every
+    directory in $PATH (os.defpath if PATH is unset). An error is logged
+    and False returned when no executable file is found.
+    """
+    search_path = os.getenv("PATH", os.defpath)
+    candidates = [binary]
+    candidates.extend(os.path.join(entry, binary)
+                      for entry in search_path.split(os.pathsep))
+    if any(is_executable_file(candidate) for candidate in candidates):
+        return True
+
+    log.error("binary \"%s\" doesn't exist or not executable" % binary)
+    return False
+
+def run_command(commandline):
+    """Run ``commandline`` through the shell and wait for it to finish.
+
+    stderr is merged into stdout. If the command exits non-zero, the exit
+    code and the captured output are logged and Error is raised.
+    """
+    proc = subprocess.Popen(commandline, shell=True,
+                            universal_newlines=True,
+                            stdout=subprocess.PIPE,
+                            stderr=subprocess.STDOUT)
+    output = proc.communicate()[0]
+    if proc.returncode == 0:
+        return
+
+    log.error("command \"%s\" failed with code %s:\n%s" \
+                % (commandline, proc.returncode, output))
+    raise Error()
+
+# Functions for setting and getting the master score to tell Pacemaker which
+# host has the most recent data
+def set_master_score(score):
+    """Set the master score of this node with crm_master.
+
+    A score of zero or less deletes the attribute instead. The exit
+    status of crm_master is not checked.
+    """
+    if score <= 0:
+        command = "crm_master -q -l forever -D"
+    else:
+        command = "crm_master -q -l forever -v %s" % score
+    os.system(command)
+
+def set_remote_master_score(remote, score):
+    """Set the master score of the node ``remote`` with crm_master.
+
+    A score of zero or less deletes the attribute instead. The exit
+    status of crm_master is not checked.
+    """
+    if score <= 0:
+        command = "crm_master -q -l forever -N '%s' -D" % remote
+    else:
+        command = "crm_master -q -l forever -N '%s' -v %s" % (remote, score)
+    os.system(command)
+
+def get_master_score():
+    """Return the master score of this node as an int.
+
+    Returns 0 when crm_master exits non-zero.
+    """
+    proc = subprocess.Popen("crm_master -q -G", shell=True,
+                            universal_newlines=True,
+                            stdout=subprocess.PIPE)
+    output = proc.communicate()[0]
+    if proc.returncode == 0:
+        return int(output.strip())
+    return 0
+
+def get_remote_master_score(remote):
+    """Return the master score of the node ``remote`` as an int.
+
+    Returns 0 when crm_master exits non-zero.
+    """
+    command = "crm_master -q -N '%s' -G" % remote
+    proc = subprocess.Popen(command, shell=True,
+                            universal_newlines=True,
+                            stdout=subprocess.PIPE)
+    output = proc.communicate()[0]
+    if proc.returncode == 0:
+        return int(output.strip())
+    return 0
+
+def recv_line(fd):
+    """Read one line from the socket ``fd`` and return it without the "\\n".
+
+    The data is collected as bytes and decoded once the line is complete,
+    so this works with Python 2 (recv returns str) as well as Python 3
+    (recv returns bytes).
+
+    Raises EOFError if the peer closes the connection before the line is
+    terminated; recv() returns an empty result in that case and looping on
+    would never end. socket.timeout and other socket errors propagate.
+    """
+    buf = bytearray()
+    while True:
+        tmp = fd.recv(1)
+        if not tmp:
+            raise EOFError("connection closed while waiting for a line")
+        if tmp == b"\n":
+            break
+        buf += tmp
+
+    line = bytes(buf)
+    # On Python 2 bytes is str already; on Python 3 it has to be decoded
+    if not isinstance(line, str):
+        line = line.decode("utf-8")
+    return line
+
+# Filter out events
+def read_answer(fd):
+    """Return the next QMP reply read from ``fd`` as a decoded JSON object.
+
+    Every received line is written to qmp_log. Messages that contain
+    neither a "return" nor an "error" key (such as events) are skipped.
+    """
+    while True:
+        raw = recv_line(fd)
+        qmp_log.debug(raw)
+
+        message = json.loads(raw)
+        if "return" in message or "error" in message:
+            return message
+        # Ignore everything else
+
+# Execute one or more qmp commands
+def qmp_execute(fd, commands, ignore_error = False):
+    """Send the QMP ``commands`` over ``fd`` one by one.
+
+    commands     -- list of command dicts; empty/None entries are skipped
+    ignore_error -- if true, an "error" reply does not raise
+
+    Returns the reply to the last command that was sent, or None if no
+    command was sent at all (empty list or only empty entries).
+
+    Raises Error (after logging) if sending or receiving fails, or if a
+    reply contains "error" and ignore_error is false.
+    """
+    answer = None
+    for command in commands:
+        if not command:
+            continue
+
+        try:
+            to_send = json.dumps(command)
+            # Sockets carry bytes; json.dumps() output is ASCII by default
+            fd.sendall((to_send + "\n").encode("utf-8"))
+            qmp_log.debug(to_send)
+
+            answer = read_answer(fd)
+        except Exception as e:
+            log.error("while executing qmp command: %s\n%s" \
+                        % (json.dumps(command), e))
+            raise Error()
+
+        if not ignore_error and ("error" in answer):
+            log.error("qmp command returned error:\n%s\n%s" \
+                        % (json.dumps(command), json.dumps(answer)))
+            raise Error()
+
+    return answer
+
+# Open qemu qmp connection
+def qmp_open(fail_fast = False):
+    """Connect to the QMP socket of qemu and negotiate capabilities.
+
+    fail_fast -- if true, connecting and the handshake use a timeout of
+                 at most 10 seconds instead of the normal command timeout
+
+    Returns the connected socket, with its timeout set to the operation
+    timeout minus 10 seconds (at least 1 second).
+
+    Raises Error (after logging) if connecting or the handshake fails;
+    the socket is closed in that case so it is not leaked.
+    """
+    # Timeout for commands = our timeout minus 10s
+    timeout = max(1, (OCF_RESKEY_CRM_meta_timeout/1000)-10)
+
+    fd = None
+    try:
+        fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        if fail_fast:
+            fd.settimeout(min(10, (OCF_RESKEY_CRM_meta_timeout/1000)))
+        else:
+            fd.settimeout(timeout)
+        fd.connect(QMP_SOCK)
+
+        answer = qmp_execute(fd, [{"execute": "qmp_capabilities"}], True)
+    except Exception as e:
+        if fd is not None:
+            fd.close()
+        log.error("while connecting to qmp socket: %s" % e)
+        raise Error()
+
+    if "error" in answer:
+        fd.close()
+        log.error("while connecting to qmp socket: %s" % json.dumps(answer))
+        raise Error()
+
+    fd.settimeout(timeout)
+    return fd
+
+# Check qemu health and colo role
+def qmp_check_state(fd):
+    """Query qemu over ``fd`` and return the tuple (role, replication).
+
+    role        -- OCF_SUCCESS for a secondary or a VM waiting for the
+                   incoming migration, OCF_RUNNING_MASTER for a primary
+    replication -- OCF_SUCCESS if COLO is active, else OCF_NOT_RUNNING
+
+    Both are OCF_ERR_GENERIC (and an error is logged) when the
+    combination of VM status and COLO status is not recognised.
+    """
+    vm_status = qmp_execute(fd, [{"execute": "query-status"}])["return"]
+    colo_status = qmp_execute(fd, [{"execute": "query-colo-status"}])["return"]
+
+    status = vm_status["status"]
+    if status == "inmigrate":
+        return OCF_SUCCESS, OCF_NOT_RUNNING
+
+    mode = colo_status["mode"]
+    if status in ("running", "colo", "finish-migrate"):
+        # No replication: only valid if COLO never ran or was ended on
+        # request
+        if mode == "none" and colo_status["reason"] in ("request", "none"):
+            return OCF_RUNNING_MASTER, OCF_NOT_RUNNING
+        if mode == "secondary":
+            return OCF_SUCCESS, OCF_SUCCESS
+        if mode == "primary":
+            return OCF_RUNNING_MASTER, OCF_SUCCESS
+
+    log.error("unknown qemu status: vm status: %s colo mode: %s" \
+                % (status, mode))
+    return OCF_ERR_GENERIC, OCF_ERR_GENERIC
+
+# Get the host of the nbd node
+def qmp_get_nbd_remote(fd):
+    """Return the host that block node "nbd0" is connected to, or None.
+
+    The host is cut out of the node's image filename: the part between
+    "//" and the next "/", without anything after the first ":".
+    None is returned when there is no node named "nbd0".
+    """
+    reply = qmp_execute(fd, [{"execute": "query-named-block-nodes"}])
+    for node in reply["return"]:
+        if node["node-name"] != "nbd0":
+            continue
+        url = str(node["image"]["filename"])
+        authority = url.split("//")[1].split("/")[0]
+        return authority.split(":")[0]
+    return None
+
+# Check if we are currently resyncing
+def qmp_check_resync(fd):
+    """Return the block job whose device is "resync", or None.
+
+    The job is the dict reported by query-block-jobs.
+    """
+    jobs = qmp_execute(fd, [{"execute": "query-block-jobs"}])["return"]
+    return next((job for job in jobs if job["device"] == "resync"), None)
+
+def qmp_start_resync(fd, remote):
+    """Start a full resync of colo-disk0 to the NBD server on ``remote``.
+
+    Adds the NBD client node "nbd0" (export "parent0" on NBD_PORT) and
+    starts the mirror job "resync" onto it. auto-dismiss is off, so the
+    finished job stays visible until it is dismissed explicitly.
+    Raises Error if qemu rejects one of the commands.
+    """
+    qmp_execute(fd, [
+        {"execute": "blockdev-add", "arguments": {"driver": "nbd", "node-name": "nbd0", "server": {"type": "inet", "host": str(remote), "port": str(NBD_PORT)}, "export": "parent0"}},
+        {"execute": "blockdev-mirror", "arguments": {"device": "colo-disk0", "job-id": "resync", "target": "nbd0", "sync": "full", "auto-dismiss": False}}
+        ])
+
+def qmp_cancel_resync(fd):
+    """Abort the "resync" job and remove the "nbd0" node.
+
+    A job that is not yet concluded is force-cancelled (errors from the
+    cancel command are ignored) and polled once per second until it is
+    concluded. Then the job is dismissed and the NBD node deleted.
+    Raises Error if dismissing or deleting fails.
+    """
+    # NOTE(review): qmp_check_resync() returns None when there is no
+    # "resync" job, which would raise TypeError here -- callers presumably
+    # only call this while the job exists; confirm.
+    if qmp_check_resync(fd)["status"] != "concluded":
+        qmp_execute(fd, [{"execute": "block-job-cancel", "arguments": {"device": "resync", "force": True}}], True)
+        # Wait for the block-job to finish
+        while qmp_check_resync(fd)["status"] != "concluded":
+            time.sleep(1)
+
+    qmp_execute(fd, [
+        {"execute": "block-job-dismiss", "arguments": {"id": "resync"}},
+        {"execute": "blockdev-del", "arguments": {"node-name": "nbd0"}}
+        ])
+
+def qmp_start_colo(fd, remote):
+    """Switch from resync to COLO replication with ``remote`` as secondary.
+
+    Steps, in order: detect whether the filter-rewriter "rew0" exists,
+    pause the VM and end the resync job, put a primary-mode replication
+    node on top of "nbd0" and add it to the quorum node, connect the
+    mirror and compare chardevs to the secondary, add the network filters
+    and colo-compare, then enable x-colo and start the migration. Returns
+    once the VM status is no longer "paused".
+
+    Raises Error if a QMP command fails.
+    """
+    # Check if we have a filter-rewriter
+    answer = qmp_execute(fd, [{"execute": "qom-list", "arguments": {"path": "/objects/rew0"}}], True)
+    if "error" in answer:
+        if answer["error"]["class"] == "DeviceNotFound":
+            have_filter_rewriter = False
+        else:
+            log.error("while checking for filter-rewriter:\n%s" \
+                        % json.dumps(answer))
+            raise Error()
+    else:
+        have_filter_rewriter = True
+
+    # Pause VM and cancel resync
+    qmp_execute(fd, [
+        {"execute": "stop"},
+        {"execute": "block-job-cancel", "arguments": {"device": "resync"}}
+        ])
+
+    # Wait for the block-job to finish
+    # (assumes the "resync" job still exists; qmp_check_resync() would
+    # return None otherwise)
+    while qmp_check_resync(fd)["status"] != "concluded":
+        time.sleep(1)
+
+    # Add the replication node
+    qmp_execute(fd, [
+        {"execute": "block-job-dismiss", "arguments": {"id": "resync"}},
+        {"execute": "blockdev-add", "arguments": {"driver": "replication", "node-name": "replication0", "mode": "primary", "file": "nbd0"}},
+        {"execute": "x-blockdev-change", "arguments": {"parent": "colo-disk0", "node": "replication0"}}
+        ])
+
+    # Connect mirror and compare_in to secondary
+    qmp_execute(fd, [
+        {"execute": "chardev-add", "arguments": {"id": "mirror0", "backend": {"type": "socket", "data": {"addr": {"type": "inet", "data": {"host": str(remote), "port": str(MIROR_PORT)}}, "server": False, "reconnect": 1}}}},
+        {"execute": "chardev-add", "arguments": {"id": "compare1", "backend": {"type": "socket", "data": {"addr": {"type": "inet", "data": {"host": str(remote), "port": str(COMPARE_IN_PORT)}}, "server": False, "reconnect": 1}}}}
+        ])
+
+    # Add the COLO filters
+    # With a filter-rewriter present the new filters are inserted before
+    # "rew0"; the two variants differ only in the insert/position props.
+    if have_filter_rewriter:
+        qmp_execute(fd, [
+            {"execute": "object-add", "arguments": {"qom-type": "filter-mirror", "id": "m0", "props": {"insert": "before", "position": "id=rew0", "netdev": "hn0", "queue": "tx", "outdev": "mirror0"}}},
+            {"execute": "object-add", "arguments": {"qom-type": "filter-redirector", "id": "redire0", "props": {"insert": "before", "position": "id=rew0", "netdev": "hn0", "queue": "rx", "indev": "comp_out"}}},
+            {"execute": "object-add", "arguments": {"qom-type": "filter-redirector", "id": "redire1", "props": {"insert": "before", "position": "id=rew0", "netdev": "hn0", "queue": "rx", "outdev": "comp0"}}},
+            {"execute": "object-add", "arguments": {"qom-type": "iothread", "id": "iothread1"}},
+            {"execute": "object-add", "arguments": {"qom-type": "colo-compare", "id": "comp0", "props": {"primary_in": "comp0-0", "secondary_in": "compare1", "outdev": "comp_out0", "iothread": "iothread1"}}}
+            ])
+    else:
+        qmp_execute(fd, [
+            {"execute": "object-add", "arguments": {"qom-type": "filter-mirror", "id": "m0", "props": {"netdev": "hn0", "queue": "tx", "outdev": "mirror0"}}},
+            {"execute": "object-add", "arguments": {"qom-type": "filter-redirector", "id": "redire0", "props": {"netdev": "hn0", "queue": "rx", "indev": "comp_out"}}},
+            {"execute": "object-add", "arguments": {"qom-type": "filter-redirector", "id": "redire1", "props": {"netdev": "hn0", "queue": "rx", "outdev": "comp0"}}},
+            {"execute": "object-add", "arguments": {"qom-type": "iothread", "id": "iothread1"}},
+            {"execute": "object-add", "arguments": {"qom-type": "colo-compare", "id": "comp0", "props": {"primary_in": "comp0-0", "secondary_in": "compare1", "outdev": "comp_out0", "iothread": "iothread1"}}}
+            ])
+
+    # Start COLO
+    qmp_execute(fd, [
+        {"execute": "migrate-set-capabilities", "arguments": {"capabilities": [{"capability": "x-colo", "state": True }] }},
+        {"execute": "migrate-set-parameters" , "arguments": {"x-checkpoint-delay": int(OCF_RESKEY_checkpoint_interval)}},
+        {"execute": "migrate", "arguments": {"uri": "tcp:%s:%s" % (remote, MIGRATE_PORT)}}
+        ])
+
+    # Wait for COLO to start
+    while qmp_execute(fd, [{"execute": "query-status"}])["return"]["status"] \
+            == "paused":
+        time.sleep(1)
+
+def qmp_primary_failover(fd):
+    """Tear down replication on the primary and continue without COLO.
+
+    Removes what qmp_start_colo() added: the quorum child, the
+    replication and NBD nodes, colo-compare with its iothread, the
+    network filters and the chardevs to the secondary. Finally
+    x-colo-lost-heartbeat is sent. Raises Error if a command fails.
+    """
+    qmp_execute(fd, [
+        {"execute": "x-blockdev-change", "arguments": {"parent": "colo-disk0", "child": "children.1"}},
+        {"execute": "blockdev-del", "arguments": {"node-name": "replication0"}},
+        {"execute": "blockdev-del", "arguments": {"node-name": "nbd0"}},
+        {"execute": "object-del", "arguments": {"id": "comp0"}},
+        {"execute": "object-del", "arguments": {"id": "iothread1"}},
+        {"execute": "object-del", "arguments": {"id": "m0"}},
+        {"execute": "object-del", "arguments": {"id": "redire0"}},
+        {"execute": "object-del", "arguments": {"id": "redire1"}},
+        {"execute": "chardev-remove", "arguments": {"id": "mirror0"}},
+        {"execute": "chardev-remove", "arguments": {"id": "compare1"}},
+        {"execute": "x-colo-lost-heartbeat"}
+        ])
+
+def qmp_secondary_failover(fd):
+    """Take over on the secondary and prepare it to act as primary.
+
+    Stops the NBD server and sends x-colo-lost-heartbeat. Afterwards the
+    secondary-side redirector filters and chardevs are removed and the
+    unix-socket chardevs comp0, comp0-0, comp_out and comp_out0 are
+    created -- the same ids and socket paths the primary command line
+    defines. Raises Error if a command fails.
+    """
+    # Stop the NBD server, and resume
+    qmp_execute(fd, [
+        {"execute": "nbd-server-stop"},
+        {"execute": "x-colo-lost-heartbeat"}
+        ])
+
+    # Prepare for continuing replication when we have a new secondary
+    qmp_execute(fd, [
+        {"execute": "object-del", "arguments": {"id": "f2"}},
+        {"execute": "object-del", "arguments": {"id": "f1"}},
+        {"execute": "chardev-remove", "arguments": {"id": "red1"}},
+        {"execute": "chardev-remove", "arguments": {"id": "red0"}},
+        {"execute": "chardev-add", "arguments": {"id": "comp0", "backend": {"type": "socket", "data": {"addr": {"type": "unix", "data": {"path": str(COMP_SOCK)}}, "server": True}}}},
+        {"execute": "chardev-add", "arguments": {"id": "comp0-0", "backend": {"type": "socket", "data": {"addr": {"type": "unix", "data": {"path": str(COMP_SOCK)}}, "server": False}}}},
+        {"execute": "chardev-add", "arguments": {"id": "comp_out", "backend": {"type": "socket", "data": {"addr": {"type": "unix", "data": {"path": str(COMP_OUT_SOCK)}}, "server": True}}}},
+        {"execute": "chardev-add", "arguments": {"id": "comp_out0", "backend": {"type": "socket", "data": {"addr": {"type": "unix", "data": {"path": str(COMP_OUT_SOCK)}}, "server": False}}}}
+        ])
+
+# Sanity checks: check parameters, files, binaries, etc.
+def qemu_colo_validate_all():
+    """Check parameters, clone configuration, binaries and directories.
+
+    Returns OCF_SUCCESS if everything is fine. The first failing check is
+    logged and decides the result: OCF_ERR_CONFIGURED for bad parameters
+    or meta attributes, OCF_ERR_INSTALLED for a missing binary and
+    OCF_ERR_PERM for an unusable image directory.
+    """
+    # Check resource parameters
+    parameter_checks = (
+        (not OCF_RESKEY_base_port.isdigit(),
+         "base_port needs to be a number"),
+        (not OCF_RESKEY_checkpoint_interval.isdigit(),
+         "checkpoint_interval needs to be a number"),
+        (not OCF_RESKEY_active_hidden_dir,
+         "active_hidden_dir needs to be specified"),
+        (not OCF_RESKEY_disk_size,
+         "disk_size needs to be specified"),
+    )
+    for failed, message in parameter_checks:
+        if failed:
+            log.error(message)
+            return OCF_ERR_CONFIGURED
+
+    # Check resource meta configuration (not enforced for stop)
+    if OCF_ACTION != "stop":
+        meta_checks = (
+            (OCF_RESKEY_CRM_meta_master_max != 1,
+             "only one master allowed"),
+            (OCF_RESKEY_CRM_meta_clone_max > 2,
+             "maximum 2 clones allowed"),
+            (OCF_RESKEY_CRM_meta_master_node_max != 1,
+             "only one master per node allowed"),
+            (OCF_RESKEY_CRM_meta_clone_node_max != 1,
+             "only one clone per node allowed"),
+        )
+        for failed, message in meta_checks:
+            if failed:
+                log.error(message)
+                return OCF_ERR_CONFIGURED
+
+    # Check if notify is enabled (not enforced for stop and monitor)
+    if OCF_ACTION not in ("stop", "monitor"):
+        if not (is_true(OCF_RESKEY_CRM_meta_notify)
+                or OCF_RESKEY_CRM_meta_notify_start_uname):
+            log.error("notify needs to be enabled")
+            return OCF_ERR_CONFIGURED
+
+    # Check that globally-unique is disabled
+    if is_true(OCF_RESKEY_CRM_meta_globally_unique):
+        log.error("globally-unique needs to be disabled")
+        return OCF_ERR_CONFIGURED
+
+    # Check binaries; check_binary() logs the error itself
+    for binary in (OCF_RESKEY_binary, "qemu-img"):
+        if not check_binary(binary):
+            return OCF_ERR_INSTALLED
+
+    # Check paths and files
+    if not (is_writable(OCF_RESKEY_active_hidden_dir)
+            and os.path.isdir(OCF_RESKEY_active_hidden_dir)):
+        log.error("active and hidden image directory missing or not writable")
+        return OCF_ERR_PERM
+
+    return OCF_SUCCESS
+
+# Check if qemu is running
+def check_pid():
+    """Check whether the qemu process named in PID_FILE exists.
+
+    Returns a tuple (status, pid):
+      (OCF_NOT_RUNNING, None) -- no pid file, or it holds no valid number
+      (OCF_NOT_RUNNING, pid)  -- pid file present but signalling the
+                                 process failed
+      (OCF_SUCCESS, pid)      -- the process exists
+    """
+    if not os.path.exists(PID_FILE):
+        return OCF_NOT_RUNNING, None
+
+    try:
+        # "with" closes the file even if parsing the number fails
+        with open(PID_FILE, "r") as fd:
+            pid = int(fd.readline().strip())
+    except ValueError:
+        # Empty or garbled pid file: there is no process we could check
+        log.error("pid file \"%s\" is empty or malformed" % PID_FILE)
+        return OCF_NOT_RUNNING, None
+
+    try:
+        # Signal 0 performs the existence check without sending a signal
+        os.kill(pid, 0)
+    except OSError:
+        log.info("qemu is not running")
+        return OCF_NOT_RUNNING, pid
+    else:
+        return OCF_SUCCESS, pid
+
+def qemu_colo_monitor(fail_fast = False):
+    """Check qemu and, on a regular monitor run, drive the resync forward.
+
+    fail_fast -- use the short connect timeout of qmp_open() and skip the
+                 resync handling
+
+    Returns the tuple (role, replication) from qmp_check_state(), or
+    (status, OCF_ERR_GENERIC) if check_pid() says qemu is not running.
+
+    Unless fail_fast is set or the monitor interval is 0, a "resync" block
+    job is handled as follows: on error it is cancelled, when ready COLO
+    is started and the peer gets master score 100, otherwise the progress
+    is logged.
+
+    Raises Error if talking to qemu fails. The QMP connection is closed
+    on every path, including early returns and errors.
+    """
+    status, pid = check_pid()
+    if status != OCF_SUCCESS:
+        return status, OCF_ERR_GENERIC
+
+    fd = qmp_open(fail_fast)
+    try:
+        role, replication = qmp_check_state(fd)
+        if role != OCF_SUCCESS and role != OCF_RUNNING_MASTER:
+            return role, replication
+
+        if not fail_fast and OCF_RESKEY_CRM_meta_interval != 0:
+            # This isn't a probe monitor
+            block_job = qmp_check_resync(fd)
+            if block_job is not None:
+                if "error" in block_job:
+                    log.error("resync error: %s" % block_job["error"])
+                    qmp_cancel_resync(fd)
+                    # TODO: notify pacemaker about peer failure
+                elif block_job["ready"]:
+                    log.info("resync done, starting colo")
+                    peer = qmp_get_nbd_remote(fd)
+                    qmp_start_colo(fd, peer)
+                    # COLO started, our secondary now can be promoted if
+                    # the primary fails
+                    set_remote_master_score(peer, 100)
+                else:
+                    pct_done = (float(block_job["offset"]) \
+                                / float(block_job["len"])) * 100
+                    log.info("resync %.2f%% done" % pct_done)
+    finally:
+        fd.close()
+
+    return role, replication
+
+def qemu_colo_start():
+    if check_pid()[0] == OCF_SUCCESS:
+        log.info("qemu is already running")
+        return OCF_SUCCESS
+
+    run_command("qemu-img create -q -f qcow2 %s %s" \
+                % (ACTIVE_IMAGE, OCF_RESKEY_disk_size))
+    run_command("qemu-img create -q -f qcow2 %s %s" \
+                % (HIDDEN_IMAGE, OCF_RESKEY_disk_size))
+
+    rotate_logfile(QMP_LOG, 4)
+    rotate_logfile(QEMU_LOG, 4)
+    run_command(QEMU_SECONDARY_CMDLINE)
+
+    fd = qmp_open()
+    qmp_execute(fd, [
+        {"execute": "nbd-server-start", "arguments": {"addr": {"type": "inet", "data": {"host": str(OCF_RESKEY_listen_address), "port": str(NBD_PORT)}}}},
+        {"execute": "nbd-server-add", "arguments": {"device": "parent0", "writable": True}}
+        ])
+    fd.close()
+
+    return OCF_SUCCESS
+
+def env_do_shutdown_guest():
+    return OCF_RESKEY_CRM_meta_notify_active_uname \
+           and OCF_RESKEY_CRM_meta_notify_stop_uname \
+           and str.strip(OCF_RESKEY_CRM_meta_notify_active_uname) \
+               == str.strip(OCF_RESKEY_CRM_meta_notify_stop_uname)
+
+def env_find_secondary():
+    # slave(s) =
+    # OCF_RESKEY_CRM_meta_notify_slave_uname
+    # - OCF_RESKEY_CRM_meta_notify_stop_uname
+    # + OCF_RESKEY_CRM_meta_notify_start_uname
+    # Filter out hosts that are stopping and ourselves
+    for host in str.split(OCF_RESKEY_CRM_meta_notify_slave_uname, " "):
+        if host:
+            for stopping_host \
+                in str.split(OCF_RESKEY_CRM_meta_notify_stop_uname, " "):
+                if host == stopping_host:
+                    break
+            else:
+                if host != HOSTNAME:
+                    # we found a valid secondary
+                    return host
+
+    for host in str.split(OCF_RESKEY_CRM_meta_notify_start_uname, " "):
+        if host != HOSTNAME:
+            # we found a valid secondary
+            return host
+
+    # we found no secondary
+    return None
+
+def _qemu_colo_stop(monstatus, shutdown_guest):
+    # stop action must do everything possible to stop the resource
+    try:
+        now = time.time()
+        timeout = now + (OCF_RESKEY_CRM_meta_timeout/1000)-10
+        force_stop = False
+
+        if monstatus == OCF_NOT_RUNNING:
+            log.info("resource is already stopped")
+            return OCF_SUCCESS
+        elif monstatus == OCF_RUNNING_MASTER or monstatus == OCF_SUCCESS:
+            force_stop = False
+        else:
+            force_stop = True
+
+        if not force_stop:
+            fd = qmp_open(True)
+            if shutdown_guest:
+                if monstatus == OCF_RUNNING_MASTER:
+                    qmp_execute(fd, [{"execute": "system_powerdown"}])
+            else:
+                qmp_execute(fd, [{"execute": "quit"}])
+            fd.close()
+
+            # wait for qemu to stop
+            while time.time() < timeout:
+                status, pid = check_pid()
+                if status == OCF_NOT_RUNNING:
+                    # qemu stopped
+                    return OCF_SUCCESS
+                elif status == OCF_SUCCESS:
+                    # wait
+                    time.sleep(1)
+                else:
+                    # something went wrong, force stop instead
+                    break
+
+            log.warning("clean stop timeout reached")
+    except Exception as e:
+        log.warning("error while stopping: \"%s\"" % e.message)
+
+    log.info("force stopping qemu")
+
+    status, pid = check_pid()
+    if status == OCF_NOT_RUNNING:
+        return OCF_SUCCESS
+    try:
+        os.kill(pid, signal.SIGTERM)
+        time.sleep(2)
+        os.kill(pid, signal.SIGKILL)
+    except Exception:
+        pass
+
+    while check_pid()[0] != OCF_NOT_RUNNING:
+        time.sleep(1)
+
+    return OCF_SUCCESS
+
+def qemu_colo_stop():
+    shutdown_guest = env_do_shutdown_guest()
+    try:
+        role, replication = qemu_colo_monitor(True)
+    except Exception:
+        role, replication = OCF_ERR_GENERIC, OCF_ERR_GENERIC
+
+    status = _qemu_colo_stop(role, shutdown_guest)
+
+    if HOSTNAME == str.strip(OCF_RESKEY_CRM_meta_notify_master_uname):
+        peer = env_find_secondary()
+        if peer and (get_remote_master_score(peer) > 10) \
+           and not shutdown_guest:
+            # We where a healthy primary and had a healthy secondary. We where
+            # stopped so outdate ourselves, as the secondary will take over.
+            set_master_score(0)
+        else:
+            if role == OCF_RUNNING_MASTER:
+                # We where a healthy primary but had no healty secondary or it
+                # was stopped as well. So we have up-to-date data.
+                set_master_score(10)
+            else:
+                # We where a unhealthy primary but also had no healty secondary.
+                # So we still should have up-to-date data.
+                set_master_score(5)
+    else:
+        if get_master_score() > 10:
+            if role == OCF_SUCCESS:
+                if shutdown_guest:
+                    # We where a healthy secondary and (probably) had a healthy
+                    # primary and both where stopped. So we have up-to-date data
+                    # too.
+                    set_master_score(10)
+                else:
+                    # We where a healthy secondary and (probably) had a healthy
+                    # primary still running. So we are now out of date.
+                    set_master_score(0)
+            else:
+                # We where a unhealthy secondary. So we are now out of date.
+                set_master_score(0)
+
+    return status
+
+def qemu_colo_notify():
+    action = "%s-%s" % (OCF_RESKEY_CRM_meta_notify_type, \
+                        OCF_RESKEY_CRM_meta_notify_operation)
+
+    if action == "post-start":
+        if HOSTNAME == str.strip(OCF_RESKEY_CRM_meta_notify_master_uname):
+            peer = str.strip(OCF_RESKEY_CRM_meta_notify_start_uname)
+            fd = qmp_open()
+            qmp_start_resync(fd, peer)
+            fd.close()
+            # The secondary has inconsistent data until resync is finished
+            set_remote_master_score(peer, 0)
+
+    elif action == "pre-stop":
+        if not env_do_shutdown_guest() \
+           and HOSTNAME == str.strip(OCF_RESKEY_CRM_meta_notify_master_uname):
+            fd = qmp_open()
+            peer = qmp_get_nbd_remote(fd)
+            if peer == str.strip(OCF_RESKEY_CRM_meta_notify_stop_uname):
+                if qmp_check_resync(fd) != None:
+                    qmp_cancel_resync(fd)
+                elif peer and get_remote_master_score(peer) > 10:
+                    qmp_primary_failover(fd)
+            fd.close()
+
+    return OCF_SUCCESS
+
+def qemu_colo_promote():
+    role, replication = qemu_colo_monitor()
+
+    if role == OCF_SUCCESS and replication == OCF_NOT_RUNNING:
+        status = _qemu_colo_stop(role, False)
+        if status != OCF_SUCCESS:
+            return status
+
+        rotate_logfile(QMP_LOG, 4)
+        rotate_logfile(QEMU_LOG, 4)
+        run_command(QEMU_PRIMARY_CMDLINE)
+        set_master_score(101)
+
+        peer = env_find_secondary()
+        if peer:
+            fd = qmp_open()
+            qmp_start_resync(fd, peer)
+            # The secondary has inconsistent data until resync is finished
+            set_remote_master_score(peer, 0)
+            fd.close()
+        return OCF_SUCCESS
+    elif role == OCF_SUCCESS and replication == OCF_SUCCESS:
+        fd = qmp_open()
+        qmp_secondary_failover(fd)
+        set_master_score(101)
+
+        peer = env_find_secondary()
+        if peer:
+            qmp_start_resync(fd, peer)
+            # The secondary has inconsistent data until resync is finished
+            set_remote_master_score(peer, 0)
+        fd.close()
+        return OCF_SUCCESS
+    else:
+        return OCF_ERR_GENERIC
+
+def qemu_colo_demote():
+    status = qemu_colo_stop()
+    if status != OCF_SUCCESS:
+        return status
+    return qemu_colo_start()
+
+
+if OCF_ACTION == "meta-data":
+    qemu_colo_meta_data()
+    exit(OCF_SUCCESS)
+
+logs_open()
+
+status = qemu_colo_validate_all()
+# Exit here if our sanity checks fail, but try to continue if we need to stop
+if status != OCF_SUCCESS and OCF_ACTION != "stop":
+    exit(status)
+
+setup_constants()
+
+os.system("echo 'Action: %s' >> /tmp/ra-env.log" % OCF_ACTION)
+os.system("env >> /tmp/ra-env.log")
+
+try:
+    if OCF_ACTION == "start":
+        status = qemu_colo_start()
+    elif OCF_ACTION == "stop":
+        status = qemu_colo_stop()
+    elif OCF_ACTION == "monitor":
+        status = qemu_colo_monitor()[0]
+    elif OCF_ACTION == "notify":
+        status = qemu_colo_notify()
+    elif OCF_ACTION == "promote":
+        status = qemu_colo_promote()
+    elif OCF_ACTION == "demote":
+        status = qemu_colo_demote()
+    elif OCF_ACTION == "validate-all":
+        status = qemu_colo_validate_all()
+    else:
+        status = OCF_ERR_UNIMPLEMENTED
+except Error:
+    exit(OCF_ERR_GENERIC)
+else:
+    exit(status)
--
2.20.1



  parent reply	other threads:[~2019-11-21 17:54 UTC|newest]

Thread overview: 13+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2019-11-21 17:49 [PATCH 0/4] colo: Introduce resource agent and high-level test Lukas Straub
2019-11-21 17:49 ` [PATCH 1/4] block/quorum.c: stable children names Lukas Straub
2019-11-21 18:04   ` Eric Blake
2019-11-21 18:34     ` Lukas Straub
2019-11-26 14:21       ` Alberto Garcia
2019-11-27 21:20         ` Lukas Straub
2019-11-21 17:49 ` Lukas Straub [this message]
2019-11-21 17:49 ` [PATCH 3/4] colo: Introduce high-level test Lukas Straub
2019-11-21 17:49 ` [PATCH 4/4] MAINTAINERS: Add myself as maintainer for COLO resource agent Lukas Straub
2019-11-22  9:46 ` [PATCH 0/4] colo: Introduce resource agent and high-level test Dr. David Alan Gilbert
2019-11-27 21:11   ` Lukas Straub
2019-12-18  9:27     ` Lukas Straub
2019-12-18 19:46       ` Dr. David Alan Gilbert

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=4b51cd192df72d880841794199a10d7a834688d4.1574356137.git.lukasstraub2@web.de \
    --to=lukasstraub2@web.de \
    --cc=berto@igalia.com \
    --cc=chen.zhang@intel.com \
    --cc=dgilbert@redhat.com \
    --cc=jasowang@redhat.com \
    --cc=qemu-devel@nongnu.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.