All of lore.kernel.org
 help / color / mirror / Atom feed
* [Qemu-devel] [PATCH V17 00/12] quorum block filter
@ 2014-02-12 22:06 Benoît Canet
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 01/12] quorum: Create quorum.c, add QuorumChildRequest and QuorumAIOCB Benoît Canet
                   ` (12 more replies)
  0 siblings, 13 replies; 27+ messages in thread
From: Benoît Canet @ 2014-02-12 22:06 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, Benoît Canet, famz, mreitz, stefanha

I post this for review in prevision of 2.0 feature freeze.
Even if the series look correct please wait before merging because:

The QMP events in the "Add quorum mechanism" definitively needs to be reviewed
by Eric as they where changed.

I did not found any bugs while testing this version but I am willing to test the
code further before it's applied even it's reviewed by.

Best regards

Benoît

v17:
   minor comment cleanup requested by kevin in quorum_open [Kevin]
   better handling of qdict in quorum_open (avoid a crash on exit) [Benoît]
   remove uneeded NULL assigment on close [Benoît]
   rename QuorumChildRequest variables [Benoît]

in v16:
   fix unaligned comment [Benoît]
   s/QuorumSingleAIOCB/QuorumChildRequest/ [Kevin]
   s/total/num_children/ [Kevin]
   Remove QuourmAIOCB finished field as bdrv_aio_cancel is now used [Benoît]
   fix typedef struct [Kevin]
   Document blkverify mode [Kevin]
   s/callback/callbacks/ [Kevin]
   free data structures on quorum_aio_cancel [Kevin]
   drop bqs [Kevin]
   replace acb->bqs usage by acb->common.bs->opaque [Kevin]
   fix compilation error on patch 3 [Kevin]
   better document qemu_iovec_compare [Kevin]
   don't zero structure intended to be freed [Kevin]
   Do simple if [Kevin]
   merge for loops [Kevin]
   s/bs->file/s->bs[i]/ [Kevin]
   pass individual qiov to read one patch sooner [Kevin]
   remove uneeded forward declaration [Kevin]
   simplify compare function [Kevin]
   initialize vote list at the right place [Kevin]
   node_name should not be NULL assert it [Kevin]
   s/i/max/g in get_vote_winner [Kevin]
   document why quorum_iovec_compare exists [Kevin]
   convert comment into assert [Kevin]
   set quorum default to true [Kevin]
   simplify quorum success condition after max addition quorum_vote test [Benoît]
   remove uneeded test after vote [Kevin]
   change comment on quorum failure searching [Benoît]
   introduce reference in QUORUM_FAILURE [Kevin]
   remove unneeded KEY_ defines  [Benoît]
   Use QemuOptList to shorten code [Kevin]
   remove uneeded includes [Benoît]
   fill authorization table [Benoît]
   extract quorum_has_too_much_io_failed [Benoît]
   call this function when not reading [Benoît]
   report error on io failure [Benoît]
        !! need Eric review
   don't flush on exit [Kevin]


Benoît Canet (12):
  quorum: Create quorum.c, add QuorumChildRequest and QuorumAIOCB.
  quorum: Create BDRVQuorumState and BlkDriver and do init.
  quorum: Add quorum_aio_writev and its dependencies.
  blkverify: Extract qemu_iovec_clone() and qemu_iovec_compare() from
    blkverify.
  quorum: Add quorum_aio_readv.
  quorum: Add quorum mechanism.
  quorum: Add quorum_getlength().
  quorum: Add quorum_invalidate_cache().
  quorum: Add quorum_co_flush().
  quorum: Implement recursive .bdrv_recurse_is_first_non_filter in
    quorum.
  quorum: Add quorum_open() and quorum_close().
  quorum: Add unit test.

 block/Makefile.objs        |   1 +
 block/blkverify.c          | 108 +-----
 block/quorum.c             | 867 +++++++++++++++++++++++++++++++++++++++++++++
 configure                  |  36 ++
 docs/qmp/qmp-events.txt    |  36 ++
 include/monitor/monitor.h  |   2 +
 include/qemu-common.h      |   2 +
 monitor.c                  |   4 +
 qapi-schema.json           |  21 +-
 tests/qemu-iotests/081     |  95 +++++
 tests/qemu-iotests/081.out |  34 ++
 tests/qemu-iotests/group   |   1 +
 util/iov.c                 | 106 ++++++
 13 files changed, 1206 insertions(+), 107 deletions(-)
 create mode 100644 block/quorum.c
 create mode 100755 tests/qemu-iotests/081
 create mode 100644 tests/qemu-iotests/081.out

-- 
1.8.3.2

^ permalink raw reply	[flat|nested] 27+ messages in thread

* [Qemu-devel] [PATCH V17 01/12] quorum: Create quorum.c, add QuorumChildRequest and QuorumAIOCB.
  2014-02-12 22:06 [Qemu-devel] [PATCH V17 00/12] quorum block filter Benoît Canet
@ 2014-02-12 22:06 ` Benoît Canet
  2014-02-15  1:17   ` Max Reitz
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 02/12] quorum: Create BDRVQuorumState and BlkDriver and do init Benoît Canet
                   ` (11 subsequent siblings)
  12 siblings, 1 reply; 27+ messages in thread
From: Benoît Canet @ 2014-02-12 22:06 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, Benoît Canet, mreitz, stefanha

From: Benoît Canet <benoit@irqsave.net>

Quorum is a block filter mirroring writes to num_children children.
For reads quorum reads each children and does a vote.
If more than vote_threshold versions are identicals the quorum is reached and
this winning version is returned to the guest. So quorum prevents bit corruption.
For high availability purpose minority errors are reported via QMP but the guest
does not see them.

This patch create the driver C source file and introduce the structures that
will be used in asynchronous reads and writes.

Signed-off-by: Benoit Canet <benoit@irqsave.net>
Reviewed-by: Max Reitz <mreitz@redhat.com>
---
 block/Makefile.objs |  1 +
 block/quorum.c      | 53 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 54 insertions(+)
 create mode 100644 block/quorum.c

diff --git a/block/Makefile.objs b/block/Makefile.objs
index e254a21..716556f 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -3,6 +3,7 @@ block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-c
 block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
 block-obj-y += qed-check.o
 block-obj-$(CONFIG_VHDX) += vhdx.o vhdx-endian.o vhdx-log.o
+block-obj-y += quorum.o
 block-obj-y += parallels.o blkdebug.o blkverify.o
 block-obj-y += snapshot.o qapi.o
 block-obj-$(CONFIG_WIN32) += raw-win32.o win32-aio.o
diff --git a/block/quorum.c b/block/quorum.c
new file mode 100644
index 0000000..950f5cc
--- /dev/null
+++ b/block/quorum.c
@@ -0,0 +1,53 @@
+/*
+ * Quorum Block filter
+ *
+ * Copyright (C) 2012-2014 Nodalink, EURL.
+ *
+ * Author:
+ *   Benoît Canet <benoit.canet@irqsave.net>
+ *
+ * Based on the design and code of blkverify.c (Copyright (C) 2010 IBM, Corp)
+ * and blkmirror.c (Copyright (C) 2011 Red Hat, Inc).
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+
+#include "block/block_int.h"
+
+typedef struct QuorumAIOCB QuorumAIOCB;
+
+/* Quorum will create one instance of the following structure per operation it
+ * performs on its children.
+ * So for each read/write operation coming from the upper layer there will be
+ * $children_count QuorumChildRequest.
+ */
+typedef struct QuorumChildRequest {
+    BlockDriverAIOCB *aiocb;
+    QEMUIOVector qiov;
+    uint8_t *buf;
+    int ret;
+    QuorumAIOCB *parent;
+} QuorumChildRequest;
+
+/* Quorum will use the following structure to track progress of each read/write
+ * operation received by the upper layer.
+ * This structure hold pointers to the QuorumChildRequest structures instances
+ * used to do operations on each children and track overall progress.
+ */
+struct QuorumAIOCB {
+    BlockDriverAIOCB common;
+
+    /* Request metadata */
+    uint64_t sector_num;
+    int nb_sectors;
+
+    QEMUIOVector *qiov;         /* calling IOV */
+
+    QuorumChildRequest *qcrs;   /* individual child requests */
+    int count;                  /* number of completed AIOCB */
+    int success_count;          /* number of successfully completed AIOCB */
+
+    bool is_read;
+    int vote_ret;
+};
-- 
1.8.3.2

^ permalink raw reply related	[flat|nested] 27+ messages in thread

* [Qemu-devel] [PATCH V17 02/12] quorum: Create BDRVQuorumState and BlkDriver and do init.
  2014-02-12 22:06 [Qemu-devel] [PATCH V17 00/12] quorum block filter Benoît Canet
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 01/12] quorum: Create quorum.c, add QuorumChildRequest and QuorumAIOCB Benoît Canet
@ 2014-02-12 22:06 ` Benoît Canet
  2014-02-15  1:22   ` Max Reitz
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 03/12] quorum: Add quorum_aio_writev and its dependencies Benoît Canet
                   ` (10 subsequent siblings)
  12 siblings, 1 reply; 27+ messages in thread
From: Benoît Canet @ 2014-02-12 22:06 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, Benoît Canet, mreitz, stefanha

From: Benoît Canet <benoit@irqsave.net>

Create the structure holding the quorum settings and write the minimal block
driver instanciation boilerplate.

Signed-off-by: Benoit Canet <benoit@irqsave.net>
Reviewed-by: Max Reitz <mreitz@redhat.com>
---
 block/quorum.c | 31 +++++++++++++++++++++++++++++++
 1 file changed, 31 insertions(+)

diff --git a/block/quorum.c b/block/quorum.c
index 950f5cc..36c5bb8 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -15,6 +15,23 @@
 
 #include "block/block_int.h"
 
+/* the following structure holds the state of one quorum instance */
+typedef struct BDRVQuorumState {
+    BlockDriverState **bs; /* children BlockDriverStates */
+    int num_children;      /* children count */
+    int threshold;         /* if less than threshold children reads gave the
+                            * same result a quorum error occurs.
+                            */
+    bool is_blkverify;     /* true if the driver is in blkverify mode
+                            * Writes are mirrored on two children devices.
+                            * On reads the two children devices contents are
+                            * compared and when a difference is spotted its
+                            * location is printed and the code abort.
+                            * It is useful to debug other block drivers by
+                            * comparing them with a reference one.
+                            */
+} BDRVQuorumState;
+
 typedef struct QuorumAIOCB QuorumAIOCB;
 
 /* Quorum will create one instance of the following structure per operation it
@@ -51,3 +68,17 @@ struct QuorumAIOCB {
     bool is_read;
     int vote_ret;
 };
+
+static BlockDriver bdrv_quorum = {
+    .format_name        = "quorum",
+    .protocol_name      = "quorum",
+
+    .instance_size      = sizeof(BDRVQuorumState),
+};
+
+static void bdrv_quorum_init(void)
+{
+    bdrv_register(&bdrv_quorum);
+}
+
+block_init(bdrv_quorum_init);
-- 
1.8.3.2

^ permalink raw reply related	[flat|nested] 27+ messages in thread

* [Qemu-devel] [PATCH V17 03/12] quorum: Add quorum_aio_writev and its dependencies.
  2014-02-12 22:06 [Qemu-devel] [PATCH V17 00/12] quorum block filter Benoît Canet
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 01/12] quorum: Create quorum.c, add QuorumChildRequest and QuorumAIOCB Benoît Canet
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 02/12] quorum: Create BDRVQuorumState and BlkDriver and do init Benoît Canet
@ 2014-02-12 22:06 ` Benoît Canet
  2014-02-15  1:31   ` Max Reitz
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 04/12] blkverify: Extract qemu_iovec_clone() and qemu_iovec_compare() from blkverify Benoît Canet
                   ` (9 subsequent siblings)
  12 siblings, 1 reply; 27+ messages in thread
From: Benoît Canet @ 2014-02-12 22:06 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, Benoît Canet, mreitz, stefanha

From: Benoît Canet <benoit@irqsave.net>

Writes are mirrored num_children times on num_children devices.

Signed-off-by: Benoit Canet <benoit@irqsave.net>
---
 block/quorum.c | 103 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 103 insertions(+)

diff --git a/block/quorum.c b/block/quorum.c
index 36c5bb8..197cdca 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -69,11 +69,114 @@ struct QuorumAIOCB {
     int vote_ret;
 };
 
+static void quorum_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
+    BDRVQuorumState *s = acb->common.bs->opaque;
+    int i;
+
+    /* cancel all callbacks */
+    for (i = 0; i < s->num_children; i++) {
+        bdrv_aio_cancel(acb->qcrs[i].aiocb);
+    }
+
+    g_free(acb->qcrs);
+    qemu_aio_release(acb);
+}
+
+static AIOCBInfo quorum_aiocb_info = {
+    .aiocb_size         = sizeof(QuorumAIOCB),
+    .cancel             = quorum_aio_cancel,
+};
+
+static void quorum_aio_finalize(QuorumAIOCB *acb)
+{
+    int ret = 0;
+
+    acb->common.cb(acb->common.opaque, ret);
+
+    g_free(acb->qcrs);
+    qemu_aio_release(acb);
+}
+
+static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
+                                   BlockDriverState *bs,
+                                   QEMUIOVector *qiov,
+                                   uint64_t sector_num,
+                                   int nb_sectors,
+                                   BlockDriverCompletionFunc *cb,
+                                   void *opaque)
+{
+    QuorumAIOCB *acb = qemu_aio_get(&quorum_aiocb_info, bs, cb, opaque);
+    int i;
+
+    acb->common.bs->opaque = s;
+    acb->sector_num = sector_num;
+    acb->nb_sectors = nb_sectors;
+    acb->qiov = qiov;
+    acb->qcrs = g_new0(QuorumChildRequest, s->num_children);
+    acb->count = 0;
+    acb->success_count = 0;
+    acb->is_read = false;
+    acb->vote_ret = 0;
+
+    for (i = 0; i < s->num_children; i++) {
+        acb->qcrs[i].buf = NULL;
+        acb->qcrs[i].ret = 0;
+        acb->qcrs[i].parent = acb;
+    }
+
+    return acb;
+}
+
+static void quorum_aio_cb(void *opaque, int ret)
+{
+    QuorumChildRequest *sacb = opaque;
+    QuorumAIOCB *acb = sacb->parent;
+    BDRVQuorumState *s = acb->common.bs->opaque;
+
+    sacb->ret = ret;
+    acb->count++;
+    if (ret == 0) {
+        acb->success_count++;
+    }
+    assert(acb->count <= s->num_children);
+    assert(acb->success_count <= s->num_children);
+    if (acb->count < s->num_children) {
+        return;
+    }
+
+    quorum_aio_finalize(acb);
+}
+
+static BlockDriverAIOCB *quorum_aio_writev(BlockDriverState *bs,
+                                          int64_t sector_num,
+                                          QEMUIOVector *qiov,
+                                          int nb_sectors,
+                                          BlockDriverCompletionFunc *cb,
+                                          void *opaque)
+{
+    BDRVQuorumState *s = bs->opaque;
+    QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num, nb_sectors,
+                                      cb, opaque);
+    int i;
+
+    for (i = 0; i < s->num_children; i++) {
+        acb->qcrs[i].aiocb = bdrv_aio_writev(s->bs[i], sector_num, qiov,
+                                             nb_sectors, &quorum_aio_cb,
+                                             &acb->qcrs[i]);
+    }
+
+    return &acb->common;
+}
+
 static BlockDriver bdrv_quorum = {
     .format_name        = "quorum",
     .protocol_name      = "quorum",
 
     .instance_size      = sizeof(BDRVQuorumState),
+
+    .bdrv_aio_writev    = quorum_aio_writev,
 };
 
 static void bdrv_quorum_init(void)
-- 
1.8.3.2

^ permalink raw reply related	[flat|nested] 27+ messages in thread

* [Qemu-devel] [PATCH V17 04/12] blkverify: Extract qemu_iovec_clone() and qemu_iovec_compare() from blkverify.
  2014-02-12 22:06 [Qemu-devel] [PATCH V17 00/12] quorum block filter Benoît Canet
                   ` (2 preceding siblings ...)
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 03/12] quorum: Add quorum_aio_writev and its dependencies Benoît Canet
@ 2014-02-12 22:06 ` Benoît Canet
  2014-02-15  1:36   ` Max Reitz
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 05/12] quorum: Add quorum_aio_readv Benoît Canet
                   ` (8 subsequent siblings)
  12 siblings, 1 reply; 27+ messages in thread
From: Benoît Canet @ 2014-02-12 22:06 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, Benoît Canet, mreitz, stefanha

From: Benoît Canet <benoit@irqsave.net>

qemu_iovec_compare() will be used to compare IOs vectors in quorum blkverify
mode. The patch extract these functions in order to factorize the code.

Signed-off-by: Benoit Canet <benoit@irqsave.net>
Reviewed-by: Max Reitz <mreitz@redhat.com>
---
 block/blkverify.c     | 108 +-------------------------------------------------
 include/qemu-common.h |   2 +
 util/iov.c            | 106 +++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 110 insertions(+), 106 deletions(-)

diff --git a/block/blkverify.c b/block/blkverify.c
index b57da59..c86ad7a 100644
--- a/block/blkverify.c
+++ b/block/blkverify.c
@@ -173,110 +173,6 @@ static int64_t blkverify_getlength(BlockDriverState *bs)
     return bdrv_getlength(s->test_file);
 }
 
-/**
- * Check that I/O vector contents are identical
- *
- * @a:          I/O vector
- * @b:          I/O vector
- * @ret:        Offset to first mismatching byte or -1 if match
- */
-static ssize_t blkverify_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
-{
-    int i;
-    ssize_t offset = 0;
-
-    assert(a->niov == b->niov);
-    for (i = 0; i < a->niov; i++) {
-        size_t len = 0;
-        uint8_t *p = (uint8_t *)a->iov[i].iov_base;
-        uint8_t *q = (uint8_t *)b->iov[i].iov_base;
-
-        assert(a->iov[i].iov_len == b->iov[i].iov_len);
-        while (len < a->iov[i].iov_len && *p++ == *q++) {
-            len++;
-        }
-
-        offset += len;
-
-        if (len != a->iov[i].iov_len) {
-            return offset;
-        }
-    }
-    return -1;
-}
-
-typedef struct {
-    int src_index;
-    struct iovec *src_iov;
-    void *dest_base;
-} IOVectorSortElem;
-
-static int sortelem_cmp_src_base(const void *a, const void *b)
-{
-    const IOVectorSortElem *elem_a = a;
-    const IOVectorSortElem *elem_b = b;
-
-    /* Don't overflow */
-    if (elem_a->src_iov->iov_base < elem_b->src_iov->iov_base) {
-        return -1;
-    } else if (elem_a->src_iov->iov_base > elem_b->src_iov->iov_base) {
-        return 1;
-    } else {
-        return 0;
-    }
-}
-
-static int sortelem_cmp_src_index(const void *a, const void *b)
-{
-    const IOVectorSortElem *elem_a = a;
-    const IOVectorSortElem *elem_b = b;
-
-    return elem_a->src_index - elem_b->src_index;
-}
-
-/**
- * Copy contents of I/O vector
- *
- * The relative relationships of overlapping iovecs are preserved.  This is
- * necessary to ensure identical semantics in the cloned I/O vector.
- */
-static void blkverify_iovec_clone(QEMUIOVector *dest, const QEMUIOVector *src,
-                                  void *buf)
-{
-    IOVectorSortElem sortelems[src->niov];
-    void *last_end;
-    int i;
-
-    /* Sort by source iovecs by base address */
-    for (i = 0; i < src->niov; i++) {
-        sortelems[i].src_index = i;
-        sortelems[i].src_iov = &src->iov[i];
-    }
-    qsort(sortelems, src->niov, sizeof(sortelems[0]), sortelem_cmp_src_base);
-
-    /* Allocate buffer space taking into account overlapping iovecs */
-    last_end = NULL;
-    for (i = 0; i < src->niov; i++) {
-        struct iovec *cur = sortelems[i].src_iov;
-        ptrdiff_t rewind = 0;
-
-        /* Detect overlap */
-        if (last_end && last_end > cur->iov_base) {
-            rewind = last_end - cur->iov_base;
-        }
-
-        sortelems[i].dest_base = buf - rewind;
-        buf += cur->iov_len - MIN(rewind, cur->iov_len);
-        last_end = MAX(cur->iov_base + cur->iov_len, last_end);
-    }
-
-    /* Sort by source iovec index and build destination iovec */
-    qsort(sortelems, src->niov, sizeof(sortelems[0]), sortelem_cmp_src_index);
-    for (i = 0; i < src->niov; i++) {
-        qemu_iovec_add(dest, sortelems[i].dest_base, src->iov[i].iov_len);
-    }
-}
-
 static BlkverifyAIOCB *blkverify_aio_get(BlockDriverState *bs, bool is_write,
                                          int64_t sector_num, QEMUIOVector *qiov,
                                          int nb_sectors,
@@ -340,7 +236,7 @@ static void blkverify_aio_cb(void *opaque, int ret)
 
 static void blkverify_verify_readv(BlkverifyAIOCB *acb)
 {
-    ssize_t offset = blkverify_iovec_compare(acb->qiov, &acb->raw_qiov);
+    ssize_t offset = qemu_iovec_compare(acb->qiov, &acb->raw_qiov);
     if (offset != -1) {
         blkverify_err(acb, "contents mismatch in sector %" PRId64,
                       acb->sector_num + (int64_t)(offset / BDRV_SECTOR_SIZE));
@@ -358,7 +254,7 @@ static BlockDriverAIOCB *blkverify_aio_readv(BlockDriverState *bs,
     acb->verify = blkverify_verify_readv;
     acb->buf = qemu_blockalign(bs->file, qiov->size);
     qemu_iovec_init(&acb->raw_qiov, acb->qiov->niov);
-    blkverify_iovec_clone(&acb->raw_qiov, qiov, acb->buf);
+    qemu_iovec_clone(&acb->raw_qiov, qiov, acb->buf);
 
     bdrv_aio_readv(s->test_file, sector_num, qiov, nb_sectors,
                    blkverify_aio_cb, acb);
diff --git a/include/qemu-common.h b/include/qemu-common.h
index 5054836..d70783e 100644
--- a/include/qemu-common.h
+++ b/include/qemu-common.h
@@ -346,6 +346,8 @@ size_t qemu_iovec_from_buf(QEMUIOVector *qiov, size_t offset,
                            const void *buf, size_t bytes);
 size_t qemu_iovec_memset(QEMUIOVector *qiov, size_t offset,
                          int fillc, size_t bytes);
+ssize_t qemu_iovec_compare(QEMUIOVector *a, QEMUIOVector *b);
+void qemu_iovec_clone(QEMUIOVector *dest, const QEMUIOVector *src, void *buf);
 
 bool buffer_is_zero(const void *buf, size_t len);
 
diff --git a/util/iov.c b/util/iov.c
index bb46c04..03934da 100644
--- a/util/iov.c
+++ b/util/iov.c
@@ -378,6 +378,112 @@ size_t qemu_iovec_memset(QEMUIOVector *qiov, size_t offset,
     return iov_memset(qiov->iov, qiov->niov, offset, fillc, bytes);
 }
 
+/**
+ * Check that I/O vector contents are identical
+ *
+ * The IO vectors must have the same structure (same length of all parts).
+ * A typical usage is to compare vectors created with qemu_iovec_clone().
+ *
+ * @a:          I/O vector
+ * @b:          I/O vector
+ * @ret:        Offset to first mismatching byte or -1 if match
+ */
+ssize_t qemu_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
+{
+    int i;
+    ssize_t offset = 0;
+
+    assert(a->niov == b->niov);
+    for (i = 0; i < a->niov; i++) {
+        size_t len = 0;
+        uint8_t *p = (uint8_t *)a->iov[i].iov_base;
+        uint8_t *q = (uint8_t *)b->iov[i].iov_base;
+
+        assert(a->iov[i].iov_len == b->iov[i].iov_len);
+        while (len < a->iov[i].iov_len && *p++ == *q++) {
+            len++;
+        }
+
+        offset += len;
+
+        if (len != a->iov[i].iov_len) {
+            return offset;
+        }
+    }
+    return -1;
+}
+
+typedef struct {
+    int src_index;
+    struct iovec *src_iov;
+    void *dest_base;
+} IOVectorSortElem;
+
+static int sortelem_cmp_src_base(const void *a, const void *b)
+{
+    const IOVectorSortElem *elem_a = a;
+    const IOVectorSortElem *elem_b = b;
+
+    /* Don't overflow */
+    if (elem_a->src_iov->iov_base < elem_b->src_iov->iov_base) {
+        return -1;
+    } else if (elem_a->src_iov->iov_base > elem_b->src_iov->iov_base) {
+        return 1;
+    } else {
+        return 0;
+    }
+}
+
+static int sortelem_cmp_src_index(const void *a, const void *b)
+{
+    const IOVectorSortElem *elem_a = a;
+    const IOVectorSortElem *elem_b = b;
+
+    return elem_a->src_index - elem_b->src_index;
+}
+
+/**
+ * Copy contents of I/O vector
+ *
+ * The relative relationships of overlapping iovecs are preserved.  This is
+ * necessary to ensure identical semantics in the cloned I/O vector.
+ */
+void qemu_iovec_clone(QEMUIOVector *dest, const QEMUIOVector *src, void *buf)
+{
+    IOVectorSortElem sortelems[src->niov];
+    void *last_end;
+    int i;
+
+    /* Sort by source iovecs by base address */
+    for (i = 0; i < src->niov; i++) {
+        sortelems[i].src_index = i;
+        sortelems[i].src_iov = &src->iov[i];
+    }
+    qsort(sortelems, src->niov, sizeof(sortelems[0]), sortelem_cmp_src_base);
+
+    /* Allocate buffer space taking into account overlapping iovecs */
+    last_end = NULL;
+    for (i = 0; i < src->niov; i++) {
+        struct iovec *cur = sortelems[i].src_iov;
+        ptrdiff_t rewind = 0;
+
+        /* Detect overlap */
+        if (last_end && last_end > cur->iov_base) {
+            rewind = last_end - cur->iov_base;
+        }
+
+        sortelems[i].dest_base = buf - rewind;
+        buf += cur->iov_len - MIN(rewind, cur->iov_len);
+        last_end = MAX(cur->iov_base + cur->iov_len, last_end);
+    }
+
+    /* Sort by source iovec index and build destination iovec */
+    qsort(sortelems, src->niov, sizeof(sortelems[0]), sortelem_cmp_src_index);
+    for (i = 0; i < src->niov; i++) {
+        qemu_iovec_add(dest, sortelems[i].dest_base, src->iov[i].iov_len);
+    }
+}
+
 size_t iov_discard_front(struct iovec **iov, unsigned int *iov_cnt,
                          size_t bytes)
 {
-- 
1.8.3.2

^ permalink raw reply related	[flat|nested] 27+ messages in thread

* [Qemu-devel] [PATCH V17 05/12] quorum: Add quorum_aio_readv.
  2014-02-12 22:06 [Qemu-devel] [PATCH V17 00/12] quorum block filter Benoît Canet
                   ` (3 preceding siblings ...)
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 04/12] blkverify: Extract qemu_iovec_clone() and qemu_iovec_compare() from blkverify Benoît Canet
@ 2014-02-12 22:06 ` Benoît Canet
  2014-02-15  1:40   ` Max Reitz
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 06/12] quorum: Add quorum mechanism Benoît Canet
                   ` (7 subsequent siblings)
  12 siblings, 1 reply; 27+ messages in thread
From: Benoît Canet @ 2014-02-12 22:06 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, Benoît Canet, mreitz, stefanha

From: Benoît Canet <benoit@irqsave.net>

Add code to do num_children reads in parallel and cleanup the structures
afterward.

Signed-off-by: Benoit Canet <benoit@irqsave.net>
Reviewed-by: Max Reitz <mreitz@redhat.com>
---
 block/quorum.c | 39 ++++++++++++++++++++++++++++++++++++++-
 1 file changed, 38 insertions(+), 1 deletion(-)

diff --git a/block/quorum.c b/block/quorum.c
index 197cdca..c7a5d79 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -91,10 +91,18 @@ static AIOCBInfo quorum_aiocb_info = {
 
 static void quorum_aio_finalize(QuorumAIOCB *acb)
 {
-    int ret = 0;
+    BDRVQuorumState *s = acb->common.bs->opaque;
+    int i, ret = 0;
 
     acb->common.cb(acb->common.opaque, ret);
 
+    if (acb->is_read) {
+        for (i = 0; i < s->num_children; i++) {
+            qemu_vfree(acb->qcrs[i].buf);
+            qemu_iovec_destroy(&acb->qcrs[i].qiov);
+        }
+    }
+
     g_free(acb->qcrs);
     qemu_aio_release(acb);
 }
@@ -149,6 +157,34 @@ static void quorum_aio_cb(void *opaque, int ret)
     quorum_aio_finalize(acb);
 }
 
+static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
+                                         int64_t sector_num,
+                                         QEMUIOVector *qiov,
+                                         int nb_sectors,
+                                         BlockDriverCompletionFunc *cb,
+                                         void *opaque)
+{
+    BDRVQuorumState *s = bs->opaque;
+    QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num,
+                                      nb_sectors, cb, opaque);
+    int i;
+
+    acb->is_read = true;
+
+    for (i = 0; i < s->num_children; i++) {
+        acb->qcrs[i].buf = qemu_blockalign(s->bs[i], qiov->size);
+        qemu_iovec_init(&acb->qcrs[i].qiov, qiov->niov);
+        qemu_iovec_clone(&acb->qcrs[i].qiov, qiov, acb->qcrs[i].buf);
+    }
+
+    for (i = 0; i < s->num_children; i++) {
+        bdrv_aio_readv(s->bs[i], sector_num, &acb->qcrs[i].qiov, nb_sectors,
+                       quorum_aio_cb, &acb->qcrs[i]);
+    }
+
+    return &acb->common;
+}
+
 static BlockDriverAIOCB *quorum_aio_writev(BlockDriverState *bs,
                                           int64_t sector_num,
                                           QEMUIOVector *qiov,
@@ -176,6 +212,7 @@ static BlockDriver bdrv_quorum = {
 
     .instance_size      = sizeof(BDRVQuorumState),
 
+    .bdrv_aio_readv     = quorum_aio_readv,
     .bdrv_aio_writev    = quorum_aio_writev,
 };
 
-- 
1.8.3.2

^ permalink raw reply related	[flat|nested] 27+ messages in thread

* [Qemu-devel] [PATCH V17 06/12] quorum: Add quorum mechanism.
  2014-02-12 22:06 [Qemu-devel] [PATCH V17 00/12] quorum block filter Benoît Canet
                   ` (4 preceding siblings ...)
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 05/12] quorum: Add quorum_aio_readv Benoît Canet
@ 2014-02-12 22:06 ` Benoît Canet
  2014-02-15  1:53   ` Max Reitz
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 07/12] quorum: Add quorum_getlength() Benoît Canet
                   ` (6 subsequent siblings)
  12 siblings, 1 reply; 27+ messages in thread
From: Benoît Canet @ 2014-02-12 22:06 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, Benoît Canet, mreitz, stefanha

From: Benoît Canet <benoit@irqsave.net>

This patchset enable the core of the quorum mechanism.
The num_children reads are compared to get the majority version and if this
version exist more than threshold time the guest won't see the error at all.
QMP events are used to report individual driver errors or whole quorum.
error to the management.

Use gnutls's SHA-256 to compare versions.

--enable-quorum must be used to enable the feature.

Signed-off-by: Benoit Canet <benoit@irqsave.net>
---
 block/Makefile.objs       |   2 +-
 block/quorum.c            | 394 ++++++++++++++++++++++++++++++++++++++++++++++
 configure                 |  36 +++++
 docs/qmp/qmp-events.txt   |  36 +++++
 include/monitor/monitor.h |   2 +
 monitor.c                 |   2 +
 6 files changed, 471 insertions(+), 1 deletion(-)

diff --git a/block/Makefile.objs b/block/Makefile.objs
index 716556f..425d7fb 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -3,7 +3,7 @@ block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-c
 block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
 block-obj-y += qed-check.o
 block-obj-$(CONFIG_VHDX) += vhdx.o vhdx-endian.o vhdx-log.o
-block-obj-y += quorum.o
+block-obj-$(CONFIG_QUORUM) += quorum.o
 block-obj-y += parallels.o blkdebug.o blkverify.o
 block-obj-y += snapshot.o qapi.o
 block-obj-$(CONFIG_WIN32) += raw-win32.o win32-aio.o
diff --git a/block/quorum.c b/block/quorum.c
index c7a5d79..bfd8e9d 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -13,7 +13,43 @@
  * See the COPYING file in the top-level directory.
  */
 
+#include <gnutls/gnutls.h>
+#include <gnutls/crypto.h>
 #include "block/block_int.h"
+#include "qapi/qmp/qjson.h"
+
+#define HASH_LENGTH 32
+
+/* This union holds a vote hash value */
+typedef union QuorumVoteValue {
+    char h[HASH_LENGTH];       /* SHA-256 hash */
+    int64_t l;                 /* simpler 64 bits hash */
+} QuorumVoteValue;
+
+/* A vote item */
+typedef struct QuorumVoteItem {
+    int index;
+    QLIST_ENTRY(QuorumVoteItem) next;
+} QuorumVoteItem;
+
+/* this structure is a vote version. A version is the set of votes sharing the
+ * same vote value.
+ * The set of votes will be tracked with the items field and its cardinality is
+ * vote_count.
+ */
+typedef struct QuorumVoteVersion {
+    QuorumVoteValue value;
+    int index;
+    int vote_count;
+    QLIST_HEAD(, QuorumVoteItem) items;
+    QLIST_ENTRY(QuorumVoteVersion) next;
+} QuorumVoteVersion;
+
+/* this structure holds a group of vote versions together */
+typedef struct QuorumVotes {
+    QLIST_HEAD(, QuorumVoteVersion) vote_list;
+    bool (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
+} QuorumVotes;
 
 /* the following structure holds the state of one quorum instance */
 typedef struct BDRVQuorumState {
@@ -65,10 +101,14 @@ struct QuorumAIOCB {
     int count;                  /* number of completed AIOCB */
     int success_count;          /* number of successfully completed AIOCB */
 
+    QuorumVotes votes;
+
     bool is_read;
     int vote_ret;
 };
 
+static void quorum_vote(QuorumAIOCB *acb);
+
 static void quorum_aio_cancel(BlockDriverAIOCB *blockacb)
 {
     QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
@@ -94,6 +134,10 @@ static void quorum_aio_finalize(QuorumAIOCB *acb)
     BDRVQuorumState *s = acb->common.bs->opaque;
     int i, ret = 0;
 
+    if (acb->vote_ret) {
+        ret = acb->vote_ret;
+    }
+
     acb->common.cb(acb->common.opaque, ret);
 
     if (acb->is_read) {
@@ -107,6 +151,16 @@ static void quorum_aio_finalize(QuorumAIOCB *acb)
     qemu_aio_release(acb);
 }
 
+static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
+{
+    return memcmp(a->h, b->h, HASH_LENGTH);
+}
+
+static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b)
+{
+    return a->l != b->l;
+}
+
 static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
                                    BlockDriverState *bs,
                                    QEMUIOVector *qiov,
@@ -125,6 +179,8 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
     acb->qcrs = g_new0(QuorumChildRequest, s->num_children);
     acb->count = 0;
     acb->success_count = 0;
+    acb->votes.compare = quorum_sha256_compare;
+    QLIST_INIT(&acb->votes.vote_list);
     acb->is_read = false;
     acb->vote_ret = 0;
 
@@ -137,6 +193,53 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
     return acb;
 }
 
+static void quorum_report_bad(QuorumAIOCB *acb, char *node_name, int ret)
+{
+    QObject *data;
+    assert(node_name);
+    data = qobject_from_jsonf("{ 'ret': %i"
+                              ", 'node-name': \"%s\""
+                              ", 'sector-num': %" PRId64
+                              ", 'sectors-count': %i }",
+                              ret,
+                              node_name,
+                              acb->sector_num,
+                              acb->nb_sectors);
+    monitor_protocol_event(QEVENT_QUORUM_REPORT_BAD, data);
+    qobject_decref(data);
+}
+
+static void quorum_report_failure(QuorumAIOCB *acb)
+{
+    QObject *data;
+    const char *reference = acb->common.bs->device_name[0] ?
+                            acb->common.bs->device_name :
+                            acb->common.bs->node_name;
+    data = qobject_from_jsonf("{ 'reference': \"%s\""
+                              ", 'sector-num': %" PRId64
+                              ", 'sectors-count': %i }",
+                              reference,
+                              acb->sector_num,
+                              acb->nb_sectors);
+    monitor_protocol_event(QEVENT_QUORUM_FAILURE, data);
+    qobject_decref(data);
+}
+
+static int quorum_vote_error(QuorumAIOCB *acb);
+
+static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb)
+{
+    BDRVQuorumState *s = acb->common.bs->opaque;
+
+    if (acb->success_count < s->threshold) {
+        acb->vote_ret = quorum_vote_error(acb);
+        quorum_report_failure(acb);
+        return true;
+    }
+
+    return false;
+}
+
 static void quorum_aio_cb(void *opaque, int ret)
 {
     QuorumChildRequest *sacb = opaque;
@@ -147,6 +250,8 @@ static void quorum_aio_cb(void *opaque, int ret)
     acb->count++;
     if (ret == 0) {
         acb->success_count++;
+    } else {
+        quorum_report_bad(acb, sacb->aiocb->bs->node_name, ret);
     }
     assert(acb->count <= s->num_children);
     assert(acb->success_count <= s->num_children);
@@ -154,9 +259,298 @@ static void quorum_aio_cb(void *opaque, int ret)
         return;
     }
 
+    /* Do the vote on read */
+    if (acb->is_read) {
+        quorum_vote(acb);
+    } else {
+        quorum_has_too_much_io_failed(acb);
+    }
+
     quorum_aio_finalize(acb);
 }
 
+static void quorum_report_bad_versions(BDRVQuorumState *s,
+                                       QuorumAIOCB *acb,
+                                       QuorumVoteValue *value)
+{
+    QuorumVoteVersion *version;
+    QuorumVoteItem *item;
+
+    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
+        if (!acb->votes.compare(&version->value, value)) {
+            continue;
+        }
+        QLIST_FOREACH(item, &version->items, next) {
+            quorum_report_bad(acb, s->bs[item->index]->node_name, 0);
+        }
+    }
+}
+
+static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
+{
+    int i;
+    assert(dest->niov == source->niov);
+    assert(dest->size == source->size);
+    for (i = 0; i < source->niov; i++) {
+        assert(dest->iov[i].iov_len == source->iov[i].iov_len);
+        memcpy(dest->iov[i].iov_base,
+               source->iov[i].iov_base,
+               source->iov[i].iov_len);
+    }
+}
+
+static void quorum_count_vote(QuorumVotes *votes,
+                              QuorumVoteValue *value,
+                              int index)
+{
+    QuorumVoteVersion *v = NULL, *version = NULL;
+    QuorumVoteItem *item;
+
+    /* look if we have something with this hash */
+    QLIST_FOREACH(v, &votes->vote_list, next) {
+        if (!votes->compare(&v->value, value)) {
+            version = v;
+            break;
+        }
+    }
+
+    /* It's a version not yet in the list add it */
+    if (!version) {
+        version = g_new0(QuorumVoteVersion, 1);
+        QLIST_INIT(&version->items);
+        memcpy(&version->value, value, sizeof(version->value));
+        version->index = index;
+        version->vote_count = 0;
+        QLIST_INSERT_HEAD(&votes->vote_list, version, next);
+    }
+
+    version->vote_count++;
+
+    item = g_new0(QuorumVoteItem, 1);
+    item->index = index;
+    QLIST_INSERT_HEAD(&version->items, item, next);
+}
+
+static void quorum_free_vote_list(QuorumVotes *votes)
+{
+    QuorumVoteVersion *version, *next_version;
+    QuorumVoteItem *item, *next_item;
+
+    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
+        QLIST_REMOVE(version, next);
+        QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
+            QLIST_REMOVE(item, next);
+            g_free(item);
+        }
+        g_free(version);
+    }
+}
+
+static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
+{
+    int j, ret;
+    gnutls_hash_hd_t dig;
+    QEMUIOVector *qiov = &acb->qcrs[i].qiov;
+
+    ret = gnutls_hash_init(&dig, GNUTLS_DIG_SHA256);
+
+    if (ret < 0) {
+        return ret;
+    }
+
+    for (j = 0; j < qiov->niov; j++) {
+        ret = gnutls_hash(dig, qiov->iov[j].iov_base, qiov->iov[j].iov_len);
+        if (ret < 0) {
+            break;
+        }
+    }
+
+    gnutls_hash_deinit(dig, (void *) hash);
+    return ret;
+}
+
+static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
+{
+    int max = 0;
+    QuorumVoteVersion *candidate, *winner = NULL;
+
+    QLIST_FOREACH(candidate, &votes->vote_list, next) {
+        if (candidate->vote_count > max) {
+            max = candidate->vote_count;
+            winner = candidate;
+        }
+    }
+
+    return winner;
+}
+
+/* qemu_iovec_compare is handy for blkverify mode because it return the first
+ * differing byte location. Yet it is handcoded to compare vectors one byte
+ * after another so it does not benefit from the libc SIMD optimizations.
+ * quorum_iovec_compare is written for speed and should be used in the non
+ * blkverify mode of quorum.
+ */
+static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
+{
+    int i;
+    int result;
+
+    assert(a->niov == b->niov);
+    for (i = 0; i < a->niov; i++) {
+        assert(a->iov[i].iov_len == b->iov[i].iov_len);
+        result = memcmp(a->iov[i].iov_base,
+                        b->iov[i].iov_base,
+                        a->iov[i].iov_len);
+        if (result) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
+static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
+                                          const char *fmt, ...)
+{
+    va_list ap;
+
+    va_start(ap, fmt);
+    fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ",
+            acb->sector_num, acb->nb_sectors);
+    vfprintf(stderr, fmt, ap);
+    fprintf(stderr, "\n");
+    va_end(ap);
+    exit(1);
+}
+
+static bool quorum_compare(QuorumAIOCB *acb,
+                           QEMUIOVector *a,
+                           QEMUIOVector *b)
+{
+    BDRVQuorumState *s = acb->common.bs->opaque;
+    ssize_t offset;
+
+    /* This driver will replace blkverify in this particular case */
+    if (s->is_blkverify) {
+        offset = qemu_iovec_compare(a, b);
+        if (offset != -1) {
+            quorum_err(acb, "contents mismatch in sector %" PRId64,
+                       acb->sector_num +
+                       (uint64_t)(offset / BDRV_SECTOR_SIZE));
+        }
+        return true;
+    }
+
+    return quorum_iovec_compare(a, b);
+}
+
+/* Do a vote to get the error code */
+static int quorum_vote_error(QuorumAIOCB *acb)
+{
+    BDRVQuorumState *s = acb->common.bs->opaque;
+    QuorumVoteVersion *winner = NULL;
+    QuorumVotes error_votes;
+    QuorumVoteValue result_value;
+    int i, ret = 0;
+    bool error = false;
+
+    QLIST_INIT(&error_votes.vote_list);
+    error_votes.compare = quorum_64bits_compare;
+
+    for (i = 0; i < s->num_children; i++) {
+        ret = acb->qcrs[i].ret;
+        if (ret) {
+            error = true;
+            result_value.l = ret;
+            quorum_count_vote(&error_votes, &result_value, i);
+        }
+    }
+
+    if (error) {
+        winner = quorum_get_vote_winner(&error_votes);
+        ret = winner->value.l;
+    }
+
+    quorum_free_vote_list(&error_votes);
+
+    return ret;
+}
+
+static void quorum_vote(QuorumAIOCB *acb)
+{
+    bool quorum = true;
+    int i, j, ret;
+    QuorumVoteValue hash;
+    BDRVQuorumState *s = acb->common.bs->opaque;
+    QuorumVoteVersion *winner;
+
+    if (quorum_has_too_much_io_failed(acb)) {
+        return;
+    }
+
+    /* get the index of the first successful read */
+    for (i = 0; i < s->num_children; i++) {
+        if (!acb->qcrs[i].ret) {
+            break;
+        }
+    }
+
+    assert(i < s->num_children);
+
+    /* compare this read with all other successful read stoping at quorum
+     * failure
+     */
+    for (j = i + 1; j < s->num_children; j++) {
+        if (acb->qcrs[j].ret) {
+            continue;
+        }
+        quorum = quorum_compare(acb, &acb->qcrs[i].qiov, &acb->qcrs[j].qiov);
+        if (!quorum) {
+            break;
+       }
+    }
+
+    /* Every successful read agrees */
+    if (quorum) {
+        quorum_copy_qiov(acb->qiov, &acb->qcrs[i].qiov);
+        return;
+    }
+
+    /* compute hashs for each successful read, also store indexes */
+    for (i = 0; i < s->num_children; i++) {
+        if (acb->qcrs[i].ret) {
+            continue;
+        }
+        ret = quorum_compute_hash(acb, i, &hash);
+        /* if ever the hash computation failed */
+        if (ret < 0) {
+            acb->vote_ret = ret;
+            goto free_exit;
+        }
+        quorum_count_vote(&acb->votes, &hash, i);
+    }
+
+    /* vote to select the most represented version */
+    winner = quorum_get_vote_winner(&acb->votes);
+
+    /* if the winner count is smaller than threshold the read fails */
+    if (winner->vote_count < s->threshold) {
+        quorum_report_failure(acb);
+        acb->vote_ret = -EIO;
+        goto free_exit;
+    }
+
+    /* we have a winner: copy it */
+    quorum_copy_qiov(acb->qiov, &acb->qcrs[winner->index].qiov);
+
+    /* some versions are bad print them */
+    quorum_report_bad_versions(s, acb, &winner->value);
+
+free_exit:
+    /* free lists */
+    quorum_free_vote_list(&acb->votes);
+}
+
 static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
                                          int64_t sector_num,
                                          QEMUIOVector *qiov,
diff --git a/configure b/configure
index 5b20ce6..11f0a3a 100755
--- a/configure
+++ b/configure
@@ -264,6 +264,7 @@ gtkabi="2.0"
 tpm="no"
 libssh2=""
 vhdx=""
+quorum="no"
 
 # parse CC options first
 for opt do
@@ -1005,6 +1006,10 @@ for opt do
   ;;
   --disable-vhdx) vhdx="no"
   ;;
+  --disable-quorum) quorum="no"
+  ;;
+  --enable-quorum) quorum="yes"
+  ;;
   *) echo "ERROR: unknown option $opt"; show_help="yes"
   ;;
   esac
@@ -1261,6 +1266,8 @@ Advanced options (experts only):
   --enable-libssh2         enable ssh block device support
   --disable-vhdx           disables support for the Microsoft VHDX image format
   --enable-vhdx            enable support for the Microsoft VHDX image format
+  --disable-quorum         disable quorum block filter support
+  --enable-quorum          enable quorum block filter support
 
 NOTE: The object files are built at the place where configure is launched
 EOF
@@ -1930,6 +1937,30 @@ EOF
 fi
 
 ##########################################
+# Quorum probe (check for gnutls)
+if test "$quorum" != "no" ; then
+cat > $TMPC <<EOF
+#include <gnutls/gnutls.h>
+#include <gnutls/crypto.h>
+int main(void) {char data[4096], digest[32];
+gnutls_hash_fast(GNUTLS_DIG_SHA256, data, 4096, digest);
+return 0;
+}
+EOF
+quorum_tls_cflags=`$pkg_config --cflags gnutls 2> /dev/null`
+quorum_tls_libs=`$pkg_config --libs gnutls 2> /dev/null`
+if compile_prog "$quorum_tls_cflags" "$quorum_tls_libs" ; then
+  qcow_tls=yes
+  libs_softmmu="$quorum_tls_libs $libs_softmmu"
+  libs_tools="$quorum_tls_libs $libs_softmmu"
+  QEMU_CFLAGS="$QEMU_CFLAGS $quorum_tls_cflags"
+else
+  echo "gnutls > 2.10.0 required to compile Quorum"
+  exit 1
+fi
+fi
+
+##########################################
 # VNC SASL detection
 if test "$vnc" = "yes" -a "$vnc_sasl" != "no" ; then
   cat > $TMPC <<EOF
@@ -3865,6 +3896,7 @@ echo "libssh2 support   $libssh2"
 echo "TPM passthrough   $tpm_passthrough"
 echo "QOM debugging     $qom_cast_debug"
 echo "vhdx              $vhdx"
+echo "Quorum            $quorum"
 
 if test "$sdl_too_old" = "yes"; then
 echo "-> Your SDL version is too old - please upgrade to have SDL support"
@@ -4267,6 +4299,10 @@ if test "$libssh2" = "yes" ; then
   echo "CONFIG_LIBSSH2=y" >> $config_host_mak
 fi
 
+if test "$quorum" = "yes" ; then
+  echo "CONFIG_QUORUM=y" >> $config_host_mak
+fi
+
 if test "$virtio_blk_data_plane" = "yes" ; then
   echo 'CONFIG_VIRTIO_BLK_DATA_PLANE=$(CONFIG_VIRTIO)' >> $config_host_mak
 fi
diff --git a/docs/qmp/qmp-events.txt b/docs/qmp/qmp-events.txt
index a378c87..00f9515 100644
--- a/docs/qmp/qmp-events.txt
+++ b/docs/qmp/qmp-events.txt
@@ -500,3 +500,39 @@ Example:
 
 Note: If action is "reset", "shutdown", or "pause" the WATCHDOG event is
 followed respectively by the RESET, SHUTDOWN, or STOP events.
+
+QUORUM_FAILURE
+--------------
+
+Emitted by the Quorum block driver if it fails to establish a quorum.
+
+Data:
+
+- "reference":    device name if defined else node name.
+- "sector-num":   Number of the first sector of the failed read operation.
+- "sector-count": Failed read operation sector count.
+
+Example:
+
+{ "event": "QUORUM_FAILURE",
+     "data": { "reference": "usr1", "sector-num": 345435, "sector-count": 5 },
+     "timestamp": { "seconds": 1344522075, "microseconds": 745528 } }
+
+QUORUM_REPORT_BAD
+-----------------
+
+Emitted to report a corruption of a Quorum file.
+
+Data:
+
+- "ret":          The IO return code.
+- "node-name":    The graph node name of the block driver state.
+- "sector-num":   Number of the first sector of the failed read operation.
+- "sector-count": Failed read operation sector count.
+
+Example:
+
+{ "event": "QUORUM_REPORT_BAD",
+     "data": { "ret": 0, "node-name": "1.raw", "sector-num": 345435,
+               "sector-count": 5 },
+     "timestamp": { "seconds": 1344522075, "microseconds": 745528 } }
diff --git a/include/monitor/monitor.h b/include/monitor/monitor.h
index 7e5f752..a49ea11 100644
--- a/include/monitor/monitor.h
+++ b/include/monitor/monitor.h
@@ -49,6 +49,8 @@ typedef enum MonitorEvent {
     QEVENT_SPICE_MIGRATE_COMPLETED,
     QEVENT_GUEST_PANICKED,
     QEVENT_BLOCK_IMAGE_CORRUPTED,
+    QEVENT_QUORUM_FAILURE,
+    QEVENT_QUORUM_REPORT_BAD,
 
     /* Add to 'monitor_event_names' array in monitor.c when
      * defining new events here */
diff --git a/monitor.c b/monitor.c
index b1ea262..81ffa0f 100644
--- a/monitor.c
+++ b/monitor.c
@@ -507,6 +507,8 @@ static const char *monitor_event_names[] = {
     [QEVENT_SPICE_MIGRATE_COMPLETED] = "SPICE_MIGRATE_COMPLETED",
     [QEVENT_GUEST_PANICKED] = "GUEST_PANICKED",
     [QEVENT_BLOCK_IMAGE_CORRUPTED] = "BLOCK_IMAGE_CORRUPTED",
+    [QEVENT_QUORUM_FAILURE] = "QUORUM_FAILURE",
+    [QEVENT_QUORUM_REPORT_BAD] = "QUORUM_REPORT_BAD",
 };
 QEMU_BUILD_BUG_ON(ARRAY_SIZE(monitor_event_names) != QEVENT_MAX)
 
-- 
1.8.3.2

^ permalink raw reply related	[flat|nested] 27+ messages in thread

* [Qemu-devel] [PATCH V17 07/12] quorum: Add quorum_getlength().
  2014-02-12 22:06 [Qemu-devel] [PATCH V17 00/12] quorum block filter Benoît Canet
                   ` (5 preceding siblings ...)
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 06/12] quorum: Add quorum mechanism Benoît Canet
@ 2014-02-12 22:06 ` Benoît Canet
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 08/12] quorum: Add quorum_invalidate_cache() Benoît Canet
                   ` (5 subsequent siblings)
  12 siblings, 0 replies; 27+ messages in thread
From: Benoît Canet @ 2014-02-12 22:06 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, Benoît Canet, mreitz, stefanha

From: Benoît Canet <benoit@irqsave.net>

Check that every bs file returns the same length.
Otherwise, return -EIO to disable the quorum and
avoid length discrepancy.

Signed-off-by: Benoit Canet <benoit@irqsave.net>
Reviewed-by: Max Reitz <mreitz@redhat.com>
---
 block/quorum.c | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/block/quorum.c b/block/quorum.c
index bfd8e9d..2e54921 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -600,12 +600,38 @@ static BlockDriverAIOCB *quorum_aio_writev(BlockDriverState *bs,
     return &acb->common;
 }
 
+static int64_t quorum_getlength(BlockDriverState *bs)
+{
+    BDRVQuorumState *s = bs->opaque;
+    int64_t result;
+    int i;
+
+    /* check that all file have the same length */
+    result = bdrv_getlength(s->bs[0]);
+    if (result < 0) {
+        return result;
+    }
+    for (i = 1; i < s->num_children; i++) {
+        int64_t value = bdrv_getlength(s->bs[i]);
+        if (value < 0) {
+            return value;
+        }
+        if (value != result) {
+            return -EIO;
+        }
+    }
+
+    return result;
+}
+
 static BlockDriver bdrv_quorum = {
     .format_name        = "quorum",
     .protocol_name      = "quorum",
 
     .instance_size      = sizeof(BDRVQuorumState),
 
+    .bdrv_getlength     = quorum_getlength,
+
     .bdrv_aio_readv     = quorum_aio_readv,
     .bdrv_aio_writev    = quorum_aio_writev,
 };
-- 
1.8.3.2

^ permalink raw reply related	[flat|nested] 27+ messages in thread

* [Qemu-devel] [PATCH V17 08/12] quorum: Add quorum_invalidate_cache().
  2014-02-12 22:06 [Qemu-devel] [PATCH V17 00/12] quorum block filter Benoît Canet
                   ` (6 preceding siblings ...)
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 07/12] quorum: Add quorum_getlength() Benoît Canet
@ 2014-02-12 22:06 ` Benoît Canet
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 09/12] quorum: Add quorum_co_flush() Benoît Canet
                   ` (4 subsequent siblings)
  12 siblings, 0 replies; 27+ messages in thread
From: Benoît Canet @ 2014-02-12 22:06 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, Benoît Canet, mreitz, stefanha

From: Benoît Canet <benoit@irqsave.net>

We really want that live migration works with quorum so implement
quorum_invalidate_cache().

Signed-off-by: Benoit Canet <benoit@irqsave.net>
Reviewed-by: Max Reitz <mreitz@redhat.com>
---
 block/quorum.c | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/block/quorum.c b/block/quorum.c
index 2e54921..5f8f626 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -624,6 +624,16 @@ static int64_t quorum_getlength(BlockDriverState *bs)
     return result;
 }
 
+static void quorum_invalidate_cache(BlockDriverState *bs)
+{
+    BDRVQuorumState *s = bs->opaque;
+    int i;
+
+    for (i = 0; i < s->num_children; i++) {
+        bdrv_invalidate_cache(s->bs[i]);
+    }
+}
+
 static BlockDriver bdrv_quorum = {
     .format_name        = "quorum",
     .protocol_name      = "quorum",
@@ -634,6 +644,7 @@ static BlockDriver bdrv_quorum = {
 
     .bdrv_aio_readv     = quorum_aio_readv,
     .bdrv_aio_writev    = quorum_aio_writev,
+    .bdrv_invalidate_cache = quorum_invalidate_cache,
 };
 
 static void bdrv_quorum_init(void)
-- 
1.8.3.2

^ permalink raw reply related	[flat|nested] 27+ messages in thread

* [Qemu-devel] [PATCH V17 09/12] quorum: Add quorum_co_flush().
  2014-02-12 22:06 [Qemu-devel] [PATCH V17 00/12] quorum block filter Benoît Canet
                   ` (7 preceding siblings ...)
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 08/12] quorum: Add quorum_invalidate_cache() Benoît Canet
@ 2014-02-12 22:06 ` Benoît Canet
  2014-02-15  1:54   ` Max Reitz
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 10/12] quorum: Implement recursive .bdrv_recurse_is_first_non_filter in quorum Benoît Canet
                   ` (3 subsequent siblings)
  12 siblings, 1 reply; 27+ messages in thread
From: Benoît Canet @ 2014-02-12 22:06 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, Benoît Canet, mreitz, stefanha

From: Benoît Canet <benoit@irqsave.net>

Makes a vote to select error if any.

Signed-off-by: Benoit Canet <benoit@irqsave.net>
---
 block/quorum.c | 28 ++++++++++++++++++++++++++++
 1 file changed, 28 insertions(+)

diff --git a/block/quorum.c b/block/quorum.c
index 5f8f626..818c72c 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -634,12 +634,40 @@ static void quorum_invalidate_cache(BlockDriverState *bs)
     }
 }
 
+static coroutine_fn int quorum_co_flush(BlockDriverState *bs)
+{
+    BDRVQuorumState *s = bs->opaque;
+    QuorumVoteVersion *winner = NULL;
+    QuorumVotes error_votes;
+    QuorumVoteValue result_value;
+    int i;
+    int result = 0;
+
+    QLIST_INIT(&error_votes.vote_list);
+    error_votes.compare = quorum_64bits_compare;
+
+    for (i = 0; i < s->num_children; i++) {
+        result = bdrv_co_flush(s->bs[i]);
+        result_value.l = result;
+        quorum_count_vote(&error_votes, &result_value, i);
+    }
+
+    winner = quorum_get_vote_winner(&error_votes);
+    result = winner->value.l;
+
+    quorum_free_vote_list(&error_votes);
+
+    return result;
+}
+
 static BlockDriver bdrv_quorum = {
     .format_name        = "quorum",
     .protocol_name      = "quorum",
 
     .instance_size      = sizeof(BDRVQuorumState),
 
+    .bdrv_co_flush_to_disk = quorum_co_flush,
+
     .bdrv_getlength     = quorum_getlength,
 
     .bdrv_aio_readv     = quorum_aio_readv,
-- 
1.8.3.2

^ permalink raw reply related	[flat|nested] 27+ messages in thread

* [Qemu-devel] [PATCH V17 10/12] quorum: Implement recursive .bdrv_recurse_is_first_non_filter in quorum.
  2014-02-12 22:06 [Qemu-devel] [PATCH V17 00/12] quorum block filter Benoît Canet
                   ` (8 preceding siblings ...)
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 09/12] quorum: Add quorum_co_flush() Benoît Canet
@ 2014-02-12 22:06 ` Benoît Canet
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 11/12] quorum: Add quorum_open() and quorum_close() Benoît Canet
                   ` (2 subsequent siblings)
  12 siblings, 0 replies; 27+ messages in thread
From: Benoît Canet @ 2014-02-12 22:06 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, Benoît Canet, mreitz, stefanha

From: Benoît Canet <benoit@irqsave.net>

This is used to activate quorum snapshot.

Signed-off-by: Benoit Canet <benoit@irqsave.net>
Reviewed-by: Max Reitz <mreitz@redhat.com>
---
 block/quorum.c | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/block/quorum.c b/block/quorum.c
index 818c72c..8e033ef 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -660,6 +660,23 @@ static coroutine_fn int quorum_co_flush(BlockDriverState *bs)
     return result;
 }
 
+static bool quorum_recurse_is_first_non_filter(BlockDriverState *bs,
+                                               BlockDriverState *candidate)
+{
+    BDRVQuorumState *s = bs->opaque;
+    int i;
+
+    for (i = 0; i < s->num_children; i++) {
+        bool perm = bdrv_recurse_is_first_non_filter(s->bs[i],
+                                                     candidate);
+        if (perm) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
 static BlockDriver bdrv_quorum = {
     .format_name        = "quorum",
     .protocol_name      = "quorum",
@@ -673,6 +690,8 @@ static BlockDriver bdrv_quorum = {
     .bdrv_aio_readv     = quorum_aio_readv,
     .bdrv_aio_writev    = quorum_aio_writev,
     .bdrv_invalidate_cache = quorum_invalidate_cache,
+
+    .bdrv_recurse_is_first_non_filter = quorum_recurse_is_first_non_filter,
 };
 
 static void bdrv_quorum_init(void)
-- 
1.8.3.2

^ permalink raw reply related	[flat|nested] 27+ messages in thread

* [Qemu-devel] [PATCH V17 11/12] quorum: Add quorum_open() and quorum_close().
  2014-02-12 22:06 [Qemu-devel] [PATCH V17 00/12] quorum block filter Benoît Canet
                   ` (9 preceding siblings ...)
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 10/12] quorum: Implement recursive .bdrv_recurse_is_first_non_filter in quorum Benoît Canet
@ 2014-02-12 22:06 ` Benoît Canet
  2014-02-13  9:09   ` Benoît Canet
  2014-02-15  2:02   ` Max Reitz
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 12/12] quorum: Add unit test Benoît Canet
  2014-02-15  1:15 ` [Qemu-devel] [PATCH V17 00/12] quorum block filter Max Reitz
  12 siblings, 2 replies; 27+ messages in thread
From: Benoît Canet @ 2014-02-12 22:06 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, Benoît Canet, mreitz, stefanha

From: Benoît Canet <benoit@irqsave.net>

Example of command line:
-drive if=virtio,file.driver=quorum,\
file.children.0.file.filename=1.raw,\
file.children.0.node-name=1.raw,\
file.children.0.driver=raw,\
file.children.1.file.filename=2.raw,\
file.children.1.node-name=2.raw,\
file.children.1.driver=raw,\
file.children.2.file.filename=3.raw,\
file.children.2.node-name=3.raw,\
file.children.2.driver=raw,\
file.vote-threshold=2

file.blkverify=on with file.vote-threshold=2 and two files can be passed to
emulated blkverify.

Signed-off-by: Benoit Canet <benoit@irqsave.net>
---
 block/quorum.c   | 165 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 monitor.c        |   2 +
 qapi-schema.json |  21 ++++++-
 3 files changed, 187 insertions(+), 1 deletion(-)

diff --git a/block/quorum.c b/block/quorum.c
index 8e033ef..d3a90fb 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -20,6 +20,9 @@
 
 #define HASH_LENGTH 32
 
+#define QUORUM_OPT_VOTE_THRESHOLD "vote-threshold"
+#define QUORUM_OPT_BLKVERIFY      "blkverify"
+
 /* This union holds a vote hash value */
 typedef union QuorumVoteValue {
     char h[HASH_LENGTH];       /* SHA-256 hash */
@@ -677,12 +680,174 @@ static bool quorum_recurse_is_first_non_filter(BlockDriverState *bs,
     return false;
 }
 
+static int quorum_valid_threshold(int threshold,
+                                  int num_children,
+                                  Error **errp)
+{
+
+    if (threshold < 1) {
+        error_set(errp, QERR_INVALID_PARAMETER_VALUE,
+                  "vote-threshold", "value >= 1");
+        return -ERANGE;
+    }
+
+    if (threshold > num_children) {
+        error_setg(errp, "threshold may not exceed children count");
+        return -ERANGE;
+    }
+
+    return 0;
+}
+
+static QemuOptsList quorum_runtime_opts = {
+    .name = "quorum",
+    .head = QTAILQ_HEAD_INITIALIZER(quorum_runtime_opts.head),
+    .desc = {
+        {
+            .name = QUORUM_OPT_VOTE_THRESHOLD,
+            .type = QEMU_OPT_NUMBER,
+            .help = "The number of vote needed for reaching quorum",
+        },
+        {
+            .name = QUORUM_OPT_BLKVERIFY,
+            .type = QEMU_OPT_BOOL,
+            .help = "Trigger block verify mode if set",
+        },
+        { /* end of list */ }
+    },
+};
+
+static int quorum_open(BlockDriverState *bs,
+                       QDict *options,
+                       int flags,
+                       Error **errp)
+{
+    BDRVQuorumState *s = bs->opaque;
+    Error *local_err = NULL;
+    QemuOpts *opts;
+    bool *opened;
+    QDict *sub = NULL;
+    QList *list = NULL;
+    const QListEntry *lentry;
+    const QDictEntry *dentry;
+    int i;
+    int ret = 0;
+
+    qdict_flatten(options);
+    qdict_extract_subqdict(options, &sub, "children.");
+    qdict_array_split(sub, &list);
+
+    /* count how many different children are present and validate
+     * qdict_size(sub) address the open by reference case
+     */
+    s->num_children = !qlist_size(list) ? qdict_size(sub) : qlist_size(list);
+    if (s->num_children < 2) {
+        error_setg(&local_err,
+                   "Number of provided children must be greater than 1");
+        ret = -EINVAL;
+        goto exit;
+    }
+
+    opts = qemu_opts_create(&quorum_runtime_opts, NULL, 0, &error_abort);
+    qemu_opts_absorb_qdict(opts, options, &local_err);
+    if (error_is_set(&local_err)) {
+        ret = -EINVAL;
+        goto exit;
+    }
+
+    s->threshold = qemu_opt_get_number(opts, QUORUM_OPT_VOTE_THRESHOLD, 0);
+
+    /* and validate it against s->num_children */
+    ret = quorum_valid_threshold(s->threshold, s->num_children, &local_err);
+    if (ret < 0) {
+        goto exit;
+    }
+
+    /* is the driver in blkverify mode */
+    if (qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false) &&
+        s->num_children == 2 && s->threshold == 2) {
+        s->is_blkverify = true;
+    }
+
+    /* allocate the children BlockDriverState array */
+    s->bs = g_new0(BlockDriverState *, s->num_children);
+    opened = g_new0(bool, s->num_children);
+
+    /* Open by file name or options dict (command line or QMP) */
+    if (s->num_children == qlist_size(list)) {
+        for (i = 0, lentry = qlist_first(list); lentry;
+            lentry = qlist_next(lentry), i++) {
+            QDict *d = qobject_to_qdict(lentry->value);
+            QINCREF(d);
+            ret = bdrv_open(&s->bs[i], NULL, NULL,
+                            d,
+                            flags, NULL, &local_err);
+            if (ret < 0) {
+                goto close_exit;
+            }
+            opened[i] = true;
+        }
+    /* Open by QMP references */
+    } else {
+        for (i = 0, dentry = qdict_first(sub); dentry;
+             dentry = qdict_next(sub, dentry), i++) {
+            QString *string = qobject_to_qstring(dentry->value);
+            ret = bdrv_open(&s->bs[i], NULL,
+                            qstring_get_str(string),
+                            NULL, flags, NULL, &local_err);
+            if (ret < 0) {
+                goto close_exit;
+            }
+            opened[i] = true;
+        }
+    }
+
+    g_free(opened);
+    goto exit;
+
+close_exit:
+    /* cleanup on error */
+    for (i = 0; i < s->num_children; i++) {
+        if (!opened[i]) {
+            continue;
+        }
+        bdrv_unref(s->bs[i]);
+    }
+    g_free(s->bs);
+    g_free(opened);
+exit:
+    /* propagate error */
+    if (error_is_set(&local_err)) {
+        error_propagate(errp, local_err);
+    }
+    QDECREF(list);
+    QDECREF(sub);
+    return ret;
+}
+
+static void quorum_close(BlockDriverState *bs)
+{
+    BDRVQuorumState *s = bs->opaque;
+    int i;
+
+    for (i = 0; i < s->num_children; i++) {
+        bdrv_unref(s->bs[i]);
+    }
+
+    g_free(s->bs);
+}
+
 static BlockDriver bdrv_quorum = {
     .format_name        = "quorum",
     .protocol_name      = "quorum",
 
     .instance_size      = sizeof(BDRVQuorumState),
 
+    .bdrv_file_open     = quorum_open,
+    .bdrv_close         = quorum_close,
+
+    .authorizations     = { true, true },
+
     .bdrv_co_flush_to_disk = quorum_co_flush,
 
     .bdrv_getlength     = quorum_getlength,
diff --git a/monitor.c b/monitor.c
index 81ffa0f..8b175fb 100644
--- a/monitor.c
+++ b/monitor.c
@@ -639,6 +639,8 @@ static void monitor_protocol_event_init(void)
     monitor_protocol_event_throttle(QEVENT_RTC_CHANGE, 1000);
     monitor_protocol_event_throttle(QEVENT_BALLOON_CHANGE, 1000);
     monitor_protocol_event_throttle(QEVENT_WATCHDOG, 1000);
+    monitor_protocol_event_throttle(QEVENT_QUORUM_REPORT_BAD, 1000);
+    monitor_protocol_event_throttle(QEVENT_QUORUM_FAILURE, 1000);
 }
 
 /**
diff --git a/qapi-schema.json b/qapi-schema.json
index 7cfb5e5..990d0c5 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -4352,6 +4352,24 @@
             'raw': 'BlockdevRef' } }
 
 ##
+# @BlockdevOptionsQuorum
+#
+# Driver specific block device options for Quorum
+#
+# @blkverify:      #optional true if the driver must print content mismatch
+#
+# @children:       the children block device to use
+#
+# @vote_threshold: the vote limit under which a read will fail
+#
+# Since: 2.0
+##
+{ 'type': 'BlockdevOptionsQuorum',
+  'data': { '*blkverify': 'bool',
+            'children': [ 'BlockdevRef' ],
+            'vote-threshold': 'int' } }
+
+##
 # @BlockdevOptions
 #
 # Options for creating a block device.
@@ -4390,7 +4408,8 @@
       'vdi':        'BlockdevOptionsGenericFormat',
       'vhdx':       'BlockdevOptionsGenericFormat',
       'vmdk':       'BlockdevOptionsGenericCOWFormat',
-      'vpc':        'BlockdevOptionsGenericFormat'
+      'vpc':        'BlockdevOptionsGenericFormat',
+      'quorum':     'BlockdevOptionsQuorum'
   } }
 
 ##
-- 
1.8.3.2

^ permalink raw reply related	[flat|nested] 27+ messages in thread

* [Qemu-devel] [PATCH V17 12/12] quorum: Add unit test.
  2014-02-12 22:06 [Qemu-devel] [PATCH V17 00/12] quorum block filter Benoît Canet
                   ` (10 preceding siblings ...)
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 11/12] quorum: Add quorum_open() and quorum_close() Benoît Canet
@ 2014-02-12 22:06 ` Benoît Canet
  2014-02-15  1:15 ` [Qemu-devel] [PATCH V17 00/12] quorum block filter Max Reitz
  12 siblings, 0 replies; 27+ messages in thread
From: Benoît Canet @ 2014-02-12 22:06 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, Benoît Canet, famz, Benoit Canet, mreitz, stefanha

Signed-off-by: Benoit Canet <benoit@irqsave.net>
Reviewed-by: Max Reitz <mreitz@redhat.com>
---
 tests/qemu-iotests/081     | 95 ++++++++++++++++++++++++++++++++++++++++++++++
 tests/qemu-iotests/081.out | 34 +++++++++++++++++
 tests/qemu-iotests/group   |  1 +
 3 files changed, 130 insertions(+)
 create mode 100755 tests/qemu-iotests/081
 create mode 100644 tests/qemu-iotests/081.out

diff --git a/tests/qemu-iotests/081 b/tests/qemu-iotests/081
new file mode 100755
index 0000000..be34544
--- /dev/null
+++ b/tests/qemu-iotests/081
@@ -0,0 +1,95 @@
+#!/bin/bash
+#
+# Test Quorum block driver
+#
+# Copyright (C) 2013 Nodalink, SARL.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+
+# creator
+owner=benoit@irqsave.net
+
+seq=`basename $0`
+echo "QA output created by $seq"
+
+here=`pwd`
+tmp=/tmp/$$
+status=1	# failure is the default!
+
+_cleanup()
+{
+    rm -rf $TEST_DIR/1.raw
+    rm -rf $TEST_DIR/2.raw
+    rm -rf $TEST_DIR/3.raw
+}
+trap "_cleanup; exit \$status" 0 1 2 3 15
+
+# get standard environment, filters and checks
+. ./common.rc
+. ./common.filter
+
+_supported_fmt raw
+_supported_proto generic
+_supported_os Linux
+
+quorum="file.driver=quorum,file.children.0.file.filename=$TEST_DIR/1.raw"
+quorum="$quorum,file.children.1.file.filename=$TEST_DIR/2.raw"
+quorum="$quorum,file.children.2.file.filename=$TEST_DIR/3.raw,file.vote-threshold=2"
+
+echo
+echo "== creating quorum files =="
+
+size=10M
+
+TEST_IMG="$TEST_DIR/1.raw" _make_test_img $size
+TEST_IMG="$TEST_DIR/2.raw" _make_test_img $size
+TEST_IMG="$TEST_DIR/3.raw" _make_test_img $size
+
+echo
+echo "== writing images =="
+
+$QEMU_IO -c "open -o $quorum" -c "write -P 0x32 0 $size" | _filter_qemu_io
+
+echo
+echo "== checking quorum write =="
+
+$QEMU_IO -c "read -P 0x32 0 $size" "$TEST_DIR/1.raw" | _filter_qemu_io
+$QEMU_IO -c "read -P 0x32 0 $size" "$TEST_DIR/2.raw" | _filter_qemu_io
+$QEMU_IO -c "read -P 0x32 0 $size" "$TEST_DIR/3.raw" | _filter_qemu_io
+
+echo
+echo "== corrupting image =="
+
+$QEMU_IO -c "write -P 0x42 0 $size" "$TEST_DIR/2.raw" | _filter_qemu_io
+
+echo
+echo "== checking quorum correction =="
+
+$QEMU_IO -c "open -o $quorum" -c "read -P 0x32 0 $size" | _filter_qemu_io
+
+echo
+echo "== breaking quorum =="
+
+$QEMU_IO -c "write -P 0x41 0 $size" "$TEST_DIR/1.raw" | _filter_qemu_io
+echo
+echo "== checking that quorum is broken =="
+
+$QEMU_IO -c "open -o $quorum" -c "read -P 0x32 0 $size" | _filter_qemu_io
+
+
+# success, all done
+echo "*** done"
+rm -f $seq.full
+status=0
diff --git a/tests/qemu-iotests/081.out b/tests/qemu-iotests/081.out
new file mode 100644
index 0000000..b5b55ab
--- /dev/null
+++ b/tests/qemu-iotests/081.out
@@ -0,0 +1,34 @@
+QA output created by 081
+
+== creating quorum files ==
+Formatting 'TEST_DIR/1.IMGFMT', fmt=IMGFMT size=10485760 
+Formatting 'TEST_DIR/2.IMGFMT', fmt=IMGFMT size=10485760 
+Formatting 'TEST_DIR/3.IMGFMT', fmt=IMGFMT size=10485760 
+
+== writing images ==
+wrote 10485760/10485760 bytes at offset 0
+10 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+
+== checking quorum write ==
+read 10485760/10485760 bytes at offset 0
+10 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+read 10485760/10485760 bytes at offset 0
+10 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+read 10485760/10485760 bytes at offset 0
+10 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+
+== corrupting image ==
+wrote 10485760/10485760 bytes at offset 0
+10 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+
+== checking quorum correction ==
+read 10485760/10485760 bytes at offset 0
+10 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+
+== breaking quorum ==
+wrote 10485760/10485760 bytes at offset 0
+10 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+
+== checking that quorum is broken ==
+qemu-io: can't open device (null): Could not read image for determining its format: Input/output error
+*** done
diff --git a/tests/qemu-iotests/group b/tests/qemu-iotests/group
index d8be74a..fe5d764 100644
--- a/tests/qemu-iotests/group
+++ b/tests/qemu-iotests/group
@@ -83,3 +83,4 @@
 074 rw auto
 077 rw auto
 079 rw auto
+081 rw auto
-- 
1.8.3.2

^ permalink raw reply related	[flat|nested] 27+ messages in thread

* Re: [Qemu-devel] [PATCH V17 11/12] quorum: Add quorum_open() and quorum_close().
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 11/12] quorum: Add quorum_open() and quorum_close() Benoît Canet
@ 2014-02-13  9:09   ` Benoît Canet
  2014-02-18  4:06     ` Fam Zheng
  2014-02-15  2:02   ` Max Reitz
  1 sibling, 1 reply; 27+ messages in thread
From: Benoît Canet @ 2014-02-13  9:09 UTC (permalink / raw)
  To: Benoît Canet; +Cc: kwolf, famz, qemu-devel, mreitz, stefanha

The Wednesday 12 Feb 2014 à 23:06:38 (+0100), Benoît Canet wrote :
> From: Benoît Canet <benoit@irqsave.net>
> 
> Example of command line:
> -drive if=virtio,file.driver=quorum,\
> file.children.0.file.filename=1.raw,\
> file.children.0.node-name=1.raw,\
> file.children.0.driver=raw,\
> file.children.1.file.filename=2.raw,\
> file.children.1.node-name=2.raw,\
> file.children.1.driver=raw,\
> file.children.2.file.filename=3.raw,\
> file.children.2.node-name=3.raw,\
> file.children.2.driver=raw,\
> file.vote-threshold=2

Extra file. are wrong.

> 
> file.blkverify=on with file.vote-threshold=2 and two files can be passed to
> emulated blkverify.
> 
> Signed-off-by: Benoit Canet <benoit@irqsave.net>
> ---
>  block/quorum.c   | 165 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
>  monitor.c        |   2 +
>  qapi-schema.json |  21 ++++++-
>  3 files changed, 187 insertions(+), 1 deletion(-)
> 
> diff --git a/block/quorum.c b/block/quorum.c
> index 8e033ef..d3a90fb 100644
> --- a/block/quorum.c
> +++ b/block/quorum.c
> @@ -20,6 +20,9 @@
>  
>  #define HASH_LENGTH 32
>  
> +#define QUORUM_OPT_VOTE_THRESHOLD "vote-threshold"
> +#define QUORUM_OPT_BLKVERIFY      "blkverify"
> +
>  /* This union holds a vote hash value */
>  typedef union QuorumVoteValue {
>      char h[HASH_LENGTH];       /* SHA-256 hash */
> @@ -677,12 +680,174 @@ static bool quorum_recurse_is_first_non_filter(BlockDriverState *bs,
>      return false;
>  }
>  
> +static int quorum_valid_threshold(int threshold,
> +                                  int num_children,
> +                                  Error **errp)
> +{
> +
> +    if (threshold < 1) {
> +        error_set(errp, QERR_INVALID_PARAMETER_VALUE,
> +                  "vote-threshold", "value >= 1");
> +        return -ERANGE;
> +    }
> +
> +    if (threshold > num_children) {
> +        error_setg(errp, "threshold may not exceed children count");
> +        return -ERANGE;
> +    }
> +
> +    return 0;
> +}
> +
> +static QemuOptsList quorum_runtime_opts = {
> +    .name = "quorum",
> +    .head = QTAILQ_HEAD_INITIALIZER(quorum_runtime_opts.head),
> +    .desc = {
> +        {
> +            .name = QUORUM_OPT_VOTE_THRESHOLD,
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "The number of vote needed for reaching quorum",
> +        },
> +        {
> +            .name = QUORUM_OPT_BLKVERIFY,
> +            .type = QEMU_OPT_BOOL,
> +            .help = "Trigger block verify mode if set",
> +        },
> +        { /* end of list */ }
> +    },
> +};
> +
> +static int quorum_open(BlockDriverState *bs,
> +                       QDict *options,
> +                       int flags,
> +                       Error **errp)
> +{
> +    BDRVQuorumState *s = bs->opaque;
> +    Error *local_err = NULL;
> +    QemuOpts *opts;
> +    bool *opened;
> +    QDict *sub = NULL;
> +    QList *list = NULL;
> +    const QListEntry *lentry;
> +    const QDictEntry *dentry;
> +    int i;
> +    int ret = 0;
> +
> +    qdict_flatten(options);
> +    qdict_extract_subqdict(options, &sub, "children.");
> +    qdict_array_split(sub, &list);
> +
> +    /* count how many different children are present and validate
> +     * qdict_size(sub) address the open by reference case
> +     */
> +    s->num_children = !qlist_size(list) ? qdict_size(sub) : qlist_size(list);
> +    if (s->num_children < 2) {
> +        error_setg(&local_err,
> +                   "Number of provided children must be greater than 1");
> +        ret = -EINVAL;
> +        goto exit;
> +    }
> +
> +    opts = qemu_opts_create(&quorum_runtime_opts, NULL, 0, &error_abort);
> +    qemu_opts_absorb_qdict(opts, options, &local_err);
> +    if (error_is_set(&local_err)) {
> +        ret = -EINVAL;
> +        goto exit;
> +    }
> +
> +    s->threshold = qemu_opt_get_number(opts, QUORUM_OPT_VOTE_THRESHOLD, 0);
> +
> +    /* and validate it against s->num_children */
> +    ret = quorum_valid_threshold(s->threshold, s->num_children, &local_err);
> +    if (ret < 0) {
> +        goto exit;
> +    }
> +
> +    /* is the driver in blkverify mode */
> +    if (qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false) &&
> +        s->num_children == 2 && s->threshold == 2) {
> +        s->is_blkverify = true;
> +    }
> +
> +    /* allocate the children BlockDriverState array */
> +    s->bs = g_new0(BlockDriverState *, s->num_children);
> +    opened = g_new0(bool, s->num_children);
> +
> +    /* Open by file name or options dict (command line or QMP) */
> +    if (s->num_children == qlist_size(list)) {
> +        for (i = 0, lentry = qlist_first(list); lentry;
> +            lentry = qlist_next(lentry), i++) {
> +            QDict *d = qobject_to_qdict(lentry->value);
> +            QINCREF(d);
> +            ret = bdrv_open(&s->bs[i], NULL, NULL,
> +                            d,
> +                            flags, NULL, &local_err);
> +            if (ret < 0) {
> +                goto close_exit;
> +            }
> +            opened[i] = true;
> +        }
> +    /* Open by QMP references */
> +    } else {
> +        for (i = 0, dentry = qdict_first(sub); dentry;
> +             dentry = qdict_next(sub, dentry), i++) {
> +            QString *string = qobject_to_qstring(dentry->value);
> +            ret = bdrv_open(&s->bs[i], NULL,
> +                            qstring_get_str(string),
> +                            NULL, flags, NULL, &local_err);
> +            if (ret < 0) {
> +                goto close_exit;
> +            }
> +            opened[i] = true;
> +        }
> +    }
> +
> +    g_free(opened);
> +    goto exit;
> +
> +close_exit:
> +    /* cleanup on error */
> +    for (i = 0; i < s->num_children; i++) {
> +        if (!opened[i]) {
> +            continue;
> +        }
> +        bdrv_unref(s->bs[i]);
> +    }
> +    g_free(s->bs);
> +    g_free(opened);
> +exit:
> +    /* propagate error */
> +    if (error_is_set(&local_err)) {
> +        error_propagate(errp, local_err);
> +    }
> +    QDECREF(list);
> +    QDECREF(sub);
> +    return ret;
> +}
> +
> +static void quorum_close(BlockDriverState *bs)
> +{
> +    BDRVQuorumState *s = bs->opaque;
> +    int i;
> +
> +    for (i = 0; i < s->num_children; i++) {
> +        bdrv_unref(s->bs[i]);
Quorum crash here from time to time I don't understand why.

> +    }
> +
> +    g_free(s->bs);
> +}
> +
>  static BlockDriver bdrv_quorum = {
>      .format_name        = "quorum",
>      .protocol_name      = "quorum",
>  
>      .instance_size      = sizeof(BDRVQuorumState),
>  
> +    .bdrv_file_open     = quorum_open,
> +    .bdrv_close         = quorum_close,
> +
> +    .authorizations     = { true, true },
> +
>      .bdrv_co_flush_to_disk = quorum_co_flush,
>  
>      .bdrv_getlength     = quorum_getlength,
> diff --git a/monitor.c b/monitor.c
> index 81ffa0f..8b175fb 100644
> --- a/monitor.c
> +++ b/monitor.c
> @@ -639,6 +639,8 @@ static void monitor_protocol_event_init(void)
>      monitor_protocol_event_throttle(QEVENT_RTC_CHANGE, 1000);
>      monitor_protocol_event_throttle(QEVENT_BALLOON_CHANGE, 1000);
>      monitor_protocol_event_throttle(QEVENT_WATCHDOG, 1000);
> +    monitor_protocol_event_throttle(QEVENT_QUORUM_REPORT_BAD, 1000);
> +    monitor_protocol_event_throttle(QEVENT_QUORUM_FAILURE, 1000);
>  }
>  
>  /**
> diff --git a/qapi-schema.json b/qapi-schema.json
> index 7cfb5e5..990d0c5 100644
> --- a/qapi-schema.json
> +++ b/qapi-schema.json
> @@ -4352,6 +4352,24 @@
>              'raw': 'BlockdevRef' } }
>  
>  ##
> +# @BlockdevOptionsQuorum
> +#
> +# Driver specific block device options for Quorum
> +#
> +# @blkverify:      #optional true if the driver must print content mismatch
> +#
> +# @children:       the children block device to use
> +#
> +# @vote_threshold: the vote limit under which a read will fail
> +#
> +# Since: 2.0
> +##
> +{ 'type': 'BlockdevOptionsQuorum',
> +  'data': { '*blkverify': 'bool',
> +            'children': [ 'BlockdevRef' ],
> +            'vote-threshold': 'int' } }
> +
> +##
>  # @BlockdevOptions
>  #
>  # Options for creating a block device.
> @@ -4390,7 +4408,8 @@
>        'vdi':        'BlockdevOptionsGenericFormat',
>        'vhdx':       'BlockdevOptionsGenericFormat',
>        'vmdk':       'BlockdevOptionsGenericCOWFormat',
> -      'vpc':        'BlockdevOptionsGenericFormat'
> +      'vpc':        'BlockdevOptionsGenericFormat',
> +      'quorum':     'BlockdevOptionsQuorum'
>    } }
>  
>  ##
> -- 
> 1.8.3.2
> 

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Qemu-devel] [PATCH V17 00/12] quorum block filter
  2014-02-12 22:06 [Qemu-devel] [PATCH V17 00/12] quorum block filter Benoît Canet
                   ` (11 preceding siblings ...)
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 12/12] quorum: Add unit test Benoît Canet
@ 2014-02-15  1:15 ` Max Reitz
  2014-02-15  2:04   ` Max Reitz
  12 siblings, 1 reply; 27+ messages in thread
From: Max Reitz @ 2014-02-15  1:15 UTC (permalink / raw)
  To: Benoît Canet, qemu-devel; +Cc: kwolf, famz, stefanha

On 12.02.2014 23:06, Benoît Canet wrote:
> I post this for review in prevision of 2.0 feature freeze.
> Even if the series look correct please wait before merging because:
>
> The QMP events in the "Add quorum mechanism" definitively needs to be reviewed
> by Eric as they where changed.
>
> I did not found any bugs while testing this version but I am willing to test the
> code further before it's applied even it's reviewed by.
>
> Best regards
>
> Benoît

Just one thing in general: You left my reviewed-by note on many patches 
of the series although basically every single patch has changed (some 
more, some less). This makes it harder for me to review (okay, not 
really; I can (and should) still compare with the old versions for the 
actual changes, but if I fail to do so...) and can cause some patches to 
be merged because "I reviewed it" (although I actually didn't) - 
probably not in this case since it's hard here to pick single patches to 
merge, but well...

Max

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Qemu-devel] [PATCH V17 01/12] quorum: Create quorum.c, add QuorumChildRequest and QuorumAIOCB.
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 01/12] quorum: Create quorum.c, add QuorumChildRequest and QuorumAIOCB Benoît Canet
@ 2014-02-15  1:17   ` Max Reitz
  0 siblings, 0 replies; 27+ messages in thread
From: Max Reitz @ 2014-02-15  1:17 UTC (permalink / raw)
  To: Benoît Canet, qemu-devel; +Cc: kwolf, famz, Benoît Canet, stefanha

On 12.02.2014 23:06, Benoît Canet wrote:
> From: Benoît Canet <benoit@irqsave.net>
>
> Quorum is a block filter mirroring writes to num_children children.
> For reads quorum reads each children and does a vote.
> If more than vote_threshold versions are identicals the quorum is reached and

-pedantic: "identical", without "s"

> this winning version is returned to the guest. So quorum prevents bit corruption.
> For high availability purpose minority errors are reported via QMP but the guest
> does not see them.
>
> This patch create the driver C source file and introduce the structures that

"creates" and "introduces", with "s"

But since I'm not in -Werror mode, you may leave the reviewed-by. ;-)

Max

> will be used in asynchronous reads and writes.
>
> Signed-off-by: Benoit Canet <benoit@irqsave.net>
> Reviewed-by: Max Reitz <mreitz@redhat.com>
> ---
>   block/Makefile.objs |  1 +
>   block/quorum.c      | 53 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>   2 files changed, 54 insertions(+)
>   create mode 100644 block/quorum.c
>
> diff --git a/block/Makefile.objs b/block/Makefile.objs
> index e254a21..716556f 100644
> --- a/block/Makefile.objs
> +++ b/block/Makefile.objs
> @@ -3,6 +3,7 @@ block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-c
>   block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
>   block-obj-y += qed-check.o
>   block-obj-$(CONFIG_VHDX) += vhdx.o vhdx-endian.o vhdx-log.o
> +block-obj-y += quorum.o
>   block-obj-y += parallels.o blkdebug.o blkverify.o
>   block-obj-y += snapshot.o qapi.o
>   block-obj-$(CONFIG_WIN32) += raw-win32.o win32-aio.o
> diff --git a/block/quorum.c b/block/quorum.c
> new file mode 100644
> index 0000000..950f5cc
> --- /dev/null
> +++ b/block/quorum.c
> @@ -0,0 +1,53 @@
> +/*
> + * Quorum Block filter
> + *
> + * Copyright (C) 2012-2014 Nodalink, EURL.
> + *
> + * Author:
> + *   Benoît Canet <benoit.canet@irqsave.net>
> + *
> + * Based on the design and code of blkverify.c (Copyright (C) 2010 IBM, Corp)
> + * and blkmirror.c (Copyright (C) 2011 Red Hat, Inc).
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + */
> +
> +#include "block/block_int.h"
> +
> +typedef struct QuorumAIOCB QuorumAIOCB;
> +
> +/* Quorum will create one instance of the following structure per operation it
> + * performs on its children.
> + * So for each read/write operation coming from the upper layer there will be
> + * $children_count QuorumChildRequest.
> + */
> +typedef struct QuorumChildRequest {
> +    BlockDriverAIOCB *aiocb;
> +    QEMUIOVector qiov;
> +    uint8_t *buf;
> +    int ret;
> +    QuorumAIOCB *parent;
> +} QuorumChildRequest;
> +
> +/* Quorum will use the following structure to track progress of each read/write
> + * operation received by the upper layer.
> + * This structure hold pointers to the QuorumChildRequest structures instances
> + * used to do operations on each children and track overall progress.
> + */
> +struct QuorumAIOCB {
> +    BlockDriverAIOCB common;
> +
> +    /* Request metadata */
> +    uint64_t sector_num;
> +    int nb_sectors;
> +
> +    QEMUIOVector *qiov;         /* calling IOV */
> +
> +    QuorumChildRequest *qcrs;   /* individual child requests */
> +    int count;                  /* number of completed AIOCB */
> +    int success_count;          /* number of successfully completed AIOCB */
> +
> +    bool is_read;
> +    int vote_ret;
> +};

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Qemu-devel] [PATCH V17 02/12] quorum: Create BDRVQuorumState and BlkDriver and do init.
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 02/12] quorum: Create BDRVQuorumState and BlkDriver and do init Benoît Canet
@ 2014-02-15  1:22   ` Max Reitz
  0 siblings, 0 replies; 27+ messages in thread
From: Max Reitz @ 2014-02-15  1:22 UTC (permalink / raw)
  To: Benoît Canet, qemu-devel; +Cc: kwolf, famz, Benoît Canet, stefanha

On 12.02.2014 23:06, Benoît Canet wrote:
> From: Benoît Canet <benoit@irqsave.net>
>
> Create the structure holding the quorum settings and write the minimal block
> driver instanciation boilerplate.
>
> Signed-off-by: Benoit Canet <benoit@irqsave.net>
> Reviewed-by: Max Reitz <mreitz@redhat.com>
> ---
>   block/quorum.c | 31 +++++++++++++++++++++++++++++++
>   1 file changed, 31 insertions(+)
>
> diff --git a/block/quorum.c b/block/quorum.c
> index 950f5cc..36c5bb8 100644
> --- a/block/quorum.c
> +++ b/block/quorum.c
> @@ -15,6 +15,23 @@
>   
>   #include "block/block_int.h"
>   
> +/* the following structure holds the state of one quorum instance */
> +typedef struct BDRVQuorumState {
> +    BlockDriverState **bs; /* children BlockDriverStates */
> +    int num_children;      /* children count */
> +    int threshold;         /* if less than threshold children reads gave the
> +                            * same result a quorum error occurs.
> +                            */
> +    bool is_blkverify;     /* true if the driver is in blkverify mode
> +                            * Writes are mirrored on two children devices.
> +                            * On reads the two children devices contents are

Probably better with apostrophe: "children devices' contents".

> +                            * compared and when a difference is spotted its

Since we're not rooting for an error, I'd prefer an "if" instead of "when".

> +                            * location is printed and the code abort.

"aborts"

Reviewed-by: Max Reitz <mreitz@redhat.com>

> +                            * It is useful to debug other block drivers by
> +                            * comparing them with a reference one.
> +                            */
> +} BDRVQuorumState;
> +
>   typedef struct QuorumAIOCB QuorumAIOCB;
>   
>   /* Quorum will create one instance of the following structure per operation it
> @@ -51,3 +68,17 @@ struct QuorumAIOCB {
>       bool is_read;
>       int vote_ret;
>   };
> +
> +static BlockDriver bdrv_quorum = {
> +    .format_name        = "quorum",
> +    .protocol_name      = "quorum",
> +
> +    .instance_size      = sizeof(BDRVQuorumState),
> +};
> +
> +static void bdrv_quorum_init(void)
> +{
> +    bdrv_register(&bdrv_quorum);
> +}
> +
> +block_init(bdrv_quorum_init);

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Qemu-devel] [PATCH V17 03/12] quorum: Add quorum_aio_writev and its dependencies.
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 03/12] quorum: Add quorum_aio_writev and its dependencies Benoît Canet
@ 2014-02-15  1:31   ` Max Reitz
  0 siblings, 0 replies; 27+ messages in thread
From: Max Reitz @ 2014-02-15  1:31 UTC (permalink / raw)
  To: Benoît Canet, qemu-devel; +Cc: kwolf, famz, Benoît Canet, stefanha

On 12.02.2014 23:06, Benoît Canet wrote:
> From: Benoît Canet <benoit@irqsave.net>
>
> Writes are mirrored num_children times on num_children devices.
>
> Signed-off-by: Benoit Canet <benoit@irqsave.net>
> ---
>   block/quorum.c | 103 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>   1 file changed, 103 insertions(+)

Reviewed-by: Max Reitz <mreitz@redhat.com>

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Qemu-devel] [PATCH V17 04/12] blkverify: Extract qemu_iovec_clone() and qemu_iovec_compare() from blkverify.
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 04/12] blkverify: Extract qemu_iovec_clone() and qemu_iovec_compare() from blkverify Benoît Canet
@ 2014-02-15  1:36   ` Max Reitz
  0 siblings, 0 replies; 27+ messages in thread
From: Max Reitz @ 2014-02-15  1:36 UTC (permalink / raw)
  To: Benoît Canet, qemu-devel; +Cc: kwolf, famz, Benoît Canet, stefanha

On 12.02.2014 23:06, Benoît Canet wrote:
> From: Benoît Canet <benoit@irqsave.net>
>
> qemu_iovec_compare() will be used to compare IOs vectors in quorum blkverify
> mode. The patch extract these functions in order to factorize the code.

"extracts" (just a sidenode: If I'm correcting (or thinking I'm 
correcting) spelling mistakes, I'm just saying "Try to fix it if you're 
doing a new version anyway", but I won't force you to; but if you don't 
correct it, prepare for me pointing it out again ;-))

Reviewed-by: Max Reitz <mreitz@redhat.com>

> Signed-off-by: Benoit Canet <benoit@irqsave.net>
> Reviewed-by: Max Reitz <mreitz@redhat.com>
> ---
>   block/blkverify.c     | 108 +-------------------------------------------------
>   include/qemu-common.h |   2 +
>   util/iov.c            | 106 +++++++++++++++++++++++++++++++++++++++++++++++++
>   3 files changed, 110 insertions(+), 106 deletions(-)
>
> diff --git a/block/blkverify.c b/block/blkverify.c
> index b57da59..c86ad7a 100644
> --- a/block/blkverify.c
> +++ b/block/blkverify.c
> @@ -173,110 +173,6 @@ static int64_t blkverify_getlength(BlockDriverState *bs)
>       return bdrv_getlength(s->test_file);
>   }
>   
> -/**
> - * Check that I/O vector contents are identical
> - *
> - * @a:          I/O vector
> - * @b:          I/O vector
> - * @ret:        Offset to first mismatching byte or -1 if match
> - */
> -static ssize_t blkverify_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
> -{
> -    int i;
> -    ssize_t offset = 0;
> -
> -    assert(a->niov == b->niov);
> -    for (i = 0; i < a->niov; i++) {
> -        size_t len = 0;
> -        uint8_t *p = (uint8_t *)a->iov[i].iov_base;
> -        uint8_t *q = (uint8_t *)b->iov[i].iov_base;
> -
> -        assert(a->iov[i].iov_len == b->iov[i].iov_len);
> -        while (len < a->iov[i].iov_len && *p++ == *q++) {
> -            len++;
> -        }
> -
> -        offset += len;
> -
> -        if (len != a->iov[i].iov_len) {
> -            return offset;
> -        }
> -    }
> -    return -1;
> -}
> -
> -typedef struct {
> -    int src_index;
> -    struct iovec *src_iov;
> -    void *dest_base;
> -} IOVectorSortElem;
> -
> -static int sortelem_cmp_src_base(const void *a, const void *b)
> -{
> -    const IOVectorSortElem *elem_a = a;
> -    const IOVectorSortElem *elem_b = b;
> -
> -    /* Don't overflow */
> -    if (elem_a->src_iov->iov_base < elem_b->src_iov->iov_base) {
> -        return -1;
> -    } else if (elem_a->src_iov->iov_base > elem_b->src_iov->iov_base) {
> -        return 1;
> -    } else {
> -        return 0;
> -    }
> -}
> -
> -static int sortelem_cmp_src_index(const void *a, const void *b)
> -{
> -    const IOVectorSortElem *elem_a = a;
> -    const IOVectorSortElem *elem_b = b;
> -
> -    return elem_a->src_index - elem_b->src_index;
> -}
> -
> -/**
> - * Copy contents of I/O vector
> - *
> - * The relative relationships of overlapping iovecs are preserved.  This is
> - * necessary to ensure identical semantics in the cloned I/O vector.
> - */
> -static void blkverify_iovec_clone(QEMUIOVector *dest, const QEMUIOVector *src,
> -                                  void *buf)
> -{
> -    IOVectorSortElem sortelems[src->niov];
> -    void *last_end;
> -    int i;
> -
> -    /* Sort by source iovecs by base address */
> -    for (i = 0; i < src->niov; i++) {
> -        sortelems[i].src_index = i;
> -        sortelems[i].src_iov = &src->iov[i];
> -    }
> -    qsort(sortelems, src->niov, sizeof(sortelems[0]), sortelem_cmp_src_base);
> -
> -    /* Allocate buffer space taking into account overlapping iovecs */
> -    last_end = NULL;
> -    for (i = 0; i < src->niov; i++) {
> -        struct iovec *cur = sortelems[i].src_iov;
> -        ptrdiff_t rewind = 0;
> -
> -        /* Detect overlap */
> -        if (last_end && last_end > cur->iov_base) {
> -            rewind = last_end - cur->iov_base;
> -        }
> -
> -        sortelems[i].dest_base = buf - rewind;
> -        buf += cur->iov_len - MIN(rewind, cur->iov_len);
> -        last_end = MAX(cur->iov_base + cur->iov_len, last_end);
> -    }
> -
> -    /* Sort by source iovec index and build destination iovec */
> -    qsort(sortelems, src->niov, sizeof(sortelems[0]), sortelem_cmp_src_index);
> -    for (i = 0; i < src->niov; i++) {
> -        qemu_iovec_add(dest, sortelems[i].dest_base, src->iov[i].iov_len);
> -    }
> -}
> -
>   static BlkverifyAIOCB *blkverify_aio_get(BlockDriverState *bs, bool is_write,
>                                            int64_t sector_num, QEMUIOVector *qiov,
>                                            int nb_sectors,
> @@ -340,7 +236,7 @@ static void blkverify_aio_cb(void *opaque, int ret)
>   
>   static void blkverify_verify_readv(BlkverifyAIOCB *acb)
>   {
> -    ssize_t offset = blkverify_iovec_compare(acb->qiov, &acb->raw_qiov);
> +    ssize_t offset = qemu_iovec_compare(acb->qiov, &acb->raw_qiov);
>       if (offset != -1) {
>           blkverify_err(acb, "contents mismatch in sector %" PRId64,
>                         acb->sector_num + (int64_t)(offset / BDRV_SECTOR_SIZE));
> @@ -358,7 +254,7 @@ static BlockDriverAIOCB *blkverify_aio_readv(BlockDriverState *bs,
>       acb->verify = blkverify_verify_readv;
>       acb->buf = qemu_blockalign(bs->file, qiov->size);
>       qemu_iovec_init(&acb->raw_qiov, acb->qiov->niov);
> -    blkverify_iovec_clone(&acb->raw_qiov, qiov, acb->buf);
> +    qemu_iovec_clone(&acb->raw_qiov, qiov, acb->buf);
>   
>       bdrv_aio_readv(s->test_file, sector_num, qiov, nb_sectors,
>                      blkverify_aio_cb, acb);
> diff --git a/include/qemu-common.h b/include/qemu-common.h
> index 5054836..d70783e 100644
> --- a/include/qemu-common.h
> +++ b/include/qemu-common.h
> @@ -346,6 +346,8 @@ size_t qemu_iovec_from_buf(QEMUIOVector *qiov, size_t offset,
>                              const void *buf, size_t bytes);
>   size_t qemu_iovec_memset(QEMUIOVector *qiov, size_t offset,
>                            int fillc, size_t bytes);
> +ssize_t qemu_iovec_compare(QEMUIOVector *a, QEMUIOVector *b);
> +void qemu_iovec_clone(QEMUIOVector *dest, const QEMUIOVector *src, void *buf);
>   
>   bool buffer_is_zero(const void *buf, size_t len);
>   
> diff --git a/util/iov.c b/util/iov.c
> index bb46c04..03934da 100644
> --- a/util/iov.c
> +++ b/util/iov.c
> @@ -378,6 +378,112 @@ size_t qemu_iovec_memset(QEMUIOVector *qiov, size_t offset,
>       return iov_memset(qiov->iov, qiov->niov, offset, fillc, bytes);
>   }
>   
> +/**
> + * Check that I/O vector contents are identical
> + *
> + * The IO vectors must have the same structure (same length of all parts).
> + * A typical usage is to compare vectors created with qemu_iovec_clone().
> + *
> + * @a:          I/O vector
> + * @b:          I/O vector
> + * @ret:        Offset to first mismatching byte or -1 if match
> + */
> +ssize_t qemu_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
> +{
> +    int i;
> +    ssize_t offset = 0;
> +
> +    assert(a->niov == b->niov);
> +    for (i = 0; i < a->niov; i++) {
> +        size_t len = 0;
> +        uint8_t *p = (uint8_t *)a->iov[i].iov_base;
> +        uint8_t *q = (uint8_t *)b->iov[i].iov_base;
> +
> +        assert(a->iov[i].iov_len == b->iov[i].iov_len);
> +        while (len < a->iov[i].iov_len && *p++ == *q++) {
> +            len++;
> +        }
> +
> +        offset += len;
> +
> +        if (len != a->iov[i].iov_len) {
> +            return offset;
> +        }
> +    }
> +    return -1;
> +}
> +
> +typedef struct {
> +    int src_index;
> +    struct iovec *src_iov;
> +    void *dest_base;
> +} IOVectorSortElem;
> +
> +static int sortelem_cmp_src_base(const void *a, const void *b)
> +{
> +    const IOVectorSortElem *elem_a = a;
> +    const IOVectorSortElem *elem_b = b;
> +
> +    /* Don't overflow */
> +    if (elem_a->src_iov->iov_base < elem_b->src_iov->iov_base) {
> +        return -1;
> +    } else if (elem_a->src_iov->iov_base > elem_b->src_iov->iov_base) {
> +        return 1;
> +    } else {
> +        return 0;
> +    }
> +}
> +
> +static int sortelem_cmp_src_index(const void *a, const void *b)
> +{
> +    const IOVectorSortElem *elem_a = a;
> +    const IOVectorSortElem *elem_b = b;
> +
> +    return elem_a->src_index - elem_b->src_index;
> +}
> +
> +/**
> + * Copy contents of I/O vector
> + *
> + * The relative relationships of overlapping iovecs are preserved.  This is
> + * necessary to ensure identical semantics in the cloned I/O vector.
> + */
> +void qemu_iovec_clone(QEMUIOVector *dest, const QEMUIOVector *src, void *buf)
> +{
> +    IOVectorSortElem sortelems[src->niov];
> +    void *last_end;
> +    int i;
> +
> +    /* Sort by source iovecs by base address */
> +    for (i = 0; i < src->niov; i++) {
> +        sortelems[i].src_index = i;
> +        sortelems[i].src_iov = &src->iov[i];
> +    }
> +    qsort(sortelems, src->niov, sizeof(sortelems[0]), sortelem_cmp_src_base);
> +
> +    /* Allocate buffer space taking into account overlapping iovecs */
> +    last_end = NULL;
> +    for (i = 0; i < src->niov; i++) {
> +        struct iovec *cur = sortelems[i].src_iov;
> +        ptrdiff_t rewind = 0;
> +
> +        /* Detect overlap */
> +        if (last_end && last_end > cur->iov_base) {
> +            rewind = last_end - cur->iov_base;
> +        }
> +
> +        sortelems[i].dest_base = buf - rewind;
> +        buf += cur->iov_len - MIN(rewind, cur->iov_len);
> +        last_end = MAX(cur->iov_base + cur->iov_len, last_end);
> +    }
> +
> +    /* Sort by source iovec index and build destination iovec */
> +    qsort(sortelems, src->niov, sizeof(sortelems[0]), sortelem_cmp_src_index);
> +    for (i = 0; i < src->niov; i++) {
> +        qemu_iovec_add(dest, sortelems[i].dest_base, src->iov[i].iov_len);
> +    }
> +}
> +
>   size_t iov_discard_front(struct iovec **iov, unsigned int *iov_cnt,
>                            size_t bytes)
>   {

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Qemu-devel] [PATCH V17 05/12] quorum: Add quorum_aio_readv.
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 05/12] quorum: Add quorum_aio_readv Benoît Canet
@ 2014-02-15  1:40   ` Max Reitz
  2014-02-17 10:57     ` Kevin Wolf
  0 siblings, 1 reply; 27+ messages in thread
From: Max Reitz @ 2014-02-15  1:40 UTC (permalink / raw)
  To: Benoît Canet, qemu-devel; +Cc: kwolf, famz, Benoît Canet, stefanha

On 12.02.2014 23:06, Benoît Canet wrote:
> From: Benoît Canet <benoit@irqsave.net>
>
> Add code to do num_children reads in parallel and cleanup the structures
> afterward.

"afterwards"

> Signed-off-by: Benoit Canet <benoit@irqsave.net>
> Reviewed-by: Max Reitz <mreitz@redhat.com>
> ---
>   block/quorum.c | 39 ++++++++++++++++++++++++++++++++++++++-
>   1 file changed, 38 insertions(+), 1 deletion(-)
>
> diff --git a/block/quorum.c b/block/quorum.c
> index 197cdca..c7a5d79 100644
> --- a/block/quorum.c
> +++ b/block/quorum.c
> @@ -91,10 +91,18 @@ static AIOCBInfo quorum_aiocb_info = {
>   
>   static void quorum_aio_finalize(QuorumAIOCB *acb)
>   {
> -    int ret = 0;
> +    BDRVQuorumState *s = acb->common.bs->opaque;
> +    int i, ret = 0;
>   
>       acb->common.cb(acb->common.opaque, ret);
>   
> +    if (acb->is_read) {
> +        for (i = 0; i < s->num_children; i++) {
> +            qemu_vfree(acb->qcrs[i].buf);
> +            qemu_iovec_destroy(&acb->qcrs[i].qiov);
> +        }
> +    }
> +
>       g_free(acb->qcrs);
>       qemu_aio_release(acb);
>   }
> @@ -149,6 +157,34 @@ static void quorum_aio_cb(void *opaque, int ret)
>       quorum_aio_finalize(acb);
>   }
>   
> +static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
> +                                         int64_t sector_num,
> +                                         QEMUIOVector *qiov,
> +                                         int nb_sectors,
> +                                         BlockDriverCompletionFunc *cb,
> +                                         void *opaque)
> +{
> +    BDRVQuorumState *s = bs->opaque;
> +    QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num,
> +                                      nb_sectors, cb, opaque);
> +    int i;
> +
> +    acb->is_read = true;
> +
> +    for (i = 0; i < s->num_children; i++) {
> +        acb->qcrs[i].buf = qemu_blockalign(s->bs[i], qiov->size);
> +        qemu_iovec_init(&acb->qcrs[i].qiov, qiov->niov);
> +        qemu_iovec_clone(&acb->qcrs[i].qiov, qiov, acb->qcrs[i].buf);
> +    }
> +
> +    for (i = 0; i < s->num_children; i++) {
> +        bdrv_aio_readv(s->bs[i], sector_num, &acb->qcrs[i].qiov, nb_sectors,
> +                       quorum_aio_cb, &acb->qcrs[i]);

I know Kevin told you to, but using the child's own QIOV here means 
quorum_aio_readv() won't work after this patch but only after patch 6. 
Just pointing it out, I don't like it, but Kevin told you to, so:

Reviewed-by: Max Reitz <mreitz@redhat.com>

> +    }
> +
> +    return &acb->common;
> +}
> +
>   static BlockDriverAIOCB *quorum_aio_writev(BlockDriverState *bs,
>                                             int64_t sector_num,
>                                             QEMUIOVector *qiov,
> @@ -176,6 +212,7 @@ static BlockDriver bdrv_quorum = {
>   
>       .instance_size      = sizeof(BDRVQuorumState),
>   
> +    .bdrv_aio_readv     = quorum_aio_readv,
>       .bdrv_aio_writev    = quorum_aio_writev,
>   };
>   

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Qemu-devel] [PATCH V17 06/12] quorum: Add quorum mechanism.
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 06/12] quorum: Add quorum mechanism Benoît Canet
@ 2014-02-15  1:53   ` Max Reitz
  0 siblings, 0 replies; 27+ messages in thread
From: Max Reitz @ 2014-02-15  1:53 UTC (permalink / raw)
  To: Benoît Canet, qemu-devel; +Cc: kwolf, famz, Benoît Canet, stefanha

On 12.02.2014 23:06, Benoît Canet wrote:
> From: Benoît Canet <benoit@irqsave.net>
>
> This patchset enable the core of the quorum mechanism.

"enables"

> The num_children reads are compared to get the majority version and if this
> version exist more than threshold time the guest won't see the error at all.

"exists", "times"

> QMP events are used to report individual driver errors or whole quorum.
> error to the management.

What is this supposed to mean?

> Use gnutls's SHA-256 to compare versions.
>
> --enable-quorum must be used to enable the feature.
>
> Signed-off-by: Benoit Canet <benoit@irqsave.net>
> ---
>   block/Makefile.objs       |   2 +-
>   block/quorum.c            | 394 ++++++++++++++++++++++++++++++++++++++++++++++
>   configure                 |  36 +++++
>   docs/qmp/qmp-events.txt   |  36 +++++
>   include/monitor/monitor.h |   2 +
>   monitor.c                 |   2 +
>   6 files changed, 471 insertions(+), 1 deletion(-)
>
> diff --git a/block/Makefile.objs b/block/Makefile.objs
> index 716556f..425d7fb 100644
> --- a/block/Makefile.objs
> +++ b/block/Makefile.objs
> @@ -3,7 +3,7 @@ block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-c
>   block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
>   block-obj-y += qed-check.o
>   block-obj-$(CONFIG_VHDX) += vhdx.o vhdx-endian.o vhdx-log.o
> -block-obj-y += quorum.o
> +block-obj-$(CONFIG_QUORUM) += quorum.o
>   block-obj-y += parallels.o blkdebug.o blkverify.o
>   block-obj-y += snapshot.o qapi.o
>   block-obj-$(CONFIG_WIN32) += raw-win32.o win32-aio.o
> diff --git a/block/quorum.c b/block/quorum.c
> index c7a5d79..bfd8e9d 100644
> --- a/block/quorum.c
> +++ b/block/quorum.c
> @@ -13,7 +13,43 @@
>    * See the COPYING file in the top-level directory.
>    */
>   
> +#include <gnutls/gnutls.h>
> +#include <gnutls/crypto.h>
>   #include "block/block_int.h"
> +#include "qapi/qmp/qjson.h"
> +
> +#define HASH_LENGTH 32
> +
> +/* This union holds a vote hash value */
> +typedef union QuorumVoteValue {
> +    char h[HASH_LENGTH];       /* SHA-256 hash */
> +    int64_t l;                 /* simpler 64 bits hash */
> +} QuorumVoteValue;
> +
> +/* A vote item */
> +typedef struct QuorumVoteItem {
> +    int index;
> +    QLIST_ENTRY(QuorumVoteItem) next;
> +} QuorumVoteItem;
> +
> +/* this structure is a vote version. A version is the set of votes sharing the
> + * same vote value.
> + * The set of votes will be tracked with the items field and its cardinality is
> + * vote_count.
> + */
> +typedef struct QuorumVoteVersion {
> +    QuorumVoteValue value;
> +    int index;
> +    int vote_count;
> +    QLIST_HEAD(, QuorumVoteItem) items;
> +    QLIST_ENTRY(QuorumVoteVersion) next;
> +} QuorumVoteVersion;
> +
> +/* this structure holds a group of vote versions together */
> +typedef struct QuorumVotes {
> +    QLIST_HEAD(, QuorumVoteVersion) vote_list;
> +    bool (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
> +} QuorumVotes;
>   
>   /* the following structure holds the state of one quorum instance */
>   typedef struct BDRVQuorumState {
> @@ -65,10 +101,14 @@ struct QuorumAIOCB {
>       int count;                  /* number of completed AIOCB */
>       int success_count;          /* number of successfully completed AIOCB */
>   
> +    QuorumVotes votes;
> +
>       bool is_read;
>       int vote_ret;
>   };
>   
> +static void quorum_vote(QuorumAIOCB *acb);
> +
>   static void quorum_aio_cancel(BlockDriverAIOCB *blockacb)
>   {
>       QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
> @@ -94,6 +134,10 @@ static void quorum_aio_finalize(QuorumAIOCB *acb)
>       BDRVQuorumState *s = acb->common.bs->opaque;
>       int i, ret = 0;
>   
> +    if (acb->vote_ret) {
> +        ret = acb->vote_ret;
> +    }
> +
>       acb->common.cb(acb->common.opaque, ret);
>   
>       if (acb->is_read) {
> @@ -107,6 +151,16 @@ static void quorum_aio_finalize(QuorumAIOCB *acb)
>       qemu_aio_release(acb);
>   }
>   
> +static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
> +{
> +    return memcmp(a->h, b->h, HASH_LENGTH);
> +}
> +
> +static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b)
> +{
> +    return a->l != b->l;
> +}
> +

Okay, so these functions return true if the values differ and false if 
they're equal. Not what I'd naturally assume (so you should at least 
document it), but okay. So far.

>   static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
>                                      BlockDriverState *bs,
>                                      QEMUIOVector *qiov,
> @@ -125,6 +179,8 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
>       acb->qcrs = g_new0(QuorumChildRequest, s->num_children);
>       acb->count = 0;
>       acb->success_count = 0;
> +    acb->votes.compare = quorum_sha256_compare;
> +    QLIST_INIT(&acb->votes.vote_list);
>       acb->is_read = false;
>       acb->vote_ret = 0;
>   
> @@ -137,6 +193,53 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
>       return acb;
>   }
>   
> +static void quorum_report_bad(QuorumAIOCB *acb, char *node_name, int ret)
> +{
> +    QObject *data;
> +    assert(node_name);
> +    data = qobject_from_jsonf("{ 'ret': %i"
> +                              ", 'node-name': \"%s\""
> +                              ", 'sector-num': %" PRId64
> +                              ", 'sectors-count': %i }",
> +                              ret,
> +                              node_name,
> +                              acb->sector_num,
> +                              acb->nb_sectors);

These non-string parameters fit on a single line, but that's a matter of 
taste.

> +    monitor_protocol_event(QEVENT_QUORUM_REPORT_BAD, data);
> +    qobject_decref(data);
> +}
> +
> +static void quorum_report_failure(QuorumAIOCB *acb)
> +{
> +    QObject *data;
> +    const char *reference = acb->common.bs->device_name[0] ?
> +                            acb->common.bs->device_name :
> +                            acb->common.bs->node_name;
> +    data = qobject_from_jsonf("{ 'reference': \"%s\""
> +                              ", 'sector-num': %" PRId64
> +                              ", 'sectors-count': %i }",
> +                              reference,
> +                              acb->sector_num,
> +                              acb->nb_sectors);

Same here.

> +    monitor_protocol_event(QEVENT_QUORUM_FAILURE, data);
> +    qobject_decref(data);
> +}
> +
> +static int quorum_vote_error(QuorumAIOCB *acb);
> +
> +static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb)
> +{
> +    BDRVQuorumState *s = acb->common.bs->opaque;
> +
> +    if (acb->success_count < s->threshold) {
> +        acb->vote_ret = quorum_vote_error(acb);
> +        quorum_report_failure(acb);
> +        return true;
> +    }
> +
> +    return false;
> +}
> +
>   static void quorum_aio_cb(void *opaque, int ret)
>   {
>       QuorumChildRequest *sacb = opaque;
> @@ -147,6 +250,8 @@ static void quorum_aio_cb(void *opaque, int ret)
>       acb->count++;
>       if (ret == 0) {
>           acb->success_count++;
> +    } else {
> +        quorum_report_bad(acb, sacb->aiocb->bs->node_name, ret);
>       }
>       assert(acb->count <= s->num_children);
>       assert(acb->success_count <= s->num_children);
> @@ -154,9 +259,298 @@ static void quorum_aio_cb(void *opaque, int ret)
>           return;
>       }
>   
> +    /* Do the vote on read */
> +    if (acb->is_read) {
> +        quorum_vote(acb);
> +    } else {
> +        quorum_has_too_much_io_failed(acb);
> +    }
> +
>       quorum_aio_finalize(acb);
>   }
>   
> +static void quorum_report_bad_versions(BDRVQuorumState *s,
> +                                       QuorumAIOCB *acb,
> +                                       QuorumVoteValue *value)
> +{
> +    QuorumVoteVersion *version;
> +    QuorumVoteItem *item;
> +
> +    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
> +        if (!acb->votes.compare(&version->value, value)) {
> +            continue;
> +        }
> +        QLIST_FOREACH(item, &version->items, next) {
> +            quorum_report_bad(acb, s->bs[item->index]->node_name, 0);
> +        }
> +    }
> +}
> +
> +static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
> +{
> +    int i;
> +    assert(dest->niov == source->niov);
> +    assert(dest->size == source->size);
> +    for (i = 0; i < source->niov; i++) {
> +        assert(dest->iov[i].iov_len == source->iov[i].iov_len);
> +        memcpy(dest->iov[i].iov_base,
> +               source->iov[i].iov_base,
> +               source->iov[i].iov_len);
> +    }
> +}
> +
> +static void quorum_count_vote(QuorumVotes *votes,
> +                              QuorumVoteValue *value,
> +                              int index)
> +{
> +    QuorumVoteVersion *v = NULL, *version = NULL;
> +    QuorumVoteItem *item;
> +
> +    /* look if we have something with this hash */
> +    QLIST_FOREACH(v, &votes->vote_list, next) {
> +        if (!votes->compare(&v->value, value)) {
> +            version = v;
> +            break;
> +        }
> +    }
> +
> +    /* It's a version not yet in the list add it */
> +    if (!version) {
> +        version = g_new0(QuorumVoteVersion, 1);
> +        QLIST_INIT(&version->items);
> +        memcpy(&version->value, value, sizeof(version->value));
> +        version->index = index;
> +        version->vote_count = 0;
> +        QLIST_INSERT_HEAD(&votes->vote_list, version, next);
> +    }
> +
> +    version->vote_count++;
> +
> +    item = g_new0(QuorumVoteItem, 1);
> +    item->index = index;
> +    QLIST_INSERT_HEAD(&version->items, item, next);
> +}
> +
> +static void quorum_free_vote_list(QuorumVotes *votes)
> +{
> +    QuorumVoteVersion *version, *next_version;
> +    QuorumVoteItem *item, *next_item;
> +
> +    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
> +        QLIST_REMOVE(version, next);
> +        QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
> +            QLIST_REMOVE(item, next);
> +            g_free(item);
> +        }
> +        g_free(version);
> +    }
> +}
> +
> +static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
> +{
> +    int j, ret;
> +    gnutls_hash_hd_t dig;
> +    QEMUIOVector *qiov = &acb->qcrs[i].qiov;
> +
> +    ret = gnutls_hash_init(&dig, GNUTLS_DIG_SHA256);
> +
> +    if (ret < 0) {
> +        return ret;
> +    }
> +
> +    for (j = 0; j < qiov->niov; j++) {
> +        ret = gnutls_hash(dig, qiov->iov[j].iov_base, qiov->iov[j].iov_len);
> +        if (ret < 0) {
> +            break;
> +        }
> +    }
> +
> +    gnutls_hash_deinit(dig, (void *) hash);
> +    return ret;
> +}
> +
> +static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
> +{
> +    int max = 0;
> +    QuorumVoteVersion *candidate, *winner = NULL;
> +
> +    QLIST_FOREACH(candidate, &votes->vote_list, next) {
> +        if (candidate->vote_count > max) {
> +            max = candidate->vote_count;
> +            winner = candidate;
> +        }
> +    }
> +
> +    return winner;
> +}
> +
> +/* qemu_iovec_compare is handy for blkverify mode because it return the first

"returns"

> + * differing byte location. Yet it is handcoded to compare vectors one byte
> + * after another so it does not benefit from the libc SIMD optimizations.

Implying, the libc actually has SIMD optimizations. Last time I heard, 
repe cmpsd is as fast as SSE optimized versions at least on Sandy 
Bridge. And also implying, the compiler is unable to optimize 
qemu_iovec_compare() using SSE (Judging from how the function is 
written, my guess is it is able to).

But okay, C compilers are not everywhere as good as they are on x86, so 
I'm fine with it.

> + * quorum_iovec_compare is written for speed and should be used in the non
> + * blkverify mode of quorum.
> + */
> +static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
> +{
> +    int i;
> +    int result;
> +
> +    assert(a->niov == b->niov);
> +    for (i = 0; i < a->niov; i++) {
> +        assert(a->iov[i].iov_len == b->iov[i].iov_len);
> +        result = memcmp(a->iov[i].iov_base,
> +                        b->iov[i].iov_base,
> +                        a->iov[i].iov_len);
> +        if (result) {
> +            return false;
> +        }
> +    }
> +
> +    return true;
> +}

Ookay, so this function returns true if the QIOVs are equal and false if 
they differ. This behavior is exactly the opposite from what the value 
comparing functions do. I personally find this pretty confusing.

> +
> +static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
> +                                          const char *fmt, ...)
> +{
> +    va_list ap;
> +
> +    va_start(ap, fmt);
> +    fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ",
> +            acb->sector_num, acb->nb_sectors);
> +    vfprintf(stderr, fmt, ap);
> +    fprintf(stderr, "\n");
> +    va_end(ap);
> +    exit(1);
> +}
> +
> +static bool quorum_compare(QuorumAIOCB *acb,
> +                           QEMUIOVector *a,
> +                           QEMUIOVector *b)
> +{
> +    BDRVQuorumState *s = acb->common.bs->opaque;
> +    ssize_t offset;
> +
> +    /* This driver will replace blkverify in this particular case */
> +    if (s->is_blkverify) {
> +        offset = qemu_iovec_compare(a, b);
> +        if (offset != -1) {
> +            quorum_err(acb, "contents mismatch in sector %" PRId64,
> +                       acb->sector_num +
> +                       (uint64_t)(offset / BDRV_SECTOR_SIZE));
> +        }
> +        return true;
> +    }
> +
> +    return quorum_iovec_compare(a, b);
> +}
> +
> +/* Do a vote to get the error code */
> +static int quorum_vote_error(QuorumAIOCB *acb)
> +{
> +    BDRVQuorumState *s = acb->common.bs->opaque;
> +    QuorumVoteVersion *winner = NULL;
> +    QuorumVotes error_votes;
> +    QuorumVoteValue result_value;
> +    int i, ret = 0;
> +    bool error = false;
> +
> +    QLIST_INIT(&error_votes.vote_list);
> +    error_votes.compare = quorum_64bits_compare;
> +
> +    for (i = 0; i < s->num_children; i++) {
> +        ret = acb->qcrs[i].ret;
> +        if (ret) {
> +            error = true;
> +            result_value.l = ret;
> +            quorum_count_vote(&error_votes, &result_value, i);
> +        }
> +    }
> +
> +    if (error) {
> +        winner = quorum_get_vote_winner(&error_votes);
> +        ret = winner->value.l;
> +    }
> +
> +    quorum_free_vote_list(&error_votes);
> +
> +    return ret;
> +}
> +
> +static void quorum_vote(QuorumAIOCB *acb)
> +{
> +    bool quorum = true;
> +    int i, j, ret;
> +    QuorumVoteValue hash;
> +    BDRVQuorumState *s = acb->common.bs->opaque;
> +    QuorumVoteVersion *winner;
> +
> +    if (quorum_has_too_much_io_failed(acb)) {
> +        return;
> +    }
> +
> +    /* get the index of the first successful read */
> +    for (i = 0; i < s->num_children; i++) {
> +        if (!acb->qcrs[i].ret) {
> +            break;
> +        }
> +    }
> +
> +    assert(i < s->num_children);
> +
> +    /* compare this read with all other successful read stoping at quorum

"reads" and "stopping"

> +     * failure
> +     */
> +    for (j = i + 1; j < s->num_children; j++) {
> +        if (acb->qcrs[j].ret) {
> +            continue;
> +        }
> +        quorum = quorum_compare(acb, &acb->qcrs[i].qiov, &acb->qcrs[j].qiov);
> +        if (!quorum) {
> +            break;
> +       }
> +    }
> +
> +    /* Every successful read agrees */
> +    if (quorum) {
> +        quorum_copy_qiov(acb->qiov, &acb->qcrs[i].qiov);
> +        return;
> +    }
> +
> +    /* compute hashs for each successful read, also store indexes */

"hashes" (by the way, I'm sorry for pointing out such mistakes only just 
know but not before)

> +    for (i = 0; i < s->num_children; i++) {
> +        if (acb->qcrs[i].ret) {
> +            continue;
> +        }
> +        ret = quorum_compute_hash(acb, i, &hash);
> +        /* if ever the hash computation failed */
> +        if (ret < 0) {
> +            acb->vote_ret = ret;
> +            goto free_exit;
> +        }
> +        quorum_count_vote(&acb->votes, &hash, i);
> +    }
> +
> +    /* vote to select the most represented version */
> +    winner = quorum_get_vote_winner(&acb->votes);
> +
> +    /* if the winner count is smaller than threshold the read fails */
> +    if (winner->vote_count < s->threshold) {
> +        quorum_report_failure(acb);
> +        acb->vote_ret = -EIO;
> +        goto free_exit;
> +    }
> +
> +    /* we have a winner: copy it */
> +    quorum_copy_qiov(acb->qiov, &acb->qcrs[winner->index].qiov);
> +
> +    /* some versions are bad print them */
> +    quorum_report_bad_versions(s, acb, &winner->value);
> +
> +free_exit:
> +    /* free lists */
> +    quorum_free_vote_list(&acb->votes);
> +}
> +
>   static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
>                                            int64_t sector_num,
>                                            QEMUIOVector *qiov,
> diff --git a/configure b/configure
> index 5b20ce6..11f0a3a 100755
> --- a/configure
> +++ b/configure
> @@ -264,6 +264,7 @@ gtkabi="2.0"
>   tpm="no"
>   libssh2=""
>   vhdx=""
> +quorum="no"
>   
>   # parse CC options first
>   for opt do
> @@ -1005,6 +1006,10 @@ for opt do
>     ;;
>     --disable-vhdx) vhdx="no"
>     ;;
> +  --disable-quorum) quorum="no"
> +  ;;
> +  --enable-quorum) quorum="yes"
> +  ;;
>     *) echo "ERROR: unknown option $opt"; show_help="yes"
>     ;;
>     esac
> @@ -1261,6 +1266,8 @@ Advanced options (experts only):
>     --enable-libssh2         enable ssh block device support
>     --disable-vhdx           disables support for the Microsoft VHDX image format
>     --enable-vhdx            enable support for the Microsoft VHDX image format
> +  --disable-quorum         disable quorum block filter support
> +  --enable-quorum          enable quorum block filter support
>   
>   NOTE: The object files are built at the place where configure is launched
>   EOF
> @@ -1930,6 +1937,30 @@ EOF
>   fi
>   
>   ##########################################
> +# Quorum probe (check for gnutls)
> +if test "$quorum" != "no" ; then
> +cat > $TMPC <<EOF
> +#include <gnutls/gnutls.h>
> +#include <gnutls/crypto.h>
> +int main(void) {char data[4096], digest[32];
> +gnutls_hash_fast(GNUTLS_DIG_SHA256, data, 4096, digest);
> +return 0;
> +}
> +EOF
> +quorum_tls_cflags=`$pkg_config --cflags gnutls 2> /dev/null`
> +quorum_tls_libs=`$pkg_config --libs gnutls 2> /dev/null`
> +if compile_prog "$quorum_tls_cflags" "$quorum_tls_libs" ; then
> +  qcow_tls=yes
> +  libs_softmmu="$quorum_tls_libs $libs_softmmu"
> +  libs_tools="$quorum_tls_libs $libs_softmmu"
> +  QEMU_CFLAGS="$QEMU_CFLAGS $quorum_tls_cflags"
> +else
> +  echo "gnutls > 2.10.0 required to compile Quorum"
> +  exit 1
> +fi
> +fi
> +
> +##########################################
>   # VNC SASL detection
>   if test "$vnc" = "yes" -a "$vnc_sasl" != "no" ; then
>     cat > $TMPC <<EOF
> @@ -3865,6 +3896,7 @@ echo "libssh2 support   $libssh2"
>   echo "TPM passthrough   $tpm_passthrough"
>   echo "QOM debugging     $qom_cast_debug"
>   echo "vhdx              $vhdx"
> +echo "Quorum            $quorum"
>   
>   if test "$sdl_too_old" = "yes"; then
>   echo "-> Your SDL version is too old - please upgrade to have SDL support"
> @@ -4267,6 +4299,10 @@ if test "$libssh2" = "yes" ; then
>     echo "CONFIG_LIBSSH2=y" >> $config_host_mak
>   fi
>   
> +if test "$quorum" = "yes" ; then
> +  echo "CONFIG_QUORUM=y" >> $config_host_mak
> +fi
> +
>   if test "$virtio_blk_data_plane" = "yes" ; then
>     echo 'CONFIG_VIRTIO_BLK_DATA_PLANE=$(CONFIG_VIRTIO)' >> $config_host_mak
>   fi
> diff --git a/docs/qmp/qmp-events.txt b/docs/qmp/qmp-events.txt
> index a378c87..00f9515 100644
> --- a/docs/qmp/qmp-events.txt
> +++ b/docs/qmp/qmp-events.txt
> @@ -500,3 +500,39 @@ Example:
>   
>   Note: If action is "reset", "shutdown", or "pause" the WATCHDOG event is
>   followed respectively by the RESET, SHUTDOWN, or STOP events.
> +
> +QUORUM_FAILURE
> +--------------
> +
> +Emitted by the Quorum block driver if it fails to establish a quorum.
> +
> +Data:
> +
> +- "reference":    device name if defined else node name.
> +- "sector-num":   Number of the first sector of the failed read operation.
> +- "sector-count": Failed read operation sector count.
> +
> +Example:
> +
> +{ "event": "QUORUM_FAILURE",
> +     "data": { "reference": "usr1", "sector-num": 345435, "sector-count": 5 },
> +     "timestamp": { "seconds": 1344522075, "microseconds": 745528 } }
> +
> +QUORUM_REPORT_BAD
> +-----------------
> +
> +Emitted to report a corruption of a Quorum file.
> +
> +Data:
> +
> +- "ret":          The IO return code.
> +- "node-name":    The graph node name of the block driver state.
> +- "sector-num":   Number of the first sector of the failed read operation.
> +- "sector-count": Failed read operation sector count.
> +
> +Example:
> +
> +{ "event": "QUORUM_REPORT_BAD",
> +     "data": { "ret": 0, "node-name": "1.raw", "sector-num": 345435,
> +               "sector-count": 5 },
> +     "timestamp": { "seconds": 1344522075, "microseconds": 745528 } }
> diff --git a/include/monitor/monitor.h b/include/monitor/monitor.h
> index 7e5f752..a49ea11 100644
> --- a/include/monitor/monitor.h
> +++ b/include/monitor/monitor.h
> @@ -49,6 +49,8 @@ typedef enum MonitorEvent {
>       QEVENT_SPICE_MIGRATE_COMPLETED,
>       QEVENT_GUEST_PANICKED,
>       QEVENT_BLOCK_IMAGE_CORRUPTED,
> +    QEVENT_QUORUM_FAILURE,
> +    QEVENT_QUORUM_REPORT_BAD,
>   
>       /* Add to 'monitor_event_names' array in monitor.c when
>        * defining new events here */
> diff --git a/monitor.c b/monitor.c
> index b1ea262..81ffa0f 100644
> --- a/monitor.c
> +++ b/monitor.c
> @@ -507,6 +507,8 @@ static const char *monitor_event_names[] = {
>       [QEVENT_SPICE_MIGRATE_COMPLETED] = "SPICE_MIGRATE_COMPLETED",
>       [QEVENT_GUEST_PANICKED] = "GUEST_PANICKED",
>       [QEVENT_BLOCK_IMAGE_CORRUPTED] = "BLOCK_IMAGE_CORRUPTED",
> +    [QEVENT_QUORUM_FAILURE] = "QUORUM_FAILURE",
> +    [QEVENT_QUORUM_REPORT_BAD] = "QUORUM_REPORT_BAD",
>   };
>   QEMU_BUILD_BUG_ON(ARRAY_SIZE(monitor_event_names) != QEVENT_MAX)
>   

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Qemu-devel] [PATCH V17 09/12] quorum: Add quorum_co_flush().
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 09/12] quorum: Add quorum_co_flush() Benoît Canet
@ 2014-02-15  1:54   ` Max Reitz
  0 siblings, 0 replies; 27+ messages in thread
From: Max Reitz @ 2014-02-15  1:54 UTC (permalink / raw)
  To: Benoît Canet, qemu-devel; +Cc: kwolf, famz, Benoît Canet, stefanha

On 12.02.2014 23:06, Benoît Canet wrote:
> From: Benoît Canet <benoit@irqsave.net>
>
> Makes a vote to select error if any.
>
> Signed-off-by: Benoit Canet <benoit@irqsave.net>
> ---
>   block/quorum.c | 28 ++++++++++++++++++++++++++++
>   1 file changed, 28 insertions(+)

Reviewed-by: Max Reitz <mreitz@redhat.com>

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Qemu-devel] [PATCH V17 11/12] quorum: Add quorum_open() and quorum_close().
  2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 11/12] quorum: Add quorum_open() and quorum_close() Benoît Canet
  2014-02-13  9:09   ` Benoît Canet
@ 2014-02-15  2:02   ` Max Reitz
  1 sibling, 0 replies; 27+ messages in thread
From: Max Reitz @ 2014-02-15  2:02 UTC (permalink / raw)
  To: Benoît Canet, qemu-devel; +Cc: kwolf, famz, Benoît Canet, stefanha

On 12.02.2014 23:06, Benoît Canet wrote:
> From: Benoît Canet <benoit@irqsave.net>
>
> Example of command line:
> -drive if=virtio,file.driver=quorum,\
> file.children.0.file.filename=1.raw,\
> file.children.0.node-name=1.raw,\
> file.children.0.driver=raw,\
> file.children.1.file.filename=2.raw,\
> file.children.1.node-name=2.raw,\
> file.children.1.driver=raw,\
> file.children.2.file.filename=3.raw,\
> file.children.2.node-name=3.raw,\
> file.children.2.driver=raw,\
> file.vote-threshold=2
>
> file.blkverify=on with file.vote-threshold=2 and two files can be passed to
> emulated blkverify.
>
> Signed-off-by: Benoit Canet <benoit@irqsave.net>
> ---
>   block/quorum.c   | 165 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
>   monitor.c        |   2 +
>   qapi-schema.json |  21 ++++++-
>   3 files changed, 187 insertions(+), 1 deletion(-)
>
> diff --git a/block/quorum.c b/block/quorum.c
> index 8e033ef..d3a90fb 100644
> --- a/block/quorum.c
> +++ b/block/quorum.c
> @@ -20,6 +20,9 @@
>   
>   #define HASH_LENGTH 32
>   
> +#define QUORUM_OPT_VOTE_THRESHOLD "vote-threshold"
> +#define QUORUM_OPT_BLKVERIFY      "blkverify"
> +
>   /* This union holds a vote hash value */
>   typedef union QuorumVoteValue {
>       char h[HASH_LENGTH];       /* SHA-256 hash */
> @@ -677,12 +680,174 @@ static bool quorum_recurse_is_first_non_filter(BlockDriverState *bs,
>       return false;
>   }
>   
> +static int quorum_valid_threshold(int threshold,
> +                                  int num_children,
> +                                  Error **errp)

This fits on a single line and I'm all for maximizing usage of the 80 
character limit.

> +{
> +
> +    if (threshold < 1) {
> +        error_set(errp, QERR_INVALID_PARAMETER_VALUE,
> +                  "vote-threshold", "value >= 1");
> +        return -ERANGE;
> +    }
> +
> +    if (threshold > num_children) {
> +        error_setg(errp, "threshold may not exceed children count");
> +        return -ERANGE;
> +    }
> +
> +    return 0;
> +}
> +
> +static QemuOptsList quorum_runtime_opts = {
> +    .name = "quorum",
> +    .head = QTAILQ_HEAD_INITIALIZER(quorum_runtime_opts.head),
> +    .desc = {
> +        {
> +            .name = QUORUM_OPT_VOTE_THRESHOLD,
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "The number of vote needed for reaching quorum",
> +        },
> +        {
> +            .name = QUORUM_OPT_BLKVERIFY,
> +            .type = QEMU_OPT_BOOL,
> +            .help = "Trigger block verify mode if set",
> +        },
> +        { /* end of list */ }
> +    },
> +};
> +
> +static int quorum_open(BlockDriverState *bs,
> +                       QDict *options,
> +                       int flags,
> +                       Error **errp)

This takes two lines (although since you have to split at all, I can 
cope with this being split once per parameter).

> +{
> +    BDRVQuorumState *s = bs->opaque;
> +    Error *local_err = NULL;
> +    QemuOpts *opts;
> +    bool *opened;
> +    QDict *sub = NULL;
> +    QList *list = NULL;
> +    const QListEntry *lentry;
> +    const QDictEntry *dentry;
> +    int i;
> +    int ret = 0;
> +
> +    qdict_flatten(options);
> +    qdict_extract_subqdict(options, &sub, "children.");
> +    qdict_array_split(sub, &list);
> +
> +    /* count how many different children are present and validate
> +     * qdict_size(sub) address the open by reference case
> +     */
> +    s->num_children = !qlist_size(list) ? qdict_size(sub) : qlist_size(list);

Okay, so eventually we probably want to add these values in order to 
allow "mixed" parameters (some references, some "full" specifications). 
But for that to work without much hassle, qdict_array_split() still 
needs some extension (or be fixed?) to split not just QDicts but other 
objects as well. I think we (probably I) should amend 
qdict_array_split() first (perhaps add some boolean parameter to specify 
what it should do) and then fix this (that is, make mixed parameters 
work and allow detection of malformed parameters).

For this series, this is fine.

> +    if (s->num_children < 2) {
> +        error_setg(&local_err,
> +                   "Number of provided children must be greater than 1");
> +        ret = -EINVAL;
> +        goto exit;
> +    }
> +
> +    opts = qemu_opts_create(&quorum_runtime_opts, NULL, 0, &error_abort);
> +    qemu_opts_absorb_qdict(opts, options, &local_err);
> +    if (error_is_set(&local_err)) {
> +        ret = -EINVAL;
> +        goto exit;
> +    }
> +
> +    s->threshold = qemu_opt_get_number(opts, QUORUM_OPT_VOTE_THRESHOLD, 0);
> +
> +    /* and validate it against s->num_children */
> +    ret = quorum_valid_threshold(s->threshold, s->num_children, &local_err);
> +    if (ret < 0) {
> +        goto exit;
> +    }
> +
> +    /* is the driver in blkverify mode */
> +    if (qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false) &&
> +        s->num_children == 2 && s->threshold == 2) {

If qemu_opt_get_bool() returns true but the other conditions aren't met, 
I'd prefer, again, a warning.

> +        s->is_blkverify = true;
> +    }
> +
> +    /* allocate the children BlockDriverState array */
> +    s->bs = g_new0(BlockDriverState *, s->num_children);
> +    opened = g_new0(bool, s->num_children);
> +
> +    /* Open by file name or options dict (command line or QMP) */
> +    if (s->num_children == qlist_size(list)) {
> +        for (i = 0, lentry = qlist_first(list); lentry;
> +            lentry = qlist_next(lentry), i++) {
> +            QDict *d = qobject_to_qdict(lentry->value);
> +            QINCREF(d);
> +            ret = bdrv_open(&s->bs[i], NULL, NULL,
> +                            d,
> +                            flags, NULL, &local_err);

This actually fits on a single line (sorry for my obsession with this).

> +            if (ret < 0) {
> +                goto close_exit;
> +            }
> +            opened[i] = true;
> +        }
> +    /* Open by QMP references */
> +    } else {
> +        for (i = 0, dentry = qdict_first(sub); dentry;
> +             dentry = qdict_next(sub, dentry), i++) {
> +            QString *string = qobject_to_qstring(dentry->value);
> +            ret = bdrv_open(&s->bs[i], NULL,
> +                            qstring_get_str(string),
> +                            NULL, flags, NULL, &local_err);

This takes two.

> +            if (ret < 0) {
> +                goto close_exit;
> +            }
> +            opened[i] = true;
> +        }
> +    }
> +
> +    g_free(opened);
> +    goto exit;
> +
> +close_exit:
> +    /* cleanup on error */
> +    for (i = 0; i < s->num_children; i++) {
> +        if (!opened[i]) {
> +            continue;
> +        }
> +        bdrv_unref(s->bs[i]);
> +    }
> +    g_free(s->bs);
> +    g_free(opened);
> +exit:
> +    /* propagate error */
> +    if (error_is_set(&local_err)) {
> +        error_propagate(errp, local_err);
> +    }
> +    QDECREF(list);
> +    QDECREF(sub);
> +    return ret;
> +}
> +
> +static void quorum_close(BlockDriverState *bs)
> +{
> +    BDRVQuorumState *s = bs->opaque;
> +    int i;
> +
> +    for (i = 0; i < s->num_children; i++) {
> +        bdrv_unref(s->bs[i]);
> +    }
> +
> +    g_free(s->bs);
> +}
> +
>   static BlockDriver bdrv_quorum = {
>       .format_name        = "quorum",
>       .protocol_name      = "quorum",
>   
>       .instance_size      = sizeof(BDRVQuorumState),
>   
> +    .bdrv_file_open     = quorum_open,
> +    .bdrv_close         = quorum_close,
> +
> +    .authorizations     = { true, true },
> +
>       .bdrv_co_flush_to_disk = quorum_co_flush,
>   
>       .bdrv_getlength     = quorum_getlength,
> diff --git a/monitor.c b/monitor.c
> index 81ffa0f..8b175fb 100644
> --- a/monitor.c
> +++ b/monitor.c
> @@ -639,6 +639,8 @@ static void monitor_protocol_event_init(void)
>       monitor_protocol_event_throttle(QEVENT_RTC_CHANGE, 1000);
>       monitor_protocol_event_throttle(QEVENT_BALLOON_CHANGE, 1000);
>       monitor_protocol_event_throttle(QEVENT_WATCHDOG, 1000);
> +    monitor_protocol_event_throttle(QEVENT_QUORUM_REPORT_BAD, 1000);
> +    monitor_protocol_event_throttle(QEVENT_QUORUM_FAILURE, 1000);

Hm, what is this for?

Max

>   }
>   
>   /**
> diff --git a/qapi-schema.json b/qapi-schema.json
> index 7cfb5e5..990d0c5 100644
> --- a/qapi-schema.json
> +++ b/qapi-schema.json
> @@ -4352,6 +4352,24 @@
>               'raw': 'BlockdevRef' } }
>   
>   ##
> +# @BlockdevOptionsQuorum
> +#
> +# Driver specific block device options for Quorum
> +#
> +# @blkverify:      #optional true if the driver must print content mismatch
> +#
> +# @children:       the children block device to use
> +#
> +# @vote_threshold: the vote limit under which a read will fail
> +#
> +# Since: 2.0
> +##
> +{ 'type': 'BlockdevOptionsQuorum',
> +  'data': { '*blkverify': 'bool',
> +            'children': [ 'BlockdevRef' ],
> +            'vote-threshold': 'int' } }
> +
> +##
>   # @BlockdevOptions
>   #
>   # Options for creating a block device.
> @@ -4390,7 +4408,8 @@
>         'vdi':        'BlockdevOptionsGenericFormat',
>         'vhdx':       'BlockdevOptionsGenericFormat',
>         'vmdk':       'BlockdevOptionsGenericCOWFormat',
> -      'vpc':        'BlockdevOptionsGenericFormat'
> +      'vpc':        'BlockdevOptionsGenericFormat',
> +      'quorum':     'BlockdevOptionsQuorum'
>     } }
>   
>   ##

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Qemu-devel] [PATCH V17 00/12] quorum block filter
  2014-02-15  1:15 ` [Qemu-devel] [PATCH V17 00/12] quorum block filter Max Reitz
@ 2014-02-15  2:04   ` Max Reitz
  0 siblings, 0 replies; 27+ messages in thread
From: Max Reitz @ 2014-02-15  2:04 UTC (permalink / raw)
  To: Benoît Canet, qemu-devel; +Cc: kwolf, famz, stefanha

On 15.02.2014 02:15, Max Reitz wrote:
> On 12.02.2014 23:06, Benoît Canet wrote:
>> I post this for review in prevision of 2.0 feature freeze.
>> Even if the series look correct please wait before merging because:
>>
>> The QMP events in the "Add quorum mechanism" definitively needs to be 
>> reviewed
>> by Eric as they where changed.
>>
>> I did not found any bugs while testing this version but I am willing 
>> to test the
>> code further before it's applied even it's reviewed by.
>>
>> Best regards
>>
>> Benoît
>
> Just one thing in general: You left my reviewed-by note on many 
> patches of the series although basically every single patch has 
> changed (some more, some less). This makes it harder for me to review 
> (okay, not really; I can (and should) still compare with the old 
> versions for the actual changes, but if I fail to do so...) and can 
> cause some patches to be merged because "I reviewed it" (although I 
> actually didn't) - probably not in this case since it's hard here to 
> pick single patches to merge, but well...
>
> Max

The reviewed-by for all the patches I didn't reply to (7, 8, 10, 12) is 
okay and I'm hereby renewing it.

Max

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Qemu-devel] [PATCH V17 05/12] quorum: Add quorum_aio_readv.
  2014-02-15  1:40   ` Max Reitz
@ 2014-02-17 10:57     ` Kevin Wolf
  0 siblings, 0 replies; 27+ messages in thread
From: Kevin Wolf @ 2014-02-17 10:57 UTC (permalink / raw)
  To: Max Reitz
  Cc: Benoît Canet, famz, Benoît Canet, qemu-devel, stefanha

Am 15.02.2014 um 02:40 hat Max Reitz geschrieben:
> On 12.02.2014 23:06, Benoît Canet wrote:
> >From: Benoît Canet <benoit@irqsave.net>
> >
> >Add code to do num_children reads in parallel and cleanup the structures
> >afterward.
> 
> "afterwards"
> 
> >Signed-off-by: Benoit Canet <benoit@irqsave.net>
> >Reviewed-by: Max Reitz <mreitz@redhat.com>
> >---
> >  block/quorum.c | 39 ++++++++++++++++++++++++++++++++++++++-
> >  1 file changed, 38 insertions(+), 1 deletion(-)
> >
> >diff --git a/block/quorum.c b/block/quorum.c
> >index 197cdca..c7a5d79 100644
> >--- a/block/quorum.c
> >+++ b/block/quorum.c
> >@@ -91,10 +91,18 @@ static AIOCBInfo quorum_aiocb_info = {
> >  static void quorum_aio_finalize(QuorumAIOCB *acb)
> >  {
> >-    int ret = 0;
> >+    BDRVQuorumState *s = acb->common.bs->opaque;
> >+    int i, ret = 0;
> >      acb->common.cb(acb->common.opaque, ret);
> >+    if (acb->is_read) {
> >+        for (i = 0; i < s->num_children; i++) {
> >+            qemu_vfree(acb->qcrs[i].buf);
> >+            qemu_iovec_destroy(&acb->qcrs[i].qiov);
> >+        }
> >+    }
> >+
> >      g_free(acb->qcrs);
> >      qemu_aio_release(acb);
> >  }
> >@@ -149,6 +157,34 @@ static void quorum_aio_cb(void *opaque, int ret)
> >      quorum_aio_finalize(acb);
> >  }
> >+static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
> >+                                         int64_t sector_num,
> >+                                         QEMUIOVector *qiov,
> >+                                         int nb_sectors,
> >+                                         BlockDriverCompletionFunc *cb,
> >+                                         void *opaque)
> >+{
> >+    BDRVQuorumState *s = bs->opaque;
> >+    QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num,
> >+                                      nb_sectors, cb, opaque);
> >+    int i;
> >+
> >+    acb->is_read = true;
> >+
> >+    for (i = 0; i < s->num_children; i++) {
> >+        acb->qcrs[i].buf = qemu_blockalign(s->bs[i], qiov->size);
> >+        qemu_iovec_init(&acb->qcrs[i].qiov, qiov->niov);
> >+        qemu_iovec_clone(&acb->qcrs[i].qiov, qiov, acb->qcrs[i].buf);
> >+    }
> >+
> >+    for (i = 0; i < s->num_children; i++) {
> >+        bdrv_aio_readv(s->bs[i], sector_num, &acb->qcrs[i].qiov, nb_sectors,
> >+                       quorum_aio_cb, &acb->qcrs[i]);
> 
> I know Kevin told you to, but using the child's own QIOV here means
> quorum_aio_readv() won't work after this patch but only after patch
> 6. Just pointing it out, I don't like it, but Kevin told you to, so:

Sorry, but this is the entirely wrong attitude. Just because I am a
maintainer that doesn't mean that I don't get things wrong. When I talk
bullshit (and probably I do more often than others), just point it out.

In this case, I didn't even tell him to do the change, but I asked in
patch 6:

> Why don't you do this from the beginning?

The answer appears to be that the read data isn't copied back to the
original qiov yet, but this happens only in the quorum_copy_qiov() calls
in quorum_vote().

So this is a bug in this patch, which can be fixed either by reverting
to the old state of using the original qiov here, or (perhaps more
nicely) by introducing quorum_copy_qiov() already in this patch.

Kevin

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Qemu-devel] [PATCH V17 11/12] quorum: Add quorum_open() and quorum_close().
  2014-02-13  9:09   ` Benoît Canet
@ 2014-02-18  4:06     ` Fam Zheng
  2014-02-18 12:31       ` Benoît Canet
  0 siblings, 1 reply; 27+ messages in thread
From: Fam Zheng @ 2014-02-18  4:06 UTC (permalink / raw)
  To: Benoît Canet; +Cc: kwolf, qemu-devel, stefanha, mreitz

On Thu, 02/13 10:09, Benoît Canet wrote:
> The Wednesday 12 Feb 2014 à 23:06:38 (+0100), Benoît Canet wrote :
> > +static void quorum_close(BlockDriverState *bs)
> > +{
> > +    BDRVQuorumState *s = bs->opaque;
> > +    int i;
> > +
> > +    for (i = 0; i < s->num_children; i++) {
> > +        bdrv_unref(s->bs[i]);
> Quorum crash here from time to time I don't understand why.

I think you could add printf or use gdb to examine every bdrv_unref on the
crashing bs, so you can track down to the unbalanced reference.

Fam

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Qemu-devel] [PATCH V17 11/12] quorum: Add quorum_open() and quorum_close().
  2014-02-18  4:06     ` Fam Zheng
@ 2014-02-18 12:31       ` Benoît Canet
  0 siblings, 0 replies; 27+ messages in thread
From: Benoît Canet @ 2014-02-18 12:31 UTC (permalink / raw)
  To: Fam Zheng; +Cc: Benoît Canet, kwolf, qemu-devel, mreitz, stefanha

The Tuesday 18 Feb 2014 à 12:06:57 (+0800), Fam Zheng wrote :
> On Thu, 02/13 10:09, Benoît Canet wrote:
> > The Wednesday 12 Feb 2014 à 23:06:38 (+0100), Benoît Canet wrote :
> > > +static void quorum_close(BlockDriverState *bs)
> > > +{
> > > +    BDRVQuorumState *s = bs->opaque;
> > > +    int i;
> > > +
> > > +    for (i = 0; i < s->num_children; i++) {
> > > +        bdrv_unref(s->bs[i]);
> > Quorum crash here from time to time I don't understand why.
> 
> I think you could add printf or use gdb to examine every bdrv_unref on the
> crashing bs, so you can track down to the unbalanced reference.

I found the cause of the crash since: I added a spurious QDECREF() in the
external snapshot prepare function.
It's fixed now.

Thanks,

Benoît

> 
> Fam

^ permalink raw reply	[flat|nested] 27+ messages in thread

end of thread, other threads:[~2014-02-18 12:31 UTC | newest]

Thread overview: 27+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2014-02-12 22:06 [Qemu-devel] [PATCH V17 00/12] quorum block filter Benoît Canet
2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 01/12] quorum: Create quorum.c, add QuorumChildRequest and QuorumAIOCB Benoît Canet
2014-02-15  1:17   ` Max Reitz
2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 02/12] quorum: Create BDRVQuorumState and BlkDriver and do init Benoît Canet
2014-02-15  1:22   ` Max Reitz
2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 03/12] quorum: Add quorum_aio_writev and its dependencies Benoît Canet
2014-02-15  1:31   ` Max Reitz
2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 04/12] blkverify: Extract qemu_iovec_clone() and qemu_iovec_compare() from blkverify Benoît Canet
2014-02-15  1:36   ` Max Reitz
2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 05/12] quorum: Add quorum_aio_readv Benoît Canet
2014-02-15  1:40   ` Max Reitz
2014-02-17 10:57     ` Kevin Wolf
2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 06/12] quorum: Add quorum mechanism Benoît Canet
2014-02-15  1:53   ` Max Reitz
2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 07/12] quorum: Add quorum_getlength() Benoît Canet
2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 08/12] quorum: Add quorum_invalidate_cache() Benoît Canet
2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 09/12] quorum: Add quorum_co_flush() Benoît Canet
2014-02-15  1:54   ` Max Reitz
2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 10/12] quorum: Implement recursive .bdrv_recurse_is_first_non_filter in quorum Benoît Canet
2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 11/12] quorum: Add quorum_open() and quorum_close() Benoît Canet
2014-02-13  9:09   ` Benoît Canet
2014-02-18  4:06     ` Fam Zheng
2014-02-18 12:31       ` Benoît Canet
2014-02-15  2:02   ` Max Reitz
2014-02-12 22:06 ` [Qemu-devel] [PATCH V17 12/12] quorum: Add unit test Benoît Canet
2014-02-15  1:15 ` [Qemu-devel] [PATCH V17 00/12] quorum block filter Max Reitz
2014-02-15  2:04   ` Max Reitz

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.