* [Qemu-devel] [PATCH V15 00/13] Quorum block filter
@ 2014-02-03 21:51 Benoît Canet
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 01/13] quorum: Create quorum.c, add QuorumSingleAIOCB and QuorumAIOCB Benoît Canet
                   ` (12 more replies)
  0 siblings, 13 replies; 29+ messages in thread
From: Benoît Canet @ 2014-02-03 21:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, Benoît Canet, stefanha, mreitz

in v15:
    remove the now unneeded ret = s->threshold <= acb->success_count ? 0 : quorum_vote_error(acb); in quorum_aio_finalize
    Change flush error handling [Max]
    s/againt/against/ [Max]
    add \n in fprintf [Max]
    get rid of quorum_get_first_error from the git history [Benoît]
    use if instead of conditions in the for loop [Max]

    qdict_flatten doesn't work for QMP references (I tried)

    the writev patch and the quorum patch still need Max's review

Benoît Canet (13):
  quorum: Create quorum.c, add QuorumSingleAIOCB and QuorumAIOCB.
  quorum: Create BDRVQuorumState and BlkDriver and do init.
  quorum: Add quorum_aio_writev and its dependencies.
  blkverify: Extract qemu_iovec_clone() and qemu_iovec_compare() from
    blkverify.
  quorum: Add quorum_aio_readv.
  quorum: Add quorum mechanism.
  quorum: Add quorum_getlength().
  quorum: Add quorum_invalidate_cache().
  quorum: Add quorum_co_get_block_status.
  quorum: Add quorum_co_flush().
  quorum: Implement recursive .bdrv_recurse_is_first_non_filter in
    quorum.
  quorum: Add quorum_open() and quorum_close().
  quorum: Add unit test.

 block/Makefile.objs        |   1 +
 block/blkverify.c          | 108 +-----
 block/quorum.c             | 909 +++++++++++++++++++++++++++++++++++++++++++++
 configure                  |  36 ++
 docs/qmp/qmp-events.txt    |  33 ++
 include/monitor/monitor.h  |   2 +
 include/qemu-common.h      |   2 +
 monitor.c                  |   2 +
 qapi-schema.json           |  21 +-
 tests/qemu-iotests/075     |  95 +++++
 tests/qemu-iotests/075.out |  34 ++
 tests/qemu-iotests/group   |   1 +
 util/iov.c                 | 103 +++++
 13 files changed, 1240 insertions(+), 107 deletions(-)
 create mode 100644 block/quorum.c
 create mode 100755 tests/qemu-iotests/075
 create mode 100644 tests/qemu-iotests/075.out

--
1.8.3.2
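
As a rough illustration of the voting idea behind this series (the threshold
described in patch 02 and the vote versions introduced in patch 06), here is a
minimal standalone sketch. It is not the code from the patches: it assumes the
per-child digests are already computed, uses a fixed-size array instead of
QLISTs, and the names vote_winner, VoteVersion and MAX_CHILDREN are
hypothetical.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define HASH_LEN 32          /* size of a SHA-256 digest */
#define MAX_CHILDREN 16      /* hypothetical upper bound for this sketch */

/* One distinct value and the number of children that returned it. */
typedef struct {
    uint8_t value[HASH_LEN];
    int first_index;         /* first child that produced this value */
    int vote_count;
} VoteVersion;

/*
 * Group the per-child hashes into versions and return the index of a child
 * holding the most common version, or -1 if that version has fewer than
 * threshold votes.
 */
static int vote_winner(uint8_t hashes[][HASH_LEN], int total, int threshold)
{
    VoteVersion versions[MAX_CHILDREN];
    int nb_versions = 0;
    int i, j, best = -1;

    assert(total > 0 && total <= MAX_CHILDREN);

    for (i = 0; i < total; i++) {
        /* look for an existing version with the same hash */
        for (j = 0; j < nb_versions; j++) {
            if (!memcmp(versions[j].value, hashes[i], HASH_LEN)) {
                break;
            }
        }
        /* not seen yet: open a new version */
        if (j == nb_versions) {
            memcpy(versions[j].value, hashes[i], HASH_LEN);
            versions[j].first_index = i;
            versions[j].vote_count = 0;
            nb_versions++;
        }
        versions[j].vote_count++;
    }

    /* pick the version with the most votes */
    for (j = 0; j < nb_versions; j++) {
        if (best < 0 || versions[j].vote_count > versions[best].vote_count) {
            best = j;
        }
    }

    if (versions[best].vote_count < threshold) {
        return -1;           /* no quorum */
    }
    return versions[best].first_index;
}
```

With three children and a threshold of 2, two matching digests are enough to
pick a winner, while three different digests give no quorum.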

* [Qemu-devel] [PATCH V15 01/13] quorum: Create quorum.c, add QuorumSingleAIOCB and QuorumAIOCB.
  2014-02-03 21:51 [Qemu-devel] [PATCH V15 00/13] Quorum block filter Benoît Canet
@ 2014-02-03 21:51 ` Benoît Canet
  2014-02-04 12:57   ` Kevin Wolf
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 02/13] quorum: Create BDRVQuorumState and BlkDriver and do init Benoît Canet
                   ` (11 subsequent siblings)
  12 siblings, 1 reply; 29+ messages in thread
From: Benoît Canet @ 2014-02-03 21:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, Benoît Canet, stefanha, mreitz

From: Benoît Canet <benoit@irqsave.net>

Signed-off-by: Benoit Canet <benoit@irqsave.net>
Reviewed-by: Max Reitz <mreitz@redhat.com>
---
 block/Makefile.objs |  1 +
 block/quorum.c      | 54 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 55 insertions(+)
 create mode 100644 block/quorum.c

diff --git a/block/Makefile.objs b/block/Makefile.objs
index 4e8c91e..a2650b9 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -3,6 +3,7 @@ block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-c
 block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
 block-obj-y += qed-check.o
 block-obj-$(CONFIG_VHDX) += vhdx.o vhdx-endian.o vhdx-log.o
+block-obj-y += quorum.o
 block-obj-y += parallels.o blkdebug.o blkverify.o
 block-obj-y += snapshot.o qapi.o
 block-obj-$(CONFIG_WIN32) += raw-win32.o win32-aio.o
diff --git a/block/quorum.c b/block/quorum.c
new file mode 100644
index 0000000..17695d6
--- /dev/null
+++ b/block/quorum.c
@@ -0,0 +1,54 @@
+/*
+ * Quorum Block filter
+ *
+ * Copyright (C) 2012-2014 Nodalink, EURL.
+ *
+ * Author:
+ *   Benoît Canet <benoit.canet@irqsave.net>
+ *
+ * Based on the design and code of blkverify.c (Copyright (C) 2010 IBM, Corp)
+ * and blkmirror.c (Copyright (C) 2011 Red Hat, Inc).
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+
+#include "block/block_int.h"
+
+typedef struct QuorumAIOCB QuorumAIOCB;
+
+/* Quorum will create one instance of the following structure per operation it
+ * performs on its children.
+ * So for each read/write operation coming from the upper layer there will be
+ * $children_count QuorumSingleAIOCB.
+ */
+typedef struct QuorumSingleAIOCB {
+    BlockDriverAIOCB *aiocb;
+    QEMUIOVector qiov;
+    uint8_t *buf;
+    int ret;
+    QuorumAIOCB *parent;
+} QuorumSingleAIOCB;
+
+/* Quorum will use the following structure to track progress of each read/write
+ * operation received from the upper layer.
+ * This structure holds pointers to the QuorumSingleAIOCB instances used to do
+ * operations on each child and tracks overall progress.
+ */
+struct QuorumAIOCB {
+    BlockDriverAIOCB common;
+
+    /* Request metadata */
+    uint64_t sector_num;
+    int nb_sectors;
+
+    QEMUIOVector *qiov;         /* calling IOV */
+
+    QuorumSingleAIOCB *aios;    /* individual AIOs */
+    int count;                  /* number of completed AIOCB */
+    int success_count;          /* number of successfully completed AIOCB */
+    bool *finished;             /* completion signal for cancel */
+
+    bool is_read;
+    int vote_ret;
+};
-- 
1.8.3.2

* [Qemu-devel] [PATCH V15 02/13] quorum: Create BDRVQuorumState and BlkDriver and do init.
  2014-02-03 21:51 [Qemu-devel] [PATCH V15 00/13] Quorum block filter Benoît Canet
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 01/13] quorum: Create quorum.c, add QuorumSingleAIOCB and QuorumAIOCB Benoît Canet
@ 2014-02-03 21:51 ` Benoît Canet
  2014-02-04 13:08   ` Kevin Wolf
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 03/13] quorum: Add quorum_aio_writev and its dependencies Benoît Canet
                   ` (10 subsequent siblings)
  12 siblings, 1 reply; 29+ messages in thread
From: Benoît Canet @ 2014-02-03 21:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, Benoît Canet, stefanha, mreitz

From: Benoît Canet <benoit@irqsave.net>

Signed-off-by: Benoit Canet <benoit@irqsave.net>
Reviewed-by: Max Reitz <mreitz@redhat.com>
---
 block/quorum.c | 25 +++++++++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/block/quorum.c b/block/quorum.c
index 17695d6..157efdf 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -15,6 +15,16 @@
 
 #include "block/block_int.h"
 
+/* the following structure holds the state of one quorum instance */
+typedef struct {
+    BlockDriverState **bs; /* children BlockDriverStates */
+    int total;             /* children count */
+    int threshold;         /* if fewer than threshold child reads give the
+                            * same result, a quorum error occurs.
+                            */
+    bool is_blkverify;     /* true if the driver is in blkverify mode */
+} BDRVQuorumState;
+
 typedef struct QuorumAIOCB QuorumAIOCB;
 
 /* Quorum will create one instance of the following structure per operation it
@@ -37,6 +47,7 @@ typedef struct QuorumSingleAIOCB {
  */
 struct QuorumAIOCB {
     BlockDriverAIOCB common;
+    BDRVQuorumState *bqs;
 
     /* Request metadata */
     uint64_t sector_num;
@@ -52,3 +63,17 @@ struct QuorumAIOCB {
     bool is_read;
     int vote_ret;
 };
+
+static BlockDriver bdrv_quorum = {
+    .format_name        = "quorum",
+    .protocol_name      = "quorum",
+
+    .instance_size      = sizeof(BDRVQuorumState),
+};
+
+static void bdrv_quorum_init(void)
+{
+    bdrv_register(&bdrv_quorum);
+}
+
+block_init(bdrv_quorum_init);
-- 
1.8.3.2

* [Qemu-devel] [PATCH V15 03/13] quorum: Add quorum_aio_writev and its dependencies.
  2014-02-03 21:51 [Qemu-devel] [PATCH V15 00/13] Quorum block filter Benoît Canet
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 01/13] quorum: Create quorum.c, add QuorumSingleAIOCB and QuorumAIOCB Benoît Canet
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 02/13] quorum: Create BDRVQuorumState and BlkDriver and do init Benoît Canet
@ 2014-02-03 21:51 ` Benoît Canet
  2014-02-04 13:57   ` Kevin Wolf
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 04/13] blkverify: Extract qemu_iovec_clone() and qemu_iovec_compare() from blkverify Benoît Canet
                   ` (9 subsequent siblings)
  12 siblings, 1 reply; 29+ messages in thread
From: Benoît Canet @ 2014-02-03 21:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, Benoît Canet, stefanha, mreitz

From: Benoît Canet <benoit@irqsave.net>

Signed-off-by: Benoit Canet <benoit@irqsave.net>
---
 block/quorum.c | 104 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 104 insertions(+)

diff --git a/block/quorum.c b/block/quorum.c
index 157efdf..81bffdd 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -64,11 +64,115 @@ struct QuorumAIOCB {
     int vote_ret;
 };
 
+static void quorum_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
+    BDRVQuorumState *s = acb->bqs;
+    int i;
+
+    /* cancel all callbacks */
+    for (i = 0; i < s->total; i++) {
+        bdrv_aio_cancel(acb->aios[i].aiocb);
+    }
+}
+
+static AIOCBInfo quorum_aiocb_info = {
+    .aiocb_size         = sizeof(QuorumAIOCB),
+    .cancel             = quorum_aio_cancel,
+};
+
+static void quorum_aio_finalize(QuorumAIOCB *acb)
+{
+    BDRVQuorumState *s = acb->bqs;
+    int ret = 0;
+
+    acb->common.cb(acb->common.opaque, ret);
+    if (acb->finished) {
+        *acb->finished = true;
+    }
+    g_free(acb->aios);
+    qemu_aio_release(acb);
+}
+
+static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
+                                   BlockDriverState *bs,
+                                   QEMUIOVector *qiov,
+                                   uint64_t sector_num,
+                                   int nb_sectors,
+                                   BlockDriverCompletionFunc *cb,
+                                   void *opaque)
+{
+    QuorumAIOCB *acb = qemu_aio_get(&quorum_aiocb_info, bs, cb, opaque);
+    int i;
+
+    acb->bqs = s;
+    acb->sector_num = sector_num;
+    acb->nb_sectors = nb_sectors;
+    acb->qiov = qiov;
+    acb->aios = g_new0(QuorumSingleAIOCB, s->total);
+    acb->count = 0;
+    acb->success_count = 0;
+    acb->finished = NULL;
+    acb->is_read = false;
+    acb->vote_ret = 0;
+
+    for (i = 0; i < s->total; i++) {
+        acb->aios[i].buf = NULL;
+        acb->aios[i].ret = 0;
+        acb->aios[i].parent = acb;
+    }
+
+    return acb;
+}
+
+static void quorum_aio_cb(void *opaque, int ret)
+{
+    QuorumSingleAIOCB *sacb = opaque;
+    QuorumAIOCB *acb = sacb->parent;
+    BDRVQuorumState *s = acb->bqs;
+
+    sacb->ret = ret;
+    acb->count++;
+    if (ret == 0) {
+        acb->success_count++;
+    }
+    assert(acb->count <= s->total);
+    assert(acb->success_count <= s->total);
+    if (acb->count < s->total) {
+        return;
+    }
+
+    quorum_aio_finalize(acb);
+}
+
+static BlockDriverAIOCB *quorum_aio_writev(BlockDriverState *bs,
+                                          int64_t sector_num,
+                                          QEMUIOVector *qiov,
+                                          int nb_sectors,
+                                          BlockDriverCompletionFunc *cb,
+                                          void *opaque)
+{
+    BDRVQuorumState *s = bs->opaque;
+    QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num, nb_sectors,
+                                      cb, opaque);
+    int i;
+
+    for (i = 0; i < s->total; i++) {
+        acb->aios[i].aiocb = bdrv_aio_writev(s->bs[i], sector_num, qiov,
+                                             nb_sectors, &quorum_aio_cb,
+                                             &acb->aios[i]);
+    }
+
+    return &acb->common;
+}
+
 static BlockDriver bdrv_quorum = {
     .format_name        = "quorum",
     .protocol_name      = "quorum",
 
     .instance_size      = sizeof(BDRVQuorumState),
+
+    .bdrv_aio_writev    = quorum_aio_writev,
 };
 
 static void bdrv_quorum_init(void)
-- 
1.8.3.2
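
The completion counting done by quorum_aio_cb() and quorum_aio_finalize() in
this patch can be sketched outside of QEMU as follows. This is a simplified
model under stated assumptions, not the patch's code: ParentRequest,
ChildRequest, parent_init, child_complete and store_result are hypothetical
names, there is no real I/O, and the parent reports success unconditionally,
as the patch does at this point in the series.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct ParentRequest ParentRequest;

/* Per-child request state; plays the role of QuorumSingleAIOCB. */
typedef struct {
    int ret;
    ParentRequest *parent;
} ChildRequest;

/* Parent request state; plays the role of QuorumAIOCB. */
struct ParentRequest {
    ChildRequest *children;
    int total;                 /* number of children */
    int count;                 /* number of completed child requests */
    int success_count;         /* number of successful child requests */
    bool done;
    void (*cb)(void *opaque, int ret);
    void *opaque;
};

static void parent_init(ParentRequest *p, ChildRequest *children, int total,
                        void (*cb)(void *opaque, int ret), void *opaque)
{
    int i;

    p->children = children;
    p->total = total;
    p->count = 0;
    p->success_count = 0;
    p->done = false;
    p->cb = cb;
    p->opaque = opaque;

    for (i = 0; i < total; i++) {
        children[i].ret = 0;
        children[i].parent = p;
    }
}

/* Called once per child; only the last completion finishes the parent. */
static void child_complete(void *opaque, int ret)
{
    ChildRequest *c = opaque;
    ParentRequest *p = c->parent;

    c->ret = ret;
    p->count++;
    if (ret == 0) {
        p->success_count++;
    }
    assert(p->count <= p->total);
    if (p->count < p->total) {
        return;
    }

    p->done = true;
    p->cb(p->opaque, 0);
}

/* Example caller callback: stores the result in the int opaque points to. */
static void store_result(void *opaque, int ret)
{
    *(int *)opaque = ret;
}
```

The caller's callback fires exactly once, after the last child has reported,
whatever order the children complete in.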

* [Qemu-devel] [PATCH V15 04/13] blkverify: Extract qemu_iovec_clone() and qemu_iovec_compare() from blkverify.
  2014-02-03 21:51 [Qemu-devel] [PATCH V15 00/13] Quorum block filter Benoît Canet
                   ` (2 preceding siblings ...)
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 03/13] quorum: Add quorum_aio_writev and its dependencies Benoît Canet
@ 2014-02-03 21:51 ` Benoît Canet
  2014-02-04 14:04   ` Kevin Wolf
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 05/13] quorum: Add quorum_aio_readv Benoît Canet
                   ` (8 subsequent siblings)
  12 siblings, 1 reply; 29+ messages in thread
From: Benoît Canet @ 2014-02-03 21:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, Benoît Canet, stefanha, mreitz

From: Benoît Canet <benoit@irqsave.net>

Signed-off-by: Benoit Canet <benoit@irqsave.net>
Reviewed-by: Max Reitz <mreitz@redhat.com>
---
 block/blkverify.c     | 108 +-------------------------------------------------
 include/qemu-common.h |   2 +
 util/iov.c            | 103 +++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 107 insertions(+), 106 deletions(-)

diff --git a/block/blkverify.c b/block/blkverify.c
index fe94b59..91b638d 100644
--- a/block/blkverify.c
+++ b/block/blkverify.c
@@ -171,110 +171,6 @@ static int64_t blkverify_getlength(BlockDriverState *bs)
     return bdrv_getlength(s->test_file);
 }
 
-/**
- * Check that I/O vector contents are identical
- *
- * @a:          I/O vector
- * @b:          I/O vector
- * @ret:        Offset to first mismatching byte or -1 if match
- */
-static ssize_t blkverify_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
-{
-    int i;
-    ssize_t offset = 0;
-
-    assert(a->niov == b->niov);
-    for (i = 0; i < a->niov; i++) {
-        size_t len = 0;
-        uint8_t *p = (uint8_t *)a->iov[i].iov_base;
-        uint8_t *q = (uint8_t *)b->iov[i].iov_base;
-
-        assert(a->iov[i].iov_len == b->iov[i].iov_len);
-        while (len < a->iov[i].iov_len && *p++ == *q++) {
-            len++;
-        }
-
-        offset += len;
-
-        if (len != a->iov[i].iov_len) {
-            return offset;
-        }
-    }
-    return -1;
-}
-
-typedef struct {
-    int src_index;
-    struct iovec *src_iov;
-    void *dest_base;
-} IOVectorSortElem;
-
-static int sortelem_cmp_src_base(const void *a, const void *b)
-{
-    const IOVectorSortElem *elem_a = a;
-    const IOVectorSortElem *elem_b = b;
-
-    /* Don't overflow */
-    if (elem_a->src_iov->iov_base < elem_b->src_iov->iov_base) {
-        return -1;
-    } else if (elem_a->src_iov->iov_base > elem_b->src_iov->iov_base) {
-        return 1;
-    } else {
-        return 0;
-    }
-}
-
-static int sortelem_cmp_src_index(const void *a, const void *b)
-{
-    const IOVectorSortElem *elem_a = a;
-    const IOVectorSortElem *elem_b = b;
-
-    return elem_a->src_index - elem_b->src_index;
-}
-
-/**
- * Copy contents of I/O vector
- *
- * The relative relationships of overlapping iovecs are preserved.  This is
- * necessary to ensure identical semantics in the cloned I/O vector.
- */
-static void blkverify_iovec_clone(QEMUIOVector *dest, const QEMUIOVector *src,
-                                  void *buf)
-{
-    IOVectorSortElem sortelems[src->niov];
-    void *last_end;
-    int i;
-
-    /* Sort by source iovecs by base address */
-    for (i = 0; i < src->niov; i++) {
-        sortelems[i].src_index = i;
-        sortelems[i].src_iov = &src->iov[i];
-    }
-    qsort(sortelems, src->niov, sizeof(sortelems[0]), sortelem_cmp_src_base);
-
-    /* Allocate buffer space taking into account overlapping iovecs */
-    last_end = NULL;
-    for (i = 0; i < src->niov; i++) {
-        struct iovec *cur = sortelems[i].src_iov;
-        ptrdiff_t rewind = 0;
-
-        /* Detect overlap */
-        if (last_end && last_end > cur->iov_base) {
-            rewind = last_end - cur->iov_base;
-        }
-
-        sortelems[i].dest_base = buf - rewind;
-        buf += cur->iov_len - MIN(rewind, cur->iov_len);
-        last_end = MAX(cur->iov_base + cur->iov_len, last_end);
-    }
-
-    /* Sort by source iovec index and build destination iovec */
-    qsort(sortelems, src->niov, sizeof(sortelems[0]), sortelem_cmp_src_index);
-    for (i = 0; i < src->niov; i++) {
-        qemu_iovec_add(dest, sortelems[i].dest_base, src->iov[i].iov_len);
-    }
-}
-
 static BlkverifyAIOCB *blkverify_aio_get(BlockDriverState *bs, bool is_write,
                                          int64_t sector_num, QEMUIOVector *qiov,
                                          int nb_sectors,
@@ -338,7 +234,7 @@ static void blkverify_aio_cb(void *opaque, int ret)
 
 static void blkverify_verify_readv(BlkverifyAIOCB *acb)
 {
-    ssize_t offset = blkverify_iovec_compare(acb->qiov, &acb->raw_qiov);
+    ssize_t offset = qemu_iovec_compare(acb->qiov, &acb->raw_qiov);
     if (offset != -1) {
         blkverify_err(acb, "contents mismatch in sector %" PRId64,
                       acb->sector_num + (int64_t)(offset / BDRV_SECTOR_SIZE));
@@ -356,7 +252,7 @@ static BlockDriverAIOCB *blkverify_aio_readv(BlockDriverState *bs,
     acb->verify = blkverify_verify_readv;
     acb->buf = qemu_blockalign(bs->file, qiov->size);
     qemu_iovec_init(&acb->raw_qiov, acb->qiov->niov);
-    blkverify_iovec_clone(&acb->raw_qiov, qiov, acb->buf);
+    qemu_iovec_clone(&acb->raw_qiov, qiov, acb->buf);
 
     bdrv_aio_readv(s->test_file, sector_num, qiov, nb_sectors,
                    blkverify_aio_cb, acb);
diff --git a/include/qemu-common.h b/include/qemu-common.h
index 5054836..d70783e 100644
--- a/include/qemu-common.h
+++ b/include/qemu-common.h
@@ -346,6 +346,8 @@ size_t qemu_iovec_from_buf(QEMUIOVector *qiov, size_t offset,
                            const void *buf, size_t bytes);
 size_t qemu_iovec_memset(QEMUIOVector *qiov, size_t offset,
                          int fillc, size_t bytes);
+ssize_t qemu_iovec_compare(QEMUIOVector *a, QEMUIOVector *b);
+void qemu_iovec_clone(QEMUIOVector *dest, const QEMUIOVector *src, void *buf);
 
 bool buffer_is_zero(const void *buf, size_t len);
 
diff --git a/util/iov.c b/util/iov.c
index bb46c04..f869fb9 100644
--- a/util/iov.c
+++ b/util/iov.c
@@ -378,6 +378,109 @@ size_t qemu_iovec_memset(QEMUIOVector *qiov, size_t offset,
     return iov_memset(qiov->iov, qiov->niov, offset, fillc, bytes);
 }
 
+/**
+ * Check that I/O vector contents are identical
+ *
+ * @a:          I/O vector
+ * @b:          I/O vector
+ * @ret:        Offset to first mismatching byte or -1 if match
+ */
+ssize_t qemu_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
+{
+    int i;
+    ssize_t offset = 0;
+
+    assert(a->niov == b->niov);
+    for (i = 0; i < a->niov; i++) {
+        size_t len = 0;
+        uint8_t *p = (uint8_t *)a->iov[i].iov_base;
+        uint8_t *q = (uint8_t *)b->iov[i].iov_base;
+
+        assert(a->iov[i].iov_len == b->iov[i].iov_len);
+        while (len < a->iov[i].iov_len && *p++ == *q++) {
+            len++;
+        }
+
+        offset += len;
+
+        if (len != a->iov[i].iov_len) {
+            return offset;
+        }
+    }
+    return -1;
+}
+
+typedef struct {
+    int src_index;
+    struct iovec *src_iov;
+    void *dest_base;
+} IOVectorSortElem;
+
+static int sortelem_cmp_src_base(const void *a, const void *b)
+{
+    const IOVectorSortElem *elem_a = a;
+    const IOVectorSortElem *elem_b = b;
+
+    /* Don't overflow */
+    if (elem_a->src_iov->iov_base < elem_b->src_iov->iov_base) {
+        return -1;
+    } else if (elem_a->src_iov->iov_base > elem_b->src_iov->iov_base) {
+        return 1;
+    } else {
+        return 0;
+    }
+}
+
+static int sortelem_cmp_src_index(const void *a, const void *b)
+{
+    const IOVectorSortElem *elem_a = a;
+    const IOVectorSortElem *elem_b = b;
+
+    return elem_a->src_index - elem_b->src_index;
+}
+
+/**
+ * Copy contents of I/O vector
+ *
+ * The relative relationships of overlapping iovecs are preserved.  This is
+ * necessary to ensure identical semantics in the cloned I/O vector.
+ */
+void qemu_iovec_clone(QEMUIOVector *dest, const QEMUIOVector *src, void *buf)
+{
+    IOVectorSortElem sortelems[src->niov];
+    void *last_end;
+    int i;
+
+    /* Sort source iovecs by base address */
+    for (i = 0; i < src->niov; i++) {
+        sortelems[i].src_index = i;
+        sortelems[i].src_iov = &src->iov[i];
+    }
+    qsort(sortelems, src->niov, sizeof(sortelems[0]), sortelem_cmp_src_base);
+
+    /* Allocate buffer space taking into account overlapping iovecs */
+    last_end = NULL;
+    for (i = 0; i < src->niov; i++) {
+        struct iovec *cur = sortelems[i].src_iov;
+        ptrdiff_t rewind = 0;
+
+        /* Detect overlap */
+        if (last_end && last_end > cur->iov_base) {
+            rewind = last_end - cur->iov_base;
+        }
+
+        sortelems[i].dest_base = buf - rewind;
+        buf += cur->iov_len - MIN(rewind, cur->iov_len);
+        last_end = MAX(cur->iov_base + cur->iov_len, last_end);
+    }
+
+    /* Sort by source iovec index and build destination iovec */
+    qsort(sortelems, src->niov, sizeof(sortelems[0]), sortelem_cmp_src_index);
+    for (i = 0; i < src->niov; i++) {
+        qemu_iovec_add(dest, sortelems[i].dest_base, src->iov[i].iov_len);
+    }
+}
+
 size_t iov_discard_front(struct iovec **iov, unsigned int *iov_cnt,
                          size_t bytes)
 {
-- 
1.8.3.2
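
To make the return convention of qemu_iovec_compare() concrete (offset of the
first mismatching byte, or -1 on a match), here is a small standalone sketch
that applies the same loop to plain POSIX struct iovec arrays. It assumes a
POSIX system providing <sys/uio.h>; iov_compare is a hypothetical name and
takes the vector count explicitly instead of a QEMUIOVector.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/types.h>
#include <sys/uio.h>

/*
 * Compare two I/O vectors with the same layout.
 * Returns the offset of the first mismatching byte, or -1 if they match.
 */
static ssize_t iov_compare(const struct iovec *a, const struct iovec *b,
                           int niov)
{
    ssize_t offset = 0;
    int i;

    for (i = 0; i < niov; i++) {
        const uint8_t *p = a[i].iov_base;
        const uint8_t *q = b[i].iov_base;
        size_t len = 0;

        /* both vectors must be split at the same boundaries */
        assert(a[i].iov_len == b[i].iov_len);
        while (len < a[i].iov_len && p[len] == q[len]) {
            len++;
        }

        offset += len;

        if (len != a[i].iov_len) {
            return offset;
        }
    }
    return -1;
}
```

For two vectors of 4 + 4 bytes that differ only in the third byte of the
second element, the function returns 6; blkverify divides such an offset by
BDRV_SECTOR_SIZE to report the mismatching sector.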

* [Qemu-devel] [PATCH V15 05/13] quorum: Add quorum_aio_readv.
  2014-02-03 21:51 [Qemu-devel] [PATCH V15 00/13] Quorum block filter Benoît Canet
                   ` (3 preceding siblings ...)
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 04/13] blkverify: Extract qemu_iovec_clone() and qemu_iovec_compare() from blkverify Benoît Canet
@ 2014-02-03 21:51 ` Benoît Canet
  2014-02-04 14:15   ` Kevin Wolf
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 06/13] quorum: Add quorum mechanism Benoît Canet
                   ` (7 subsequent siblings)
  12 siblings, 1 reply; 29+ messages in thread
From: Benoît Canet @ 2014-02-03 21:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, Benoît Canet, stefanha, mreitz

From: Benoît Canet <benoit@irqsave.net>

Signed-off-by: Benoit Canet <benoit@irqsave.net>
Reviewed-by: Max Reitz <mreitz@redhat.com>
---
 block/quorum.c | 38 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 38 insertions(+)

diff --git a/block/quorum.c b/block/quorum.c
index 81bffdd..699b512 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -86,10 +86,19 @@ static void quorum_aio_finalize(QuorumAIOCB *acb)
     BDRVQuorumState *s = acb->bqs;
     int ret = 0;
 
+    for (i = 0; i < s->total; i++) {
+        qemu_vfree(acb->aios[i].buf);
+        acb->aios[i].buf = NULL;
+        acb->aios[i].ret = 0;
+    }
+
     acb->common.cb(acb->common.opaque, ret);
     if (acb->finished) {
         *acb->finished = true;
     }
+    for (i = 0; acb->is_read && i < s->total; i++) {
+        qemu_iovec_destroy(&acb->aios[i].qiov);
+    }
     g_free(acb->aios);
     qemu_aio_release(acb);
 }
@@ -145,6 +154,34 @@ static void quorum_aio_cb(void *opaque, int ret)
     quorum_aio_finalize(acb);
 }
 
+static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
+                                         int64_t sector_num,
+                                         QEMUIOVector *qiov,
+                                         int nb_sectors,
+                                         BlockDriverCompletionFunc *cb,
+                                         void *opaque)
+{
+    BDRVQuorumState *s = bs->opaque;
+    QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num,
+                                      nb_sectors, cb, opaque);
+    int i;
+
+    acb->is_read = true;
+
+    for (i = 0; i < s->total; i++) {
+        acb->aios[i].buf = qemu_blockalign(bs->file, qiov->size);
+        qemu_iovec_init(&acb->aios[i].qiov, qiov->niov);
+        qemu_iovec_clone(&acb->aios[i].qiov, qiov, acb->aios[i].buf);
+    }
+
+    for (i = 0; i < s->total; i++) {
+        bdrv_aio_readv(s->bs[i], sector_num, qiov, nb_sectors,
+                       quorum_aio_cb, &acb->aios[i]);
+    }
+
+    return &acb->common;
+}
+
 static BlockDriverAIOCB *quorum_aio_writev(BlockDriverState *bs,
                                           int64_t sector_num,
                                           QEMUIOVector *qiov,
@@ -172,6 +209,7 @@ static BlockDriver bdrv_quorum = {
 
     .instance_size      = sizeof(BDRVQuorumState),
 
+    .bdrv_aio_readv     = quorum_aio_readv,
     .bdrv_aio_writev    = quorum_aio_writev,
 };
 
-- 
1.8.3.2

* [Qemu-devel] [PATCH V15 06/13] quorum: Add quorum mechanism.
  2014-02-03 21:51 [Qemu-devel] [PATCH V15 00/13] Quorum block filter Benoît Canet
                   ` (4 preceding siblings ...)
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 05/13] quorum: Add quorum_aio_readv Benoît Canet
@ 2014-02-03 21:51 ` Benoît Canet
  2014-02-04 15:40   ` Kevin Wolf
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 07/13] quorum: Add quorum_getlength() Benoît Canet
                   ` (6 subsequent siblings)
  12 siblings, 1 reply; 29+ messages in thread
From: Benoît Canet @ 2014-02-03 21:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, Benoît Canet, stefanha, mreitz

From: Benoît Canet <benoit@irqsave.net>

Use gnutls's SHA-256 to compare versions.

Signed-off-by: Benoit Canet <benoit@irqsave.net>
---
 block/Makefile.objs       |   2 +-
 block/quorum.c            | 386 +++++++++++++++++++++++++++++++++++++++++++++-
 configure                 |  36 +++++
 docs/qmp/qmp-events.txt   |  33 ++++
 include/monitor/monitor.h |   2 +
 monitor.c                 |   2 +
 6 files changed, 458 insertions(+), 3 deletions(-)

diff --git a/block/Makefile.objs b/block/Makefile.objs
index a2650b9..4ca9d43 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -3,7 +3,7 @@ block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-c
 block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
 block-obj-y += qed-check.o
 block-obj-$(CONFIG_VHDX) += vhdx.o vhdx-endian.o vhdx-log.o
-block-obj-y += quorum.o
+block-obj-$(CONFIG_QUORUM) += quorum.o
 block-obj-y += parallels.o blkdebug.o blkverify.o
 block-obj-y += snapshot.o qapi.o
 block-obj-$(CONFIG_WIN32) += raw-win32.o win32-aio.o
diff --git a/block/quorum.c b/block/quorum.c
index 699b512..837d261 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -13,7 +13,43 @@
  * See the COPYING file in the top-level directory.
  */
 
+#include <gnutls/gnutls.h>
+#include <gnutls/crypto.h>
 #include "block/block_int.h"
+#include "qapi/qmp/qjson.h"
+
+#define HASH_LENGTH 32
+
+/* This union holds a vote hash value */
+typedef union QuorumVoteValue {
+    char h[HASH_LENGTH];       /* SHA-256 hash */
+    int64_t l;                 /* simpler 64 bits hash */
+} QuorumVoteValue;
+
+/* A vote item */
+typedef struct QuorumVoteItem {
+    int index;
+    QLIST_ENTRY(QuorumVoteItem) next;
+} QuorumVoteItem;
+
+/* this structure is a vote version. A version is the set of votes sharing the
+ * same vote value.
+ * The set of votes will be tracked with the items field and its cardinality is
+ * vote_count.
+ */
+typedef struct QuorumVoteVersion {
+    QuorumVoteValue value;
+    int index;
+    int vote_count;
+    QLIST_HEAD(, QuorumVoteItem) items;
+    QLIST_ENTRY(QuorumVoteVersion) next;
+} QuorumVoteVersion;
+
+/* this structure holds a group of vote versions together */
+typedef struct QuorumVotes {
+    QLIST_HEAD(, QuorumVoteVersion) vote_list;
+    int (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
+} QuorumVotes;
 
 /* the following structure holds the state of one quorum instance */
 typedef struct {
@@ -60,10 +96,14 @@ struct QuorumAIOCB {
     int success_count;          /* number of successfully completed AIOCB */
     bool *finished;             /* completion signal for cancel */
 
+    QuorumVotes votes;
+
     bool is_read;
     int vote_ret;
 };
 
+static void quorum_vote(QuorumAIOCB *acb);
+
 static void quorum_aio_cancel(BlockDriverAIOCB *blockacb)
 {
     QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
@@ -81,10 +121,12 @@ static AIOCBInfo quorum_aiocb_info = {
     .cancel             = quorum_aio_cancel,
 };
 
+static int quorum_vote_error(QuorumAIOCB *acb);
+
 static void quorum_aio_finalize(QuorumAIOCB *acb)
 {
     BDRVQuorumState *s = acb->bqs;
-    int ret = 0;
+    int i, ret = 0;
 
     for (i = 0; i < s->total; i++) {
         qemu_vfree(acb->aios[i].buf);
@@ -92,6 +134,10 @@ static void quorum_aio_finalize(QuorumAIOCB *acb)
         acb->aios[i].ret = 0;
     }
 
+    if (acb->vote_ret) {
+        ret = acb->vote_ret;
+    }
+
     acb->common.cb(acb->common.opaque, ret);
     if (acb->finished) {
         *acb->finished = true;
@@ -103,6 +149,27 @@ static void quorum_aio_finalize(QuorumAIOCB *acb)
     qemu_aio_release(acb);
 }
 
+static int quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
+{
+    return memcmp(a->h, b->h, HASH_LENGTH);
+}
+
+static int quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b)
+{
+    int64_t i = a->l;
+    int64_t j = b->l;
+
+    if (i < j) {
+        return -1;
+    }
+
+    if (i > j) {
+        return 1;
+    }
+
+    return 0;
+}
+
 static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
                                    BlockDriverState *bs,
                                    QEMUIOVector *qiov,
@@ -122,6 +189,7 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
     acb->count = 0;
     acb->success_count = 0;
     acb->finished = NULL;
+    acb->votes.compare = quorum_sha256_compare;
     acb->is_read = false;
     acb->vote_ret = 0;
 
@@ -151,9 +219,323 @@ static void quorum_aio_cb(void *opaque, int ret)
         return;
     }
 
+    /* Do the vote on read */
+    if (acb->is_read) {
+        quorum_vote(acb);
+    }
+
     quorum_aio_finalize(acb);
 }
 
+static void quorum_report_bad(QuorumAIOCB *acb, char *node_name)
+{
+    QObject *data;
+    data = qobject_from_jsonf("{ 'node-name': \"%s\""
+                              ", 'sector-num': %" PRId64
+                              ", 'sectors-count': %i }",
+                              node_name,
+                              acb->sector_num,
+                              acb->nb_sectors);
+    monitor_protocol_event(QEVENT_QUORUM_REPORT_BAD, data);
+    qobject_decref(data);
+}
+
+static void quorum_report_failure(QuorumAIOCB *acb)
+{
+    QObject *data;
+    data = qobject_from_jsonf("{ 'sector-num': %" PRId64
+                              ", 'sectors-count': %i }",
+                              acb->sector_num,
+                              acb->nb_sectors);
+    monitor_protocol_event(QEVENT_QUORUM_FAILURE, data);
+    qobject_decref(data);
+}
+
+static void quorum_report_bad_versions(BDRVQuorumState *s,
+                                       QuorumAIOCB *acb,
+                                       QuorumVoteValue *value)
+{
+    QuorumVoteVersion *version;
+    QuorumVoteItem *item;
+
+    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
+        if (!acb->votes.compare(&version->value, value)) {
+            continue;
+        }
+        QLIST_FOREACH(item, &version->items, next) {
+            quorum_report_bad(acb, s->bs[item->index]->node_name);
+        }
+    }
+}
+
+static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
+{
+    int i;
+    assert(dest->niov == source->niov);
+    assert(dest->size == source->size);
+    for (i = 0; i < source->niov; i++) {
+        assert(dest->iov[i].iov_len == source->iov[i].iov_len);
+        memcpy(dest->iov[i].iov_base,
+               source->iov[i].iov_base,
+               source->iov[i].iov_len);
+    }
+}
+
+static void quorum_count_vote(QuorumVotes *votes,
+                              QuorumVoteValue *value,
+                              int index)
+{
+    QuorumVoteVersion *v = NULL, *version = NULL;
+    QuorumVoteItem *item;
+
+    /* check whether a version with this value is already in the list */
+    QLIST_FOREACH(v, &votes->vote_list, next) {
+        if (!votes->compare(&v->value, value)) {
+            version = v;
+            break;
+        }
+    }
+
+    /* the version is not in the list yet: add it */
+    if (!version) {
+        version = g_new0(QuorumVoteVersion, 1);
+        QLIST_INIT(&version->items);
+        memcpy(&version->value, value, sizeof(version->value));
+        version->index = index;
+        version->vote_count = 0;
+        QLIST_INSERT_HEAD(&votes->vote_list, version, next);
+    }
+
+    version->vote_count++;
+
+    item = g_new0(QuorumVoteItem, 1);
+    item->index = index;
+    QLIST_INSERT_HEAD(&version->items, item, next);
+}
+
+static void quorum_free_vote_list(QuorumVotes *votes)
+{
+    QuorumVoteVersion *version, *next_version;
+    QuorumVoteItem *item, *next_item;
+
+    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
+        QLIST_REMOVE(version, next);
+        QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
+            QLIST_REMOVE(item, next);
+            g_free(item);
+        }
+        g_free(version);
+    }
+}
+
+static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
+{
+    int j, ret;
+    gnutls_hash_hd_t dig;
+    QEMUIOVector *qiov = &acb->aios[i].qiov;
+
+    ret = gnutls_hash_init(&dig, GNUTLS_DIG_SHA256);
+
+    if (ret < 0) {
+        return ret;
+    }
+
+    for (j = 0; j < qiov->niov; j++) {
+        ret = gnutls_hash(dig, qiov->iov[j].iov_base, qiov->iov[j].iov_len);
+        if (ret < 0) {
+            break;
+        }
+    }
+
+    gnutls_hash_deinit(dig, (void *) hash);
+    return ret;
+}
+
+static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
+{
+    int i = 0;
+    QuorumVoteVersion *candidate, *winner = NULL;
+
+    QLIST_FOREACH(candidate, &votes->vote_list, next) {
+        if (candidate->vote_count > i) {
+            i = candidate->vote_count;
+            winner = candidate;
+        }
+    }
+
+    return winner;
+}
+
+static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
+{
+    int i;
+    int result;
+
+    assert(a->niov == b->niov);
+    for (i = 0; i < a->niov; i++) {
+        assert(a->iov[i].iov_len == b->iov[i].iov_len);
+        result = memcmp(a->iov[i].iov_base,
+                        b->iov[i].iov_base,
+                        a->iov[i].iov_len);
+        if (result) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
+static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
+                                          const char *fmt, ...)
+{
+    va_list ap;
+
+    va_start(ap, fmt);
+    fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ",
+            acb->sector_num, acb->nb_sectors);
+    vfprintf(stderr, fmt, ap);
+    fprintf(stderr, "\n");
+    va_end(ap);
+    exit(1);
+}
+
+static bool quorum_compare(QuorumAIOCB *acb,
+                           QEMUIOVector *a,
+                           QEMUIOVector *b)
+{
+    BDRVQuorumState *s = acb->bqs;
+    ssize_t offset;
+
+    /* This driver will replace blkverify in this particular case */
+    if (s->is_blkverify) {
+        offset = qemu_iovec_compare(a, b);
+        if (offset != -1) {
+            quorum_err(acb, "contents mismatch in sector %" PRId64,
+                       acb->sector_num +
+                       (uint64_t)(offset / BDRV_SECTOR_SIZE));
+        }
+        return true;
+    }
+
+    return quorum_iovec_compare(a, b);
+}
+
+/* Do a vote to get the error code */
+static int quorum_vote_error(QuorumAIOCB *acb)
+{
+    BDRVQuorumState *s = acb->bqs;
+    QuorumVoteVersion *winner = NULL;
+    QuorumVotes error_votes;
+    QuorumVoteValue result_value;
+    int i, ret = 0;
+    bool error = false;
+
+    QLIST_INIT(&error_votes.vote_list);
+    error_votes.compare = quorum_64bits_compare;
+
+    for (i = 0; i < s->total; i++) {
+        ret = acb->aios[i].ret;
+        if (ret) {
+            error = true;
+            result_value.l = ret;
+            quorum_count_vote(&error_votes, &result_value, i);
+        }
+    }
+
+    if (error) {
+        winner = quorum_get_vote_winner(&error_votes);
+        ret = winner->value.l;
+    }
+
+    quorum_free_vote_list(&error_votes);
+
+    return ret;
+}
+
+static void quorum_vote(QuorumAIOCB *acb)
+{
+    bool quorum = false;
+    int i, j, ret;
+    QuorumVoteValue hash;
+    BDRVQuorumState *s = acb->bqs;
+    QuorumVoteVersion *winner;
+
+    QLIST_INIT(&acb->votes.vote_list);
+
+    /* if we don't get enough successful reads, vote on the error code */
+    if (acb->success_count < s->threshold) {
+        acb->vote_ret = quorum_vote_error(acb);
+        quorum_report_failure(acb);
+        return;
+    }
+
+    /* get the index of the first successful read (we are sure to find one) */
+    for (i = 0; i < s->total; i++) {
+        if (!acb->aios[i].ret) {
+            break;
+        }
+    }
+
+    /* compare this read with all other successful read looking for quorum */
+    for (j = i + 1; j < s->total; j++) {
+        if (acb->aios[j].ret) {
+            continue;
+        }
+        quorum = quorum_compare(acb, &acb->aios[i].qiov, &acb->aios[j].qiov);
+        if (!quorum) {
+            break;
+        }
+    }
+
+    /* Every successful read agrees and their count reaches the threshold
+     * -> quorum
+     */
+    if (quorum && acb->success_count >= s->threshold) {
+        quorum_copy_qiov(acb->qiov, &acb->aios[i].qiov);
+        return;
+    }
+
+    /* compute hashes for each successful read, also store indexes */
+    for (i = 0; i < s->total; i++) {
+        if (acb->aios[i].ret) {
+            continue;
+        }
+        ret = quorum_compute_hash(acb, i, &hash);
+        /* if ever the hash computation failed */
+        if (ret < 0) {
+            acb->vote_ret = ret;
+            goto free_exit;
+        }
+        quorum_count_vote(&acb->votes, &hash, i);
+    }
+
+    /* vote to select the most represented version */
+    winner = quorum_get_vote_winner(&acb->votes);
+    /* no version collected any vote -> error */
+    if (!winner) {
+        quorum_report_failure(acb);
+        acb->vote_ret = -EIO;
+        goto free_exit;
+    }
+
+    /* if the winner count is smaller than threshold the read fails */
+    if (winner->vote_count < s->threshold) {
+        quorum_report_failure(acb);
+        acb->vote_ret = -EIO;
+        goto free_exit;
+    }
+
+    /* we have a winner: copy it */
+    quorum_copy_qiov(acb->qiov, &acb->aios[winner->index].qiov);
+
+    /* report the versions that differ from the winner */
+    quorum_report_bad_versions(s, acb, &winner->value);
+
+free_exit:
+    /* free lists */
+    quorum_free_vote_list(&acb->votes);
+}
+
 static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
                                          int64_t sector_num,
                                          QEMUIOVector *qiov,
@@ -175,7 +557,7 @@ static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
     }
 
     for (i = 0; i < s->total; i++) {
-        bdrv_aio_readv(s->bs[i], sector_num, qiov, nb_sectors,
+        bdrv_aio_readv(s->bs[i], sector_num, &acb->aios[i].qiov, nb_sectors,
                        quorum_aio_cb, &acb->aios[i]);
     }
 
diff --git a/configure b/configure
index b472694..5a68738 100755
--- a/configure
+++ b/configure
@@ -263,6 +263,7 @@ gtkabi="2.0"
 tpm="no"
 libssh2=""
 vhdx=""
+quorum="no"
 
 # parse CC options first
 for opt do
@@ -1000,6 +1001,10 @@ for opt do
   ;;
   --disable-vhdx) vhdx="no"
   ;;
+  --disable-quorum) quorum="no"
+  ;;
+  --enable-quorum) quorum="yes"
+  ;;
   *) echo "ERROR: unknown option $opt"; show_help="yes"
   ;;
   esac
@@ -1254,6 +1259,8 @@ Advanced options (experts only):
   --enable-libssh2         enable ssh block device support
   --disable-vhdx           disables support for the Microsoft VHDX image format
   --enable-vhdx            enable support for the Microsoft VHDX image format
+  --disable-quorum         disable quorum block filter support
+  --enable-quorum          enable quorum block filter support
 
 NOTE: The object files are built at the place where configure is launched
 EOF
@@ -1923,6 +1930,30 @@ EOF
 fi
 
 ##########################################
+# Quorum probe (check for gnutls)
+if test "$quorum" != "no" ; then
+cat > $TMPC <<EOF
+#include <gnutls/gnutls.h>
+#include <gnutls/crypto.h>
+int main(void) {char data[4096], digest[32];
+gnutls_hash_fast(GNUTLS_DIG_SHA256, data, 4096, digest);
+return 0;
+}
+EOF
+quorum_tls_cflags=`$pkg_config --cflags gnutls 2> /dev/null`
+quorum_tls_libs=`$pkg_config --libs gnutls 2> /dev/null`
+if compile_prog "$quorum_tls_cflags" "$quorum_tls_libs" ; then
+  quorum="yes"
+  libs_softmmu="$quorum_tls_libs $libs_softmmu"
+  libs_tools="$quorum_tls_libs $libs_tools"
+  QEMU_CFLAGS="$QEMU_CFLAGS $quorum_tls_cflags"
+else
+  echo "gnutls > 2.10.0 required to compile Quorum"
+  exit 1
+fi
+fi
+
+##########################################
 # VNC SASL detection
 if test "$vnc" = "yes" -a "$vnc_sasl" != "no" ; then
   cat > $TMPC <<EOF
@@ -3843,6 +3874,7 @@ echo "libssh2 support   $libssh2"
 echo "TPM passthrough   $tpm_passthrough"
 echo "QOM debugging     $qom_cast_debug"
 echo "vhdx              $vhdx"
+echo "Quorum            $quorum"
 
 if test "$sdl_too_old" = "yes"; then
 echo "-> Your SDL version is too old - please upgrade to have SDL support"
@@ -4241,6 +4273,10 @@ if test "$libssh2" = "yes" ; then
   echo "CONFIG_LIBSSH2=y" >> $config_host_mak
 fi
 
+if test "$quorum" = "yes" ; then
+  echo "CONFIG_QUORUM=y" >> $config_host_mak
+fi
+
 if test "$virtio_blk_data_plane" = "yes" ; then
   echo 'CONFIG_VIRTIO_BLK_DATA_PLANE=$(CONFIG_VIRTIO)' >> $config_host_mak
 fi
diff --git a/docs/qmp/qmp-events.txt b/docs/qmp/qmp-events.txt
index 6b87e97..cd93c4a 100644
--- a/docs/qmp/qmp-events.txt
+++ b/docs/qmp/qmp-events.txt
@@ -500,3 +500,36 @@ Example:
 
 Note: If action is "reset", "shutdown", or "pause" the WATCHDOG event is
 followed respectively by the RESET, SHUTDOWN, or STOP events.
+
+QUORUM_FAILURE
+--------------
+
+Emitted by the Quorum block driver if it fails to establish a quorum.
+
+Data:
+
+- "sector-num":    Number of the first sector of the failed read operation.
+- "sectors-count": Failed read operation sector count.
+
+Example:
+
+{ "event": "QUORUM_FAILURE",
+     "data": { "sector-num": 345435, "sectors-count": 5 },
+     "timestamp": { "seconds": 1344522075, "microseconds": 745528 } }
+
+QUORUM_REPORT_BAD
+-----------------
+
+Emitted to report a corruption of a Quorum file.
+
+Data:
+
+- "node-name":     The graph node name of the block driver state.
+- "sector-num":    Number of the first sector of the failed read operation.
+- "sectors-count": Failed read operation sector count.
+
+Example:
+
+{ "event": "QUORUM_REPORT_BAD",
+     "data": { "node-name": "1.raw", "sector-num": 345435, "sectors-count": 5 },
+     "timestamp": { "seconds": 1344522075, "microseconds": 745528 } }
diff --git a/include/monitor/monitor.h b/include/monitor/monitor.h
index 7e5f752..a49ea11 100644
--- a/include/monitor/monitor.h
+++ b/include/monitor/monitor.h
@@ -49,6 +49,8 @@ typedef enum MonitorEvent {
     QEVENT_SPICE_MIGRATE_COMPLETED,
     QEVENT_GUEST_PANICKED,
     QEVENT_BLOCK_IMAGE_CORRUPTED,
+    QEVENT_QUORUM_FAILURE,
+    QEVENT_QUORUM_REPORT_BAD,
 
     /* Add to 'monitor_event_names' array in monitor.c when
      * defining new events here */
diff --git a/monitor.c b/monitor.c
index 80456fb..f8f6aea 100644
--- a/monitor.c
+++ b/monitor.c
@@ -507,6 +507,8 @@ static const char *monitor_event_names[] = {
     [QEVENT_SPICE_MIGRATE_COMPLETED] = "SPICE_MIGRATE_COMPLETED",
     [QEVENT_GUEST_PANICKED] = "GUEST_PANICKED",
     [QEVENT_BLOCK_IMAGE_CORRUPTED] = "BLOCK_IMAGE_CORRUPTED",
+    [QEVENT_QUORUM_FAILURE] = "QUORUM_FAILURE",
+    [QEVENT_QUORUM_REPORT_BAD] = "QUORUM_REPORT_BAD",
 };
 QEMU_BUILD_BUG_ON(ARRAY_SIZE(monitor_event_names) != QEVENT_MAX)
 
-- 
1.8.3.2

* [Qemu-devel] [PATCH V15 07/13] quorum: Add quorum_getlength().
  2014-02-03 21:51 [Qemu-devel] [PATCH V15 00/13] Quorum block filter Benoît Canet
                   ` (5 preceding siblings ...)
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 06/13] quorum: Add quorum mechanism Benoît Canet
@ 2014-02-03 21:51 ` Benoît Canet
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 08/13] quorum: Add quorum_invalidate_cache() Benoît Canet
                   ` (5 subsequent siblings)
  12 siblings, 0 replies; 29+ messages in thread
From: Benoît Canet @ 2014-02-03 21:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, Benoît Canet, stefanha, mreitz

From: Benoît Canet <benoit@irqsave.net>

Check that every bs file returns the same length.
Otherwise, return -EIO to disable the quorum and
avoid length discrepancy.

Signed-off-by: Benoit Canet <benoit@irqsave.net>
Reviewed-by: Max Reitz <mreitz@redhat.com>
---
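Reading aid, not part of the patch: the check described above can be tried
outside QEMU with a small standalone sketch. The name common_length() and the
array argument are hypothetical; the array merely stands in for the values
bdrv_getlength() would return for each child.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper: lengths[i] stands in for the length reported by
 * child i; a negative entry is an errno-style error from that child.
 */
int64_t common_length(const int64_t *lengths, size_t n)
{
    size_t i;

    assert(lengths != NULL);
    if (n == 0) {
        return -EINVAL;            /* nothing to compare (sketch assumption) */
    }
    if (lengths[0] < 0) {
        return lengths[0];         /* propagate the first child's error */
    }
    for (i = 1; i < n; i++) {
        if (lengths[i] < 0) {
            return lengths[i];     /* propagate this child's error */
        }
        if (lengths[i] != lengths[0]) {
            return -EIO;           /* children disagree on the size */
        }
    }
    return lengths[0];
}
```

With children reporting 4096, 8192 and 4096 bytes the sketch returns -EIO,
which is the behaviour the commit message asks for.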
 block/quorum.c | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/block/quorum.c b/block/quorum.c
index 837d261..2f302b4 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -585,12 +585,38 @@ static BlockDriverAIOCB *quorum_aio_writev(BlockDriverState *bs,
     return &acb->common;
 }
 
+static int64_t quorum_getlength(BlockDriverState *bs)
+{
+    BDRVQuorumState *s = bs->opaque;
+    int64_t result;
+    int i;
+
+    /* check that all files have the same length */
+    result = bdrv_getlength(s->bs[0]);
+    if (result < 0) {
+        return result;
+    }
+    for (i = 1; i < s->total; i++) {
+        int64_t value = bdrv_getlength(s->bs[i]);
+        if (value < 0) {
+            return value;
+        }
+        if (value != result) {
+            return -EIO;
+        }
+    }
+
+    return result;
+}
+
 static BlockDriver bdrv_quorum = {
     .format_name        = "quorum",
     .protocol_name      = "quorum",
 
     .instance_size      = sizeof(BDRVQuorumState),
 
+    .bdrv_getlength     = quorum_getlength,
+
     .bdrv_aio_readv     = quorum_aio_readv,
     .bdrv_aio_writev    = quorum_aio_writev,
 };
-- 
1.8.3.2

* [Qemu-devel] [PATCH V15 08/13] quorum: Add quorum_invalidate_cache().
  2014-02-03 21:51 [Qemu-devel] [PATCH V15 00/13] Quorum block filter Benoît Canet
                   ` (6 preceding siblings ...)
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 07/13] quorum: Add quorum_getlength() Benoît Canet
@ 2014-02-03 21:51 ` Benoît Canet
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 09/13] quorum: Add quorum_co_get_block_status Benoît Canet
                   ` (4 subsequent siblings)
  12 siblings, 0 replies; 29+ messages in thread
From: Benoît Canet @ 2014-02-03 21:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, Benoît Canet, stefanha, mreitz

From: Benoît Canet <benoit@irqsave.net>

Signed-off-by: Benoit Canet <benoit@irqsave.net>
Reviewed-by: Max Reitz <mreitz@redhat.com>
---
 block/quorum.c | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/block/quorum.c b/block/quorum.c
index 2f302b4..cef4424 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -609,6 +609,16 @@ static int64_t quorum_getlength(BlockDriverState *bs)
     return result;
 }
 
+static void quorum_invalidate_cache(BlockDriverState *bs)
+{
+    BDRVQuorumState *s = bs->opaque;
+    int i;
+
+    for (i = 0; i < s->total; i++) {
+        bdrv_invalidate_cache(s->bs[i]);
+    }
+}
+
 static BlockDriver bdrv_quorum = {
     .format_name        = "quorum",
     .protocol_name      = "quorum",
@@ -619,6 +629,7 @@ static BlockDriver bdrv_quorum = {
 
     .bdrv_aio_readv     = quorum_aio_readv,
     .bdrv_aio_writev    = quorum_aio_writev,
+    .bdrv_invalidate_cache = quorum_invalidate_cache,
 };
 
 static void bdrv_quorum_init(void)
-- 
1.8.3.2

* [Qemu-devel] [PATCH V15 09/13] quorum: Add quorum_co_get_block_status.
  2014-02-03 21:51 [Qemu-devel] [PATCH V15 00/13] Quorum block filter Benoît Canet
                   ` (7 preceding siblings ...)
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 08/13] quorum: Add quorum_invalidate_cache() Benoît Canet
@ 2014-02-03 21:51 ` Benoît Canet
  2014-02-04 15:49   ` Kevin Wolf
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 10/13] quorum: Add quorum_co_flush() Benoît Canet
                   ` (3 subsequent siblings)
  12 siblings, 1 reply; 29+ messages in thread
From: Benoît Canet @ 2014-02-03 21:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, Benoît Canet, stefanha, mreitz

From: Benoît Canet <benoit@irqsave.net>

Signed-off-by: Benoit Canet <benoit@irqsave.net>
Reviewed-by: Max Reitz <mreitz@redhat.com>
---
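Reading aid, not part of the patch: the diff below votes twice, once on the
BDRV_BLOCK_DATA bit and once on the sector count, and fails with -EIO unless
both winners reach the threshold. A minimal standalone sketch of that idea
follows. most_voted() and voted_block_status() are hypothetical names, the
arrays stand in for the answers of the children whose request succeeded, and
the tie-breaking rule is an assumption of the sketch, not something taken from
the patch.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>

/* Store the most represented value of values[0..n) in *winner and return how
 * many times it occurs. Returns 0 and leaves *winner untouched when n is 0.
 * Ties go to the value seen first (arbitrary choice for this sketch).
 */
size_t most_voted(const int64_t *values, size_t n, int64_t *winner)
{
    size_t best = 0;

    assert(values != NULL || n == 0);
    for (size_t i = 0; i < n; i++) {
        size_t count = 0;
        for (size_t j = 0; j < n; j++) {
            count += (values[j] == values[i]);
        }
        if (count > best) {
            best = count;
            *winner = values[i];
        }
    }
    return best;
}

/* Both the status vote and the sector-count vote must reach the threshold;
 * otherwise the caller gets -EIO and *pnum is left untouched.
 */
int64_t voted_block_status(const int64_t *status, const int64_t *num,
                           size_t n, size_t threshold, int *pnum)
{
    int64_t status_winner = 0, num_winner = 0;

    if (most_voted(status, n, &status_winner) < threshold ||
        most_voted(num, n, &num_winner) < threshold) {
        return -EIO;
    }
    *pnum = (int)num_winner;
    return status_winner;
}
```

Note that the sketch also returns -EIO when no child answered at all, because
an empty vote can never reach a threshold of at least 1.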
 block/quorum.c | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 51 insertions(+)

diff --git a/block/quorum.c b/block/quorum.c
index cef4424..677a96d 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -619,6 +619,56 @@ static void quorum_invalidate_cache(BlockDriverState *bs)
     }
 }
 
+static int64_t coroutine_fn quorum_co_get_block_status(BlockDriverState *bs,
+                                                       int64_t sector_num,
+                                                       int nb_sectors,
+                                                       int *pnum)
+{
+    BDRVQuorumState *s = bs->opaque;
+    QuorumVoteVersion *winner = NULL;
+    QuorumVotes result_votes, num_votes;
+    QuorumVoteValue result_value, num_value;
+    int i, num;
+    int64_t result = 0;
+
+    QLIST_INIT(&result_votes.vote_list);
+    QLIST_INIT(&num_votes.vote_list);
+    result_votes.compare = quorum_64bits_compare;
+    num_votes.compare = quorum_64bits_compare;
+
+    for (i = 0; i < s->total; i++) {
+        result = bdrv_get_block_status(s->bs[i], sector_num, nb_sectors, &num);
+        /* skip failed requests */
+        if (result < 0) {
+            continue;
+        }
+        result_value.l = result & BDRV_BLOCK_DATA;
+        num_value.l = num;
+        quorum_count_vote(&result_votes, &result_value, i);
+        quorum_count_vote(&num_votes, &num_value, i);
+    }
+
+    winner = quorum_get_vote_winner(&result_votes);
+    if (!winner || winner->vote_count < s->threshold) {
+        result = -EIO;
+        goto free_exit;
+    }
+    result = winner->value.l;
+
+    winner = quorum_get_vote_winner(&num_votes);
+    if (winner->vote_count < s->threshold) {
+        result = -EIO;
+        goto free_exit;
+    }
+    *pnum = winner->value.l;
+
+free_exit:
+    quorum_free_vote_list(&result_votes);
+    quorum_free_vote_list(&num_votes);
+
+    return result;
+}
+
 static BlockDriver bdrv_quorum = {
     .format_name        = "quorum",
     .protocol_name      = "quorum",
@@ -630,6 +680,7 @@ static BlockDriver bdrv_quorum = {
     .bdrv_aio_readv     = quorum_aio_readv,
     .bdrv_aio_writev    = quorum_aio_writev,
     .bdrv_invalidate_cache = quorum_invalidate_cache,
+    .bdrv_co_get_block_status = quorum_co_get_block_status,
 };
 
 static void bdrv_quorum_init(void)
-- 
1.8.3.2

* [Qemu-devel] [PATCH V15 10/13] quorum: Add quorum_co_flush().
  2014-02-03 21:51 [Qemu-devel] [PATCH V15 00/13] Quorum block filter Benoît Canet
                   ` (8 preceding siblings ...)
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 09/13] quorum: Add quorum_co_get_block_status Benoît Canet
@ 2014-02-03 21:51 ` Benoît Canet
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 11/13] quorum: Implement recursive .bdrv_recurse_is_first_non_filter in quorum Benoît Canet
                   ` (2 subsequent siblings)
  12 siblings, 0 replies; 29+ messages in thread
From: Benoît Canet @ 2014-02-03 21:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, Benoît Canet, stefanha, mreitz

From: Benoît Canet <benoit@irqsave.net>

Vote on the flush results of the children to select the value to return.

Signed-off-by: Benoit Canet <benoit@irqsave.net>
---
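Reading aid, not part of the patch: unlike the read path, the flush vote has
no threshold, and a successful flush (0) competes with the error codes like
any other result. The sketch below illustrates this under stated assumptions:
voted_flush_result() is a hypothetical name, the array stands in for the
per-child return values of bdrv_co_flush(), and ties are resolved in favour of
the result seen first, which the patch does not promise.

```c
#include <assert.h>
#include <errno.h>   /* EIO, ENOSPC for callers trying the sketch */
#include <stddef.h>

/* Return the most frequent entry of results[0..n); 0 (success) is a
 * candidate like any error code. Returns 0 when n is 0 (sketch assumption).
 */
int voted_flush_result(const int *results, size_t n)
{
    size_t best = 0;
    int winner = 0;

    assert(results != NULL || n == 0);
    for (size_t i = 0; i < n; i++) {
        size_t count = 0;
        for (size_t j = 0; j < n; j++) {
            if (results[j] == results[i]) {
                count++;
            }
        }
        if (count > best) {
            best = count;
            winner = results[i];
        }
    }
    return winner;
}
```

A single failing child out of three is therefore outvoted and the flush is
reported as successful, while two children failing with -EIO make -EIO the
result.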
 block/quorum.c | 28 ++++++++++++++++++++++++++++
 1 file changed, 28 insertions(+)

diff --git a/block/quorum.c b/block/quorum.c
index 677a96d..c9e3b2a 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -669,12 +669,40 @@ free_exit:
     return result;
 }
 
+static coroutine_fn int quorum_co_flush(BlockDriverState *bs)
+{
+    BDRVQuorumState *s = bs->opaque;
+    QuorumVoteVersion *winner = NULL;
+    QuorumVotes error_votes;
+    QuorumVoteValue result_value;
+    int i;
+    int result = 0;
+
+    QLIST_INIT(&error_votes.vote_list);
+    error_votes.compare = quorum_64bits_compare;
+
+    for (i = 0; i < s->total; i++) {
+        result = bdrv_co_flush(s->bs[i]);
+        result_value.l = result;
+        quorum_count_vote(&error_votes, &result_value, i);
+    }
+
+    winner = quorum_get_vote_winner(&error_votes);
+    result = winner->value.l;
+
+    quorum_free_vote_list(&error_votes);
+
+    return result;
+}
+
 static BlockDriver bdrv_quorum = {
     .format_name        = "quorum",
     .protocol_name      = "quorum",
 
     .instance_size      = sizeof(BDRVQuorumState),
 
+    .bdrv_co_flush_to_disk = quorum_co_flush,
+
     .bdrv_getlength     = quorum_getlength,
 
     .bdrv_aio_readv     = quorum_aio_readv,
-- 
1.8.3.2

* [Qemu-devel] [PATCH V15 11/13] quorum: Implement recursive .bdrv_recurse_is_first_non_filter in quorum.
  2014-02-03 21:51 [Qemu-devel] [PATCH V15 00/13] Quorum block filter Benoît Canet
                   ` (9 preceding siblings ...)
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 10/13] quorum: Add quorum_co_flush() Benoît Canet
@ 2014-02-03 21:51 ` Benoît Canet
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 12/13] quorum: Add quorum_open() and quorum_close() Benoît Canet
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 13/13] quorum: Add unit test Benoît Canet
  12 siblings, 0 replies; 29+ messages in thread
From: Benoît Canet @ 2014-02-03 21:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, Benoît Canet, stefanha, mreitz

From: Benoît Canet <benoit@irqsave.net>

Signed-off-by: Benoit Canet <benoit@irqsave.net>
Reviewed-by: Max Reitz <mreitz@redhat.com>
---
 block/quorum.c | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/block/quorum.c b/block/quorum.c
index c9e3b2a..1e683f8 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -695,6 +695,23 @@ static coroutine_fn int quorum_co_flush(BlockDriverState *bs)
     return result;
 }
 
+static bool quorum_recurse_is_first_non_filter(BlockDriverState *bs,
+                                               BlockDriverState *candidate)
+{
+    BDRVQuorumState *s = bs->opaque;
+    int i;
+
+    for (i = 0; i < s->total; i++) {
+        bool perm = bdrv_recurse_is_first_non_filter(s->bs[i],
+                                                     candidate);
+        if (perm) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
 static BlockDriver bdrv_quorum = {
     .format_name        = "quorum",
     .protocol_name      = "quorum",
@@ -709,6 +726,8 @@ static BlockDriver bdrv_quorum = {
     .bdrv_aio_writev    = quorum_aio_writev,
     .bdrv_invalidate_cache = quorum_invalidate_cache,
     .bdrv_co_get_block_status = quorum_co_get_block_status,
+
+    .bdrv_recurse_is_first_non_filter = quorum_recurse_is_first_non_filter,
 };
 
 static void bdrv_quorum_init(void)
-- 
1.8.3.2

* [Qemu-devel] [PATCH V15 12/13] quorum: Add quorum_open() and quorum_close().
  2014-02-03 21:51 [Qemu-devel] [PATCH V15 00/13] Quorum block filter Benoît Canet
                   ` (10 preceding siblings ...)
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 11/13] quorum: Implement recursive .bdrv_recurse_is_first_non_filter in quorum Benoît Canet
@ 2014-02-03 21:51 ` Benoît Canet
  2014-02-04 16:08   ` Kevin Wolf
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 13/13] quorum: Add unit test Benoît Canet
  12 siblings, 1 reply; 29+ messages in thread
From: Benoît Canet @ 2014-02-03 21:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, Benoît Canet, stefanha, mreitz

From: Benoît Canet <benoit@irqsave.net>

Example of command line:
-drive if=virtio,file.driver=quorum,\
file.children.0.file.filename=1.raw,\
file.children.0.node-name=1.raw,\
file.children.0.driver=raw,\
file.children.1.file.filename=2.raw,\
file.children.1.node-name=2.raw,\
file.children.1.driver=raw,\
file.children.2.file.filename=3.raw,\
file.children.2.node-name=3.raw,\
file.children.2.driver=raw,\
file.vote_threshold=2

Passing file.blkverify=on together with file.vote_threshold=2 and exactly two
files makes the driver emulate blkverify.

Signed-off-by: Benoit Canet <benoit@irqsave.net>
---
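Reading aid, not part of the patch: the option rules stated above and enforced
in quorum_open() below (at least two children, a threshold between 1 and the
number of children, blkverify mode only with two children and a threshold of
2) can be summarised in a standalone sketch. check_threshold() and
blkverify_mode() are hypothetical names chosen for illustration.

```c
#include <assert.h>  /* only for callers that want to assert on the results */
#include <errno.h>
#include <stdbool.h>

/* Validate the child count and the vote threshold.
 * Returns 0 when the combination is acceptable, a negative errno otherwise.
 */
int check_threshold(int threshold, int total)
{
    if (total < 2) {
        return -EINVAL;   /* a quorum needs more than one child */
    }
    if (threshold < 1 || threshold > total) {
        return -ERANGE;   /* threshold must lie in [1, total] */
    }
    return 0;
}

/* blkverify emulation is only honoured for exactly two children that must
 * both agree; any other combination falls back to normal quorum behaviour.
 */
bool blkverify_mode(bool requested, int total, int threshold)
{
    return requested && total == 2 && threshold == 2;
}
```

For the command line shown in the commit message (three children,
vote_threshold=2), check_threshold(2, 3) yields 0 and blkverify_mode() is
false even if blkverify=on were given.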
 block/quorum.c   | 171 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 qapi-schema.json |  21 ++++++-
 2 files changed, 191 insertions(+), 1 deletion(-)

diff --git a/block/quorum.c b/block/quorum.c
index 1e683f8..d2bea29 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -17,8 +17,12 @@
 #include <gnutls/crypto.h>
 #include "block/block_int.h"
 #include "qapi/qmp/qjson.h"
+#include "qapi/qmp/types.h"
+#include "qemu-common.h"
 
 #define HASH_LENGTH 32
+#define KEY_PREFIX "children."
+#define KEY_FILENAME_SUFFIX ".file.filename"
 
 /* This union holds a vote hash value */
 typedef union QuorumVoteValue {
@@ -712,12 +716,179 @@ static bool quorum_recurse_is_first_non_filter(BlockDriverState *bs,
     return false;
 }
 
+static int quorum_valid_threshold(int threshold,
+                                  int total,
+                                  Error **errp)
+{
+
+    if (threshold < 1) {
+        error_set(errp, QERR_INVALID_PARAMETER_VALUE,
+                  "vote-threshold", "value >= 1");
+        return -ERANGE;
+    }
+
+    if (threshold > total) {
+        error_setg(errp, "threshold may not exceed children count");
+        return -ERANGE;
+    }
+
+    return 0;
+}
+
+static int quorum_open(BlockDriverState *bs,
+                       QDict *options,
+                       int flags,
+                       Error **errp)
+{
+    BDRVQuorumState *s = bs->opaque;
+    Error *local_err = NULL;
+    bool *opened;
+    QDict *sub = NULL;
+    QList *list = NULL;
+    const QListEntry *lentry;
+    const QDictEntry *dentry;
+    const char *value;
+    char *next;
+    int i;
+    int ret = 0;
+    unsigned long long threshold = 0;
+
+    qdict_flatten(options);
+    qdict_extract_subqdict(options, &sub, "children.");
+    qdict_array_split(sub, &list);
+
+    /* count how many different children are present and validate */
+    s->total = !qlist_size(list) ? qdict_size(sub) : qlist_size(list);
+    if (s->total < 2) {
+        error_setg(&local_err,
+                   "Number of provided children must be greater than 1");
+        ret = -EINVAL;
+        goto exit;
+    }
+
+    ret = qdict_get_try_int(options, "vote-threshold", -1);
+    /* from QMP */
+    if (ret != -1) {
+        qdict_del(options, "vote-threshold");
+        s->threshold = ret;
+    /* from command line */
+    } else {
+        /* retrieve the threshold option from the command line */
+        value = qdict_get_try_str(options, "vote_threshold");
+        if (!value) {
+            error_setg(&local_err,
+                       "vote_threshold must be provided");
+            ret = -EINVAL;
+            goto exit;
+        }
+        qdict_del(options, "vote_threshold");
+
+        ret = parse_uint(value, &threshold, &next, 10);
+
+        /* no int found -> scan fail */
+        if (ret < 0) {
+            error_setg(&local_err,
+                       "invalid vote_threshold specified");
+            ret = -EINVAL;
+            goto exit;
+        }
+        s->threshold = threshold;
+    }
+
+    /* and validate it against s->total */
+    ret = quorum_valid_threshold(s->threshold, s->total, &local_err);
+    if (ret < 0) {
+        goto exit;
+    }
+
+    /* is the driver in blkverify mode */
+    value = qdict_get_try_str(options, "blkverify");
+    if (value && !strcmp(value, "on")  &&
+        s->total == 2 && s->threshold == 2) {
+        s->is_blkverify = true;
+    } else if (value && strcmp(value, "off")) {
+        fprintf(stderr, "blkverify mode is set by setting blkverify=on "
+                        "and using two files with vote_threshold=2\n");
+    }
+    qdict_del(options, "blkverify");
+
+    /* allocate the children BlockDriverState array */
+    s->bs = g_new0(BlockDriverState *, s->total);
+    opened = g_new0(bool, s->total);
+
+    /* Open by file name or options dict (command line or QMP) */
+    if (s->total == qlist_size(list)) {
+        for (i = 0, lentry = qlist_first(list); lentry;
+            lentry = qlist_next(lentry), i++) {
+            ret = bdrv_open(&s->bs[i], NULL, NULL,
+                            qobject_to_qdict(lentry->value),
+                            flags, NULL, &local_err);
+            if (ret < 0) {
+                goto close_exit;
+            }
+            opened[i] = true;
+        }
+    /* Open by QMP references */
+    } else {
+        for (i = 0, dentry = qdict_first(sub); dentry;
+             dentry = qdict_next(sub, dentry), i++) {
+            ret = bdrv_open(&s->bs[i], NULL,
+                            qstring_get_str(qobject_to_qstring(dentry->value)),
+                            NULL, flags, NULL, &local_err);
+            if (ret < 0) {
+                goto close_exit;
+            }
+            opened[i] = true;
+        }
+    }
+
+    g_free(opened);
+    goto exit;
+
+close_exit:
+    /* cleanup on error */
+    for (i = 0; i < s->total; i++) {
+        if (!opened[i]) {
+            continue;
+        }
+        bdrv_unref(s->bs[i]);
+    }
+    g_free(s->bs);
+    g_free(opened);
+exit:
+    /* propagate error */
+    if (error_is_set(&local_err)) {
+        error_propagate(errp, local_err);
+    }
+    QDECREF(sub);
+    QDECREF(list);
+    return ret;
+}
+
+static void quorum_close(BlockDriverState *bs)
+{
+    BDRVQuorumState *s = bs->opaque;
+    int i;
+
+    for (i = 0; i < s->total; i++) {
+        /* Ensure writes reach stable storage */
+        bdrv_flush(s->bs[i]);
+        /* close manually because we'll free s->bs */
+        bdrv_unref(s->bs[i]);
+    }
+
+    g_free(s->bs);
+}
+
 static BlockDriver bdrv_quorum = {
     .format_name        = "quorum",
     .protocol_name      = "quorum",
 
     .instance_size      = sizeof(BDRVQuorumState),
 
+    .bdrv_file_open     = quorum_open,
+    .bdrv_close         = quorum_close,
+
     .bdrv_co_flush_to_disk = quorum_co_flush,
 
     .bdrv_getlength     = quorum_getlength,
diff --git a/qapi-schema.json b/qapi-schema.json
index 05ced9d..903a3a0 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -4352,6 +4352,24 @@
             'raw': 'BlockdevRef' } }
 
 ##
+# @BlockdevOptionsQuorum
+#
+# Driver specific block device options for Quorum
+#
+# @blkverify:      #optional true if the driver must print content mismatch
+#
+# @children:       the children block devices to use
+#
+# @vote-threshold: the vote limit under which a read will fail
+#
+# Since: 2.0
+##
+{ 'type': 'BlockdevOptionsQuorum',
+  'data': { '*blkverify': 'bool',
+            'children': [ 'BlockdevRef' ],
+            'vote-threshold': 'int' } }
+
+##
 # @BlockdevOptions
 #
 # Options for creating a block device.
@@ -4389,7 +4407,8 @@
       'vdi':        'BlockdevOptionsGenericFormat',
       'vhdx':       'BlockdevOptionsGenericFormat',
       'vmdk':       'BlockdevOptionsGenericCOWFormat',
-      'vpc':        'BlockdevOptionsGenericFormat'
+      'vpc':        'BlockdevOptionsGenericFormat',
+      'quorum':     'BlockdevOptionsQuorum'
   } }
 
 ##
-- 
1.8.3.2

* [Qemu-devel] [PATCH V15 13/13] quorum: Add unit test.
  2014-02-03 21:51 [Qemu-devel] [PATCH V15 00/13] Quorum block filter Benoît Canet
                   ` (11 preceding siblings ...)
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 12/13] quorum: Add quorum_open() and quorum_close() Benoît Canet
@ 2014-02-03 21:51 ` Benoît Canet
  2014-02-04 16:13   ` Kevin Wolf
  12 siblings, 1 reply; 29+ messages in thread
From: Benoît Canet @ 2014-02-03 21:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, Benoît Canet, Benoit Canet, mreitz, stefanha

Signed-off-by: Benoit Canet <benoit@irqsave.net>
Reviewed-by: Max Reitz <mreitz@redhat.com>
---
 tests/qemu-iotests/075     | 95 ++++++++++++++++++++++++++++++++++++++++++++++
 tests/qemu-iotests/075.out | 34 +++++++++++++++++
 tests/qemu-iotests/group   |  1 +
 3 files changed, 130 insertions(+)
 create mode 100755 tests/qemu-iotests/075
 create mode 100644 tests/qemu-iotests/075.out

diff --git a/tests/qemu-iotests/075 b/tests/qemu-iotests/075
new file mode 100755
index 0000000..7e4f4f7
--- /dev/null
+++ b/tests/qemu-iotests/075
@@ -0,0 +1,95 @@
+#!/bin/bash
+#
+# Test Quorum block driver
+#
+# Copyright (C) 2013 Nodalink, SARL.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+
+# creator
+owner=benoit@irqsave.net
+
+seq=`basename $0`
+echo "QA output created by $seq"
+
+here=`pwd`
+tmp=/tmp/$$
+status=1	# failure is the default!
+
+_cleanup()
+{
+    rm -rf $TEST_DIR/1.raw
+    rm -rf $TEST_DIR/2.raw
+    rm -rf $TEST_DIR/3.raw
+}
+trap "_cleanup; exit \$status" 0 1 2 3 15
+
+# get standard environment, filters and checks
+. ./common.rc
+. ./common.filter
+
+_supported_fmt raw
+_supported_proto generic
+_supported_os Linux
+
+quorum="file.driver=quorum,file.children.0.file.filename=$TEST_DIR/1.raw"
+quorum="$quorum,file.children.1.file.filename=$TEST_DIR/2.raw"
+quorum="$quorum,file.children.2.file.filename=$TEST_DIR/3.raw,file.vote_threshold=2"
+
+echo
+echo "== creating quorum files =="
+
+size=10M
+
+TEST_IMG="$TEST_DIR/1.raw" _make_test_img $size
+TEST_IMG="$TEST_DIR/2.raw" _make_test_img $size
+TEST_IMG="$TEST_DIR/3.raw" _make_test_img $size
+
+echo
+echo "== writing images =="
+
+$QEMU_IO -c "open -o $quorum" -c "write -P 0x32 0 $size" | _filter_qemu_io
+
+echo
+echo "== checking quorum write =="
+
+$QEMU_IO -c "read -P 0x32 0 $size" "$TEST_DIR/1.raw" | _filter_qemu_io
+$QEMU_IO -c "read -P 0x32 0 $size" "$TEST_DIR/2.raw" | _filter_qemu_io
+$QEMU_IO -c "read -P 0x32 0 $size" "$TEST_DIR/3.raw" | _filter_qemu_io
+
+echo
+echo "== corrupting image =="
+
+$QEMU_IO -c "write -P 0x42 0 $size" "$TEST_DIR/2.raw" | _filter_qemu_io
+
+echo
+echo "== checking quorum correction =="
+
+$QEMU_IO -c "open -o $quorum" -c "read -P 0x32 0 $size" | _filter_qemu_io
+
+echo
+echo "== breaking quorum =="
+
+$QEMU_IO -c "write -P 0x41 0 $size" "$TEST_DIR/1.raw" | _filter_qemu_io
+echo
+echo "== checking that quorum is broken =="
+
+$QEMU_IO -c "open -o $quorum" -c "read -P 0x32 0 $size" | _filter_qemu_io
+
+
+# success, all done
+echo "*** done"
+rm -f $seq.full
+status=0
diff --git a/tests/qemu-iotests/075.out b/tests/qemu-iotests/075.out
new file mode 100644
index 0000000..eb1a0d6
--- /dev/null
+++ b/tests/qemu-iotests/075.out
@@ -0,0 +1,34 @@
+QA output created by 075
+
+== creating quorum files ==
+Formatting 'TEST_DIR/1.IMGFMT', fmt=IMGFMT size=10485760 
+Formatting 'TEST_DIR/2.IMGFMT', fmt=IMGFMT size=10485760 
+Formatting 'TEST_DIR/3.IMGFMT', fmt=IMGFMT size=10485760 
+
+== writing images ==
+wrote 10485760/10485760 bytes at offset 0
+10 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+
+== checking quorum write ==
+read 10485760/10485760 bytes at offset 0
+10 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+read 10485760/10485760 bytes at offset 0
+10 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+read 10485760/10485760 bytes at offset 0
+10 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+
+== corrupting image ==
+wrote 10485760/10485760 bytes at offset 0
+10 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+
+== checking quorum correction ==
+read 10485760/10485760 bytes at offset 0
+10 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+
+== breaking quorum ==
+wrote 10485760/10485760 bytes at offset 0
+10 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+
+== checking that quorum is broken ==
+qemu-io: can't open device (null): Could not read image for determining its format: Input/output error
+*** done
diff --git a/tests/qemu-iotests/group b/tests/qemu-iotests/group
index 03c762f..c697500 100644
--- a/tests/qemu-iotests/group
+++ b/tests/qemu-iotests/group
@@ -81,4 +81,5 @@
 072 rw auto
 073 rw auto
 074 rw auto
+075 rw auto
 077 rw auto
-- 
1.8.3.2

* Re: [Qemu-devel] [PATCH V15 01/13] quorum: Create quorum.c, add QuorumSingleAIOCB and QuorumAIOCB.
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 01/13] quorum: Create quorum.c, add QuorumSingleAIOCB and QuorumAIOCB Benoît Canet
@ 2014-02-04 12:57   ` Kevin Wolf
  2014-02-05 13:29     ` Benoît Canet
  0 siblings, 1 reply; 29+ messages in thread
From: Kevin Wolf @ 2014-02-04 12:57 UTC (permalink / raw)
  To: Benoît Canet; +Cc: Benoît Canet, qemu-devel, stefanha, mreitz

On 03.02.2014 at 22:51, Benoît Canet wrote:
> From: Benoît Canet <benoit@irqsave.net>
> 
> Signed-off-by: Benoit Canet <benoit@irqsave.net>
> Reviewed-by: Max Reitz <mreitz@redhat.com>
> ---
>  block/Makefile.objs |  1 +
>  block/quorum.c      | 54 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 55 insertions(+)
>  create mode 100644 block/quorum.c
> 
> diff --git a/block/Makefile.objs b/block/Makefile.objs
> index 4e8c91e..a2650b9 100644
> --- a/block/Makefile.objs
> +++ b/block/Makefile.objs
> @@ -3,6 +3,7 @@ block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-c
>  block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
>  block-obj-y += qed-check.o
>  block-obj-$(CONFIG_VHDX) += vhdx.o vhdx-endian.o vhdx-log.o
> +block-obj-y += quorum.o
>  block-obj-y += parallels.o blkdebug.o blkverify.o
>  block-obj-y += snapshot.o qapi.o
>  block-obj-$(CONFIG_WIN32) += raw-win32.o win32-aio.o
> diff --git a/block/quorum.c b/block/quorum.c
> new file mode 100644
> index 0000000..17695d6
> --- /dev/null
> +++ b/block/quorum.c
> @@ -0,0 +1,54 @@
> +/*
> + * Quorum Block filter
> + *
> + * Copyright (C) 2012-2014 Nodalink, EURL.
> + *
> + * Author:
> + *   Benoît Canet <benoit.canet@irqsave.net>
> + *
> + * Based on the design and code of blkverify.c (Copyright (C) 2010 IBM, Corp)
> + * and blkmirror.c (Copyright (C) 2011 Red Hat, Inc).

I think you were planning to respin anyway, so I'll practice my
nitpicking: The file is called mirror.c, not blkmirror.c.

> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + */
> +
> +#include "block/block_int.h"
> +
> +typedef struct QuorumAIOCB QuorumAIOCB;
> +
> +/* Quorum will create one instance of the following structure per operation it
> + * performs on its children.
> + * So for each read/write operation coming from the upper layer there will be
> + * $children_count QuorumSingleAIOCB.
> + */
> +typedef struct QuorumSingleAIOCB {
> +    BlockDriverAIOCB *aiocb;

So this isn't a real AIOCB, but it merely points to one. Perhaps
something like QuorumChildRequest would be a more precise name?

> +    QEMUIOVector qiov;
> +    uint8_t *buf;

The combination of a linear buffer and qiov is unusual. It looks like
there may be a reason for it (otherwise qemu_iovec_clone() wouldn't
exist), but I don't understand it yet at this point. Could hint at a
lack of documentation.

I might come back to this later in the patch series.
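
For readers following along, here is a minimal sketch of how a linear
buffer and an I/O vector can belong together: the cloned vector keeps the
segment lengths of the caller's vector, but every segment points into one
contiguous buffer. This is written against plain struct iovec rather than
QEMUIOVector, iov_clone_layout() is a hypothetical name, and it assumes
non-overlapping source segments; it is not the qemu_iovec_clone() code.

```c
#include <stddef.h>
#include <sys/uio.h>

/* Hypothetical helper, not the QEMU function: make dest[] describe the same
 * segment lengths as src[], with every segment carved out of one linear
 * buffer. Assumes buf holds at least the sum of all src lengths and that
 * the src segments do not overlap. Returns the number of bytes laid out. */
static size_t iov_clone_layout(struct iovec *dest, const struct iovec *src,
                               int niov, void *buf)
{
    unsigned char *p = buf;
    size_t total = 0;
    int i;

    for (i = 0; i < niov; i++) {
        dest[i].iov_base = p + total;      /* next free byte in buf */
        dest[i].iov_len = src[i].iov_len;  /* same segmentation as src */
        total += src[i].iov_len;
    }
    return total;
}
```

With a layout like this, each child can read into its own private buffer
while keeping a vector that lines up segment by segment with the caller's.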

> +    int ret;
> +    QuorumAIOCB *parent;
> +} QuorumSingleAIOCB;
> +
> +/* Quorum will use the following structure to track progress of each read/write
> + * operation received by the upper layer.
> + * This structure hold pointers to the QuorumSingleAIOCB structures instances
> + * used to do operations on each children and track overall progress.
> + */
> +struct QuorumAIOCB {
> +    BlockDriverAIOCB common;
> +
> +    /* Request metadata */
> +    uint64_t sector_num;
> +    int nb_sectors;
> +
> +    QEMUIOVector *qiov;         /* calling IOV */
> +
> +    QuorumSingleAIOCB *aios;    /* individual AIOs */
> +    int count;                  /* number of completed AIOCB */
> +    int success_count;          /* number of successfully completed AIOCB */
> +    bool *finished;             /* completion signal for cancel */
> +
> +    bool is_read;
> +    int vote_ret;
> +};

Kevin

* Re: [Qemu-devel] [PATCH V15 02/13] quorum: Create BDRVQuorumState and BlkDriver and do init.
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 02/13] quorum: Create BDRVQuorumState and BlkDriver and do init Benoît Canet
@ 2014-02-04 13:08   ` Kevin Wolf
  0 siblings, 0 replies; 29+ messages in thread
From: Kevin Wolf @ 2014-02-04 13:08 UTC (permalink / raw)
  To: Benoît Canet; +Cc: Benoît Canet, qemu-devel, stefanha, mreitz

On 03.02.2014 at 22:51, Benoît Canet wrote:
> From: Benoît Canet <benoit@irqsave.net>
> 
> Signed-off-by: Benoit Canet <benoit@irqsave.net>
> Reviewed-by: Max Reitz <mreitz@redhat.com>
> ---
>  block/quorum.c | 25 +++++++++++++++++++++++++
>  1 file changed, 25 insertions(+)
> 
> diff --git a/block/quorum.c b/block/quorum.c
> index 17695d6..157efdf 100644
> --- a/block/quorum.c
> +++ b/block/quorum.c
> @@ -15,6 +15,16 @@
>  
>  #include "block/block_int.h"
>  
> +/* the following structure holds the state of one quorum instance */
> +typedef struct {

The convention is to have both a struct name and a typedef name (and
it should be the same in both places, of course).
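
A sketch of what that convention looks like for this structure. The
BlockDriverState forward declaration is only a stand-in so that the
fragment compiles on its own; the fields are copied from the patch.

```c
#include <stdbool.h>

/* Opaque stand-in for the real QEMU type, only here to make this compile. */
typedef struct BlockDriverState BlockDriverState;

/* Same identifier used as the struct tag and as the typedef name. */
typedef struct BDRVQuorumState {
    BlockDriverState **bs; /* children BlockDriverStates */
    int total;             /* children count */
    int threshold;         /* minimum number of matching reads */
    bool is_blkverify;     /* true if the driver is in blkverify mode */
} BDRVQuorumState;
```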

> +    BlockDriverState **bs; /* children BlockDriverStates */
> +    int total;             /* children count */

num_children? "total" could be anything.

> +    int threshold;         /* if less than threshold children reads gave the
> +                            * same result a quorum error occurs.
> +                            */
> +    bool is_blkverify;     /* true if the driver is in blkverify mode */

Can you describe in detail what blkverify mode is?

> +} BDRVQuorumState;
> +
>  typedef struct QuorumAIOCB QuorumAIOCB;
>  
>  /* Quorum will create one instance of the following structure per operation it
> @@ -37,6 +47,7 @@ typedef struct QuorumSingleAIOCB {
>   */
>  struct QuorumAIOCB {
>      BlockDriverAIOCB common;
> +    BDRVQuorumState *bqs;
>  
>      /* Request metadata */
>      uint64_t sector_num;
> @@ -52,3 +63,17 @@ struct QuorumAIOCB {
>      bool is_read;
>      int vote_ret;
>  };
> +
> +static BlockDriver bdrv_quorum = {
> +    .format_name        = "quorum",
> +    .protocol_name      = "quorum",
> +
> +    .instance_size      = sizeof(BDRVQuorumState),
> +};
> +
> +static void bdrv_quorum_init(void)
> +{
> +    bdrv_register(&bdrv_quorum);
> +}
> +
> +block_init(bdrv_quorum_init);

I suspect that trying to use the driver will cause segfaults in this
state (has neither bdrv_open nor bdrv_file_open). But okay, we can live
with that.

Kevin

* Re: [Qemu-devel] [PATCH V15 03/13] quorum: Add quorum_aio_writev and its dependencies.
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 03/13] quorum: Add quorum_aio_writev and its dependencies Benoît Canet
@ 2014-02-04 13:57   ` Kevin Wolf
  2014-02-05 14:14     ` Benoît Canet
  0 siblings, 1 reply; 29+ messages in thread
From: Kevin Wolf @ 2014-02-04 13:57 UTC (permalink / raw)
  To: Benoît Canet; +Cc: Benoît Canet, qemu-devel, stefanha, mreitz

On 03.02.2014 at 22:51, Benoît Canet wrote:
> From: Benoît Canet <benoit@irqsave.net>
> 
> Signed-off-by: Benoit Canet <benoit@irqsave.net>
> ---
>  block/quorum.c | 104 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 104 insertions(+)

Starting with writes before the driver can even open an image is a weird
order to do things in. It also doesn't make the review any easier when
you don't know how things are initialised.

> diff --git a/block/quorum.c b/block/quorum.c
> index 157efdf..81bffdd 100644
> --- a/block/quorum.c
> +++ b/block/quorum.c
> @@ -64,11 +64,115 @@ struct QuorumAIOCB {
>      int vote_ret;
>  };
>  
> +static void quorum_aio_cancel(BlockDriverAIOCB *blockacb)
> +{
> +    QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
> +    BDRVQuorumState *s = acb->bqs;
> +    int i;
> +
> +    /* cancel all callback */

"callbacks"

> +    for (i = 0; i < s->total; i++) {
> +        bdrv_aio_cancel(acb->aios[i].aiocb);
> +    }
> +}

Don't you want to free acb and do similar cleanup?

> +
> +static AIOCBInfo quorum_aiocb_info = {
> +    .aiocb_size         = sizeof(QuorumAIOCB),
> +    .cancel             = quorum_aio_cancel,
> +};
> +
> +static void quorum_aio_finalize(QuorumAIOCB *acb)
> +{
> +    BDRVQuorumState *s = acb->bqs;

block/quorum.c: In function 'quorum_aio_finalize':
block/quorum.c:86:22: error: unused variable 's' [-Werror=unused-variable]

> +    int ret = 0;
> +
> +    acb->common.cb(acb->common.opaque, ret);
> +    if (acb->finished) {
> +        *acb->finished = true;
> +    }
> +    g_free(acb->aios);
> +    qemu_aio_release(acb);
> +}
> +
> +static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
> +                                   BlockDriverState *bs,
> +                                   QEMUIOVector *qiov,
> +                                   uint64_t sector_num,
> +                                   int nb_sectors,
> +                                   BlockDriverCompletionFunc *cb,
> +                                   void *opaque)
> +{
> +    QuorumAIOCB *acb = qemu_aio_get(&quorum_aiocb_info, bs, cb, opaque);
> +    int i;
> +
> +    acb->bqs = s;

Noticed it only here, but it's really in patch 2 (and should be in
patch 1):

What is acb->bqs good for? Isn't it always the same as
acb->common.bs->opaque?

> +    acb->sector_num = sector_num;
> +    acb->nb_sectors = nb_sectors;
> +    acb->qiov = qiov;
> +    acb->aios = g_new0(QuorumSingleAIOCB, s->total);
> +    acb->count = 0;
> +    acb->success_count = 0;
> +    acb->finished = NULL;
> +    acb->is_read = false;
> +    acb->vote_ret = 0;
> +
> +    for (i = 0; i < s->total; i++) {
> +        acb->aios[i].buf = NULL;
> +        acb->aios[i].ret = 0;
> +        acb->aios[i].parent = acb;
> +    }
>
> +
> +    return acb;
> +}

Kevin

* Re: [Qemu-devel] [PATCH V15 04/13] blkverify: Extract qemu_iovec_clone() and qemu_iovec_compare() from blkverify.
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 04/13] blkverify: Extract qemu_iovec_clone() and qemu_iovec_compare() from blkverify Benoît Canet
@ 2014-02-04 14:04   ` Kevin Wolf
  0 siblings, 0 replies; 29+ messages in thread
From: Kevin Wolf @ 2014-02-04 14:04 UTC (permalink / raw)
  To: Benoît Canet; +Cc: Benoît Canet, qemu-devel, stefanha, mreitz

On 03.02.2014 at 22:51, Benoît Canet wrote:
> From: Benoît Canet <benoit@irqsave.net>
> 
> Signed-off-by: Benoit Canet <benoit@irqsave.net>
> Reviewed-by: Max Reitz <mreitz@redhat.com>
> ---
>  block/blkverify.c     | 108 +-------------------------------------------------
>  include/qemu-common.h |   2 +
>  util/iov.c            | 103 +++++++++++++++++++++++++++++++++++++++++++++++
>  3 files changed, 107 insertions(+), 106 deletions(-)

> --- a/util/iov.c
> +++ b/util/iov.c
> @@ -378,6 +378,109 @@ size_t qemu_iovec_memset(QEMUIOVector *qiov, size_t offset,
>      return iov_memset(qiov->iov, qiov->niov, offset, fillc, bytes);
>  }
>  
> +/**
> + * Check that I/O vector contents are identical
> + *
> + * @a:          I/O vector
> + * @b:          I/O vector
> + * @ret:        Offset to first mismatching byte or -1 if match
> + */

Perhaps we should update the documentation to state explicitly that the
I/O vectors must have the same structure (i.e. same length of all parts)
and that you should therefore only use it on vectors previously created
with qemu_iovec_clone().
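
To illustrate why the precondition matters, here is a stand-alone sketch
of the same comparison on plain struct iovec arrays (iov_compare() is a
hypothetical name, not the QEMU function). Two vectors holding identical
bytes but split into different segments would trip the assertion rather
than compare equal.

```c
#include <assert.h>
#include <sys/types.h>
#include <sys/uio.h>

/* Stand-in for the comparison in the patch. Both vectors must have the same
 * number of segments and the same length for each segment.
 * Returns the offset of the first mismatching byte, or -1 on a match. */
static ssize_t iov_compare(const struct iovec *a, const struct iovec *b,
                           int niov)
{
    ssize_t offset = 0;
    int i;

    for (i = 0; i < niov; i++) {
        const unsigned char *p = a[i].iov_base;
        const unsigned char *q = b[i].iov_base;
        size_t len = 0;

        /* Identical structure is required, not just identical content. */
        assert(a[i].iov_len == b[i].iov_len);
        while (len < a[i].iov_len && p[len] == q[len]) {
            len++;
        }
        offset += len;
        if (len != a[i].iov_len) {
            return offset;
        }
    }
    return -1;
}
```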

> +ssize_t qemu_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
> +{
> +    int i;
> +    ssize_t offset = 0;
> +
> +    assert(a->niov == b->niov);
> +    for (i = 0; i < a->niov; i++) {
> +        size_t len = 0;
> +        uint8_t *p = (uint8_t *)a->iov[i].iov_base;
> +        uint8_t *q = (uint8_t *)b->iov[i].iov_base;
> +
> +        assert(a->iov[i].iov_len == b->iov[i].iov_len);
> +        while (len < a->iov[i].iov_len && *p++ == *q++) {
> +            len++;
> +        }
> +
> +        offset += len;
> +
> +        if (len != a->iov[i].iov_len) {
> +            return offset;
> +        }
> +    }
> +    return -1;
> +}

Kevin

* Re: [Qemu-devel] [PATCH V15 05/13] quorum: Add quorum_aio_readv.
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 05/13] quorum: Add quorum_aio_readv Benoît Canet
@ 2014-02-04 14:15   ` Kevin Wolf
  0 siblings, 0 replies; 29+ messages in thread
From: Kevin Wolf @ 2014-02-04 14:15 UTC (permalink / raw)
  To: Benoît Canet; +Cc: Benoît Canet, qemu-devel, stefanha, mreitz

On 03.02.2014 at 22:51, Benoît Canet wrote:
> From: Benoît Canet <benoit@irqsave.net>
> 
> Signed-off-by: Benoit Canet <benoit@irqsave.net>
> Reviewed-by: Max Reitz <mreitz@redhat.com>
> ---
>  block/quorum.c | 38 ++++++++++++++++++++++++++++++++++++++
>  1 file changed, 38 insertions(+)
> 
> diff --git a/block/quorum.c b/block/quorum.c
> index 81bffdd..699b512 100644
> --- a/block/quorum.c
> +++ b/block/quorum.c
> @@ -86,10 +86,19 @@ static void quorum_aio_finalize(QuorumAIOCB *acb)
>      BDRVQuorumState *s = acb->bqs;
>      int ret = 0;
>  
> +    for (i = 0; i < s->total; i++) {
> +        qemu_vfree(acb->aios[i].buf);
> +        acb->aios[i].buf = NULL;
> +        acb->aios[i].ret = 0;

You free acb->aios anyway, no point in clearing the fields.

> +    }
> +
>      acb->common.cb(acb->common.opaque, ret);
>      if (acb->finished) {
>          *acb->finished = true;
>      }
> +    for (i = 0; acb->is_read && i < s->total; i++) {

As Max noted, pulling acb->is_read into the for loop condition is
clever. Code being unnecessarily clever is usually bad.

> +        qemu_iovec_destroy(&acb->aios[i].qiov);
> +    }

Any reason to have two separate for loops? I can't see why the qiov
should stay valid any longer than the data in it. I also can't see the
reason for the different conditions, other than qemu_vfree() being a
no-op for writes.

Keep it simple:

if (acb->is_read) {
    for (i = 0; i < s->total; i++) {
        qemu_vfree(acb->aios[i].buf);
        qemu_iovec_destroy(&acb->aios[i].qiov);
    }
}

It's shorter, less clever and more readable than your version.

>      g_free(acb->aios);
>      qemu_aio_release(acb);
>  }
> @@ -145,6 +154,34 @@ static void quorum_aio_cb(void *opaque, int ret)
>      quorum_aio_finalize(acb);
>  }
>  
> +static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
> +                                         int64_t sector_num,
> +                                         QEMUIOVector *qiov,
> +                                         int nb_sectors,
> +                                         BlockDriverCompletionFunc *cb,
> +                                         void *opaque)
> +{
> +    BDRVQuorumState *s = bs->opaque;
> +    QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num,
> +                                      nb_sectors, cb, opaque);
> +    int i;
> +
> +    acb->is_read = true;
> +
> +    for (i = 0; i < s->total; i++) {
> +        acb->aios[i].buf = qemu_blockalign(bs->file, qiov->size);

Shouldn't this be s->bs[i] instead of bs->file?

> +        qemu_iovec_init(&acb->aios[i].qiov, qiov->niov);
> +        qemu_iovec_clone(&acb->aios[i].qiov, qiov, acb->aios[i].buf);
> +    }
> +
> +    for (i = 0; i < s->total; i++) {
> +        bdrv_aio_readv(s->bs[i], sector_num, qiov, nb_sectors,
> +                       quorum_aio_cb, &acb->aios[i]);
> +    }
> +
> +    return &acb->common;
> +}
> +
>  static BlockDriverAIOCB *quorum_aio_writev(BlockDriverState *bs,
>                                            int64_t sector_num,
>                                            QEMUIOVector *qiov,
> @@ -172,6 +209,7 @@ static BlockDriver bdrv_quorum = {
>  
>      .instance_size      = sizeof(BDRVQuorumState),
>  
> +    .bdrv_aio_readv     = quorum_aio_readv,
>      .bdrv_aio_writev    = quorum_aio_writev,
>  };

Kevin

* Re: [Qemu-devel] [PATCH V15 06/13] quorum: Add quorum mechanism.
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 06/13] quorum: Add quorum mechanism Benoît Canet
@ 2014-02-04 15:40   ` Kevin Wolf
  2014-02-05 15:14     ` Benoît Canet
  0 siblings, 1 reply; 29+ messages in thread
From: Kevin Wolf @ 2014-02-04 15:40 UTC (permalink / raw)
  To: Benoît Canet; +Cc: Benoît Canet, qemu-devel, stefanha, mreitz

On 03.02.2014 at 22:51, Benoît Canet wrote:
> From: Benoît Canet <benoit@irqsave.net>
> 
> Use gnutls's SHA-256 to compare versions.
> 
> Signed-off-by: Benoit Canet <benoit@irqsave.net>
> ---
>  block/Makefile.objs       |   2 +-
>  block/quorum.c            | 386 +++++++++++++++++++++++++++++++++++++++++++++-
>  configure                 |  36 +++++
>  docs/qmp/qmp-events.txt   |  33 ++++
>  include/monitor/monitor.h |   2 +
>  monitor.c                 |   2 +
>  6 files changed, 458 insertions(+), 3 deletions(-)
> 
> diff --git a/block/Makefile.objs b/block/Makefile.objs
> index a2650b9..4ca9d43 100644
> --- a/block/Makefile.objs
> +++ b/block/Makefile.objs
> @@ -3,7 +3,7 @@ block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-c
>  block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
>  block-obj-y += qed-check.o
>  block-obj-$(CONFIG_VHDX) += vhdx.o vhdx-endian.o vhdx-log.o
> -block-obj-y += quorum.o
> +block-obj-$(CONFIG_QUORUM) += quorum.o
>  block-obj-y += parallels.o blkdebug.o blkverify.o
>  block-obj-y += snapshot.o qapi.o
>  block-obj-$(CONFIG_WIN32) += raw-win32.o win32-aio.o
> diff --git a/block/quorum.c b/block/quorum.c
> index 699b512..837d261 100644
> --- a/block/quorum.c
> +++ b/block/quorum.c
> @@ -13,7 +13,43 @@
>   * See the COPYING file in the top-level directory.
>   */
>  
> +#include <gnutls/gnutls.h>
> +#include <gnutls/crypto.h>
>  #include "block/block_int.h"
> +#include "qapi/qmp/qjson.h"
> +
> +#define HASH_LENGTH 32
> +
> +/* This union holds a vote hash value */
> +typedef union QuorumVoteValue {
> +    char h[HASH_LENGTH];       /* SHA-256 hash */
> +    int64_t l;                 /* simpler 64 bits hash */
> +} QuorumVoteValue;
> +
> +/* A vote item */
> +typedef struct QuorumVoteItem {
> +    int index;
> +    QLIST_ENTRY(QuorumVoteItem) next;
> +} QuorumVoteItem;
> +
> +/* this structure is a vote version. A version is the set of votes sharing the
> + * same vote value.
> + * The set of votes will be tracked with the items field and its cardinality is
> + * vote_count.
> + */
> +typedef struct QuorumVoteVersion {
> +    QuorumVoteValue value;
> +    int index;
> +    int vote_count;
> +    QLIST_HEAD(, QuorumVoteItem) items;
> +    QLIST_ENTRY(QuorumVoteVersion) next;
> +} QuorumVoteVersion;
> +
> +/* this structure holds a group of vote versions together */
> +typedef struct QuorumVotes {
> +    QLIST_HEAD(, QuorumVoteVersion) vote_list;
> +    int (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
> +} QuorumVotes;
>  
>  /* the following structure holds the state of one quorum instance */
>  typedef struct {
> @@ -60,10 +96,14 @@ struct QuorumAIOCB {
>      int success_count;          /* number of successfully completed AIOCB */
>      bool *finished;             /* completion signal for cancel */
>  
> +    QuorumVotes votes;
> +
>      bool is_read;
>      int vote_ret;
>  };
>  
> +static void quorum_vote(QuorumAIOCB *acb);
> +
>  static void quorum_aio_cancel(BlockDriverAIOCB *blockacb)
>  {
>      QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
> @@ -81,10 +121,12 @@ static AIOCBInfo quorum_aiocb_info = {
>      .cancel             = quorum_aio_cancel,
>  };
>  
> +static int quorum_vote_error(QuorumAIOCB *acb);
> +

What's the reason for putting the forward declaration here? This is
neither directly before the first user nor at the top.

In fact, the next occurrence of quorum_vote_error() is the implementation
of the function, so the forward declaration is completely unnecessary.

>  static void quorum_aio_finalize(QuorumAIOCB *acb)
>  {
>      BDRVQuorumState *s = acb->bqs;
> -    int ret = 0;
> +    int i, ret = 0;
>  
>      for (i = 0; i < s->total; i++) {
>          qemu_vfree(acb->aios[i].buf);
> @@ -92,6 +134,10 @@ static void quorum_aio_finalize(QuorumAIOCB *acb)
>          acb->aios[i].ret = 0;
>      }
>  
> +    if (acb->vote_ret) {
> +        ret = acb->vote_ret;
> +    }
> +
>      acb->common.cb(acb->common.opaque, ret);
>      if (acb->finished) {
>          *acb->finished = true;
> @@ -103,6 +149,27 @@ static void quorum_aio_finalize(QuorumAIOCB *acb)
>      qemu_aio_release(acb);
>  }
>  
> +static int quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
> +{
> +    return memcmp(a->h, b->h, HASH_LENGTH);
> +}
> +
> +static int quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b)
> +{
> +    int64_t i = a->l;
> +    int64_t j = b->l;
> +
> +    if (i < j) {
> +        return -1;
> +    }
> +
> +    if (i > j) {
> +        return 1;
> +    }
> +
> +    return 0;
> +}

The usual way to implement this is 'return a->l - b->l;', because if you
expect memcmp() to return a valid value for the compare function you
can't assume that it's normalised to -1/0/1 anyway.

As you only ever use the result as a bool, you could alternatively
even declare the function as such and do 'return a->l != b->l;'.
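
A minimal sketch of the bool variant, assuming the union layout from the
patch; quorum_64bits_differ() is a hypothetical name. One thing to keep
in mind with the subtraction form: the difference of two int64_t values
narrowed to an int return type can come out as 0 for values that differ
only in the upper 32 bits, which the inequality test avoids.

```c
#include <stdbool.h>
#include <stdint.h>

#define HASH_LENGTH 32

/* Same layout as the union in the patch. */
typedef union QuorumVoteValue {
    char h[HASH_LENGTH];       /* SHA-256 hash */
    int64_t l;                 /* simpler 64 bits hash */
} QuorumVoteValue;

/* Hypothetical name. Returns true when the two 64-bit values differ, which
 * matches how the callers only test the result for zero or non-zero. */
static bool quorum_64bits_differ(const QuorumVoteValue *a,
                                 const QuorumVoteValue *b)
{
    return a->l != b->l;
}
```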

>  static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
>                                     BlockDriverState *bs,
>                                     QEMUIOVector *qiov,
> @@ -122,6 +189,7 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
>      acb->count = 0;
>      acb->success_count = 0;
>      acb->finished = NULL;
> +    acb->votes.compare = quorum_sha256_compare;
>      acb->is_read = false;
>      acb->vote_ret = 0;
>  

You need to initialise votes.vote_list as well.
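
A sketch of the missing initialisation, using the BSD list macros from
<sys/queue.h> as a stand-in for QEMU's QLIST_* family. The type and
function names are hypothetical, and it assumes the request memory is not
zeroed by the allocator, so the list head has to be set up explicitly
before the first QLIST_FOREACH or QLIST_INSERT_HEAD.

```c
#include <stdlib.h>
#include <sys/queue.h>

/* Simplified stand-ins for QuorumVoteVersion and QuorumVotes. */
typedef struct VoteVersion {
    int vote_count;
    LIST_ENTRY(VoteVersion) next;
} VoteVersion;

typedef struct Votes {
    LIST_HEAD(, VoteVersion) vote_list;
} Votes;

/* Hypothetical allocator standing in for the acb setup: malloc() returns
 * uninitialised memory, so the head must be initialised before use. */
static Votes *votes_new(void)
{
    Votes *votes = malloc(sizeof(*votes));

    if (!votes) {
        return NULL;
    }
    LIST_INIT(&votes->vote_list);
    return votes;
}
```

In the patch itself the equivalent would presumably be a QLIST_INIT() on
acb->votes.vote_list next to the other field assignments.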

> @@ -151,9 +219,323 @@ static void quorum_aio_cb(void *opaque, int ret)
>          return;
>      }
>  
> +    /* Do the vote on read */
> +    if (acb->is_read) {
> +        quorum_vote(acb);
> +    }
> +
>      quorum_aio_finalize(acb);
>  }
>  
> +static void quorum_report_bad(QuorumAIOCB *acb, char *node_name)
> +{
> +    QObject *data;
> +    data = qobject_from_jsonf("{ 'node-name': \"%s\""
> +                              ", 'sector-num': %" PRId64
> +                              ", 'sectors-count': %i }",
> +                              node_name,

Can't node_name be NULL here?

> +                              acb->sector_num,
> +                              acb->nb_sectors);
> +    monitor_protocol_event(QEVENT_QUORUM_REPORT_BAD, data);
> +    qobject_decref(data);
> +}
> +
> +static void quorum_report_failure(QuorumAIOCB *acb)
> +{
> +    QObject *data;
> +    data = qobject_from_jsonf("{ 'sector-num': %" PRId64
> +                              ", 'sectors-count': %i }",
> +                              acb->sector_num,
> +                              acb->nb_sectors);

If I have multiple quorum devices, this event doesn't tell me which one
it is about.

> +    monitor_protocol_event(QEVENT_QUORUM_FAILURE, data);
> +    qobject_decref(data);
> +}
> +
> +static void quorum_report_bad_versions(BDRVQuorumState *s,
> +                                       QuorumAIOCB *acb,
> +                                       QuorumVoteValue *value)
> +{
> +    QuorumVoteVersion *version;
> +    QuorumVoteItem *item;
> +
> +    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
> +        if (!acb->votes.compare(&version->value, value)) {
> +            continue;
> +        }
> +        QLIST_FOREACH(item, &version->items, next) {
> +            quorum_report_bad(acb, s->bs[item->index]->node_name);
> +        }
> +    }
> +}
> +
> +static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
> +{
> +    int i;
> +    assert(dest->niov == source->niov);
> +    assert(dest->size == source->size);
> +    for (i = 0; i < source->niov; i++) {
> +        assert(dest->iov[i].iov_len == source->iov[i].iov_len);
> +        memcpy(dest->iov[i].iov_base,
> +               source->iov[i].iov_base,
> +               source->iov[i].iov_len);
> +    }
> +}
> +
> +static void quorum_count_vote(QuorumVotes *votes,
> +                              QuorumVoteValue *value,
> +                              int index)
> +{
> +    QuorumVoteVersion *v = NULL, *version = NULL;
> +    QuorumVoteItem *item;
> +
> +    /* look if we have something with this hash */
> +    QLIST_FOREACH(v, &votes->vote_list, next) {
> +        if (!votes->compare(&v->value, value)) {
> +            version = v;
> +            break;
> +        }
> +    }
> +
> +    /* It's a version not yet in the list add it */
> +    if (!version) {
> +        version = g_new0(QuorumVoteVersion, 1);
> +        QLIST_INIT(&version->items);
> +        memcpy(&version->value, value, sizeof(version->value));
> +        version->index = index;
> +        version->vote_count = 0;
> +        QLIST_INSERT_HEAD(&votes->vote_list, version, next);
> +    }
> +
> +    version->vote_count++;
> +
> +    item = g_new0(QuorumVoteItem, 1);
> +    item->index = index;
> +    QLIST_INSERT_HEAD(&version->items, item, next);
> +}
> +
> +static void quorum_free_vote_list(QuorumVotes *votes)
> +{
> +    QuorumVoteVersion *version, *next_version;
> +    QuorumVoteItem *item, *next_item;
> +
> +    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
> +        QLIST_REMOVE(version, next);
> +        QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
> +            QLIST_REMOVE(item, next);
> +            g_free(item);
> +        }
> +        g_free(version);
> +    }
> +}
> +
> +static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
> +{
> +    int j, ret;
> +    gnutls_hash_hd_t dig;
> +    QEMUIOVector *qiov = &acb->aios[i].qiov;
> +
> +    ret = gnutls_hash_init(&dig, GNUTLS_DIG_SHA256);
> +
> +    if (ret < 0) {
> +        return ret;
> +    }
> +
> +    for (j = 0; j < qiov->niov; j++) {
> +        ret = gnutls_hash(dig, qiov->iov[j].iov_base, qiov->iov[j].iov_len);
> +        if (ret < 0) {
> +            break;
> +        }
> +    }
> +
> +    gnutls_hash_deinit(dig, (void *) hash);
> +    return ret;
> +}
> +
> +static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
> +{
> +    int i = 0;

I like obvious variable names. This must be a loop counter.

> +    QuorumVoteVersion *candidate, *winner = NULL;
> +
> +    QLIST_FOREACH(candidate, &votes->vote_list, next) {
> +        if (candidate->vote_count > i) {
> +            i = candidate->vote_count;

Wait, what? This doesn't quite look like a loop.

> +            winner = candidate;
> +        }
> +    }
> +
> +    return winner;
> +}
> +
> +static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
> +{
> +    int i;
> +    int result;
> +
> +    assert(a->niov == b->niov);
> +    for (i = 0; i < a->niov; i++) {
> +        assert(a->iov[i].iov_len == b->iov[i].iov_len);
> +        result = memcmp(a->iov[i].iov_base,
> +                        b->iov[i].iov_base,
> +                        a->iov[i].iov_len);
> +        if (result) {
> +            return false;
> +        }
> +    }
> +
> +    return true;
> +}

I thought we introduced qemu_iovec_compare() earlier in this series to
do exactly this, except more generically?

I see that you call one or the other depending on whether we're running
in blkverify mode, but what is the difference in the semantics? Either
both are the same and there is no reason to have both, or one of them
must have non-obvious semantics and lacks proper documentation.
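
To illustrate the point, here is a minimal standalone sketch (not the QEMU
code: it uses plain POSIX struct iovec arrays, and both function names are
hypothetical). It assumes both vectors have the same layout, as the asserts in
quorum_iovec_compare() do. If the generic helper reports the offset of the
first mismatch, or -1 when there is none, the boolean variant can be a
one-line wrapper around it:

```c
#include <stdbool.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/uio.h>

/* Stand-in for an offset-returning compare: byte offset of the first
 * difference between the two vectors, or -1 if they are identical.
 * Assumes a and b have the same number of elements and element lengths. */
static ssize_t iov_first_mismatch(const struct iovec *a,
                                  const struct iovec *b, int niov)
{
    ssize_t offset = 0;
    size_t k;
    int i;

    for (i = 0; i < niov; i++) {
        const unsigned char *p = a[i].iov_base;
        const unsigned char *q = b[i].iov_base;

        for (k = 0; k < a[i].iov_len; k++) {
            if (p[k] != q[k]) {
                return offset + (ssize_t)k;
            }
        }
        offset += (ssize_t)a[i].iov_len;
    }
    return -1;
}

/* The boolean compare expressed in terms of the generic one. */
static bool iov_equal(const struct iovec *a, const struct iovec *b, int niov)
{
    return iov_first_mismatch(a, b, niov) == -1;
}
```

If the two functions in the patch really differ only in this way, the second
one does not need its own memcmp() loop.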

> +static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
> +                                          const char *fmt, ...)
> +{
> +    va_list ap;
> +
> +    va_start(ap, fmt);
> +    fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ",
> +            acb->sector_num, acb->nb_sectors);
> +    vfprintf(stderr, fmt, ap);
> +    fprintf(stderr, "\n");
> +    va_end(ap);
> +    exit(1);
> +}
> +
> +static bool quorum_compare(QuorumAIOCB *acb,
> +                           QEMUIOVector *a,
> +                           QEMUIOVector *b)
> +{
> +    BDRVQuorumState *s = acb->bqs;
> +    ssize_t offset;
> +
> +    /* This driver will replace blkverify in this particular case */
> +    if (s->is_blkverify) {
> +        offset = qemu_iovec_compare(a, b);
> +        if (offset != -1) {
> +            quorum_err(acb, "contents mismatch in sector %" PRId64,
> +                       acb->sector_num +
> +                       (uint64_t)(offset / BDRV_SECTOR_SIZE));
> +        }
> +        return true;
> +    }
> +
> +    return quorum_iovec_compare(a, b);
> +}
> +
> +/* Do a vote to get the error code */
> +static int quorum_vote_error(QuorumAIOCB *acb)
> +{
> +    BDRVQuorumState *s = acb->bqs;
> +    QuorumVoteVersion *winner = NULL;
> +    QuorumVotes error_votes;
> +    QuorumVoteValue result_value;
> +    int i, ret = 0;
> +    bool error = false;
> +
> +    QLIST_INIT(&error_votes.vote_list);
> +    error_votes.compare = quorum_64bits_compare;
> +
> +    for (i = 0; i < s->total; i++) {
> +        ret = acb->aios[i].ret;
> +        if (ret) {
> +            error = true;
> +            result_value.l = ret;
> +            quorum_count_vote(&error_votes, &result_value, i);
> +        }
> +    }
> +
> +    if (error) {
> +        winner = quorum_get_vote_winner(&error_votes);
> +        ret = winner->value.l;
> +    }
> +
> +    quorum_free_vote_list(&error_votes);
> +
> +    return ret;
> +}
> +
> +static void quorum_vote(QuorumAIOCB *acb)
> +{
> +    bool quorum = false;
> +    int i, j, ret;
> +    QuorumVoteValue hash;
> +    BDRVQuorumState *s = acb->bqs;
> +    QuorumVoteVersion *winner;
> +
> +    QLIST_INIT(&acb->votes.vote_list);
> +
> +    /* if we don't get enough successful read use the first error code */
> +    if (acb->success_count < s->threshold) {
> +        acb->vote_ret = quorum_vote_error(acb);
> +        quorum_report_failure(acb);
> +        return;
> +    }
> +
> +    /* get the index of the first successful read (we are sure to find one) */
> +    for (i = 0; i < s->total; i++) {
> +        if (!acb->aios[i].ret) {
> +            break;
> +        }
> +    }

"we are sure to find one" is spelt "assert(i < s->total);"

> +
> +    /* compare this read with all other successful read looking for quorum */
> +    for (j = i + 1; j < s->total; j++) {
> +        if (acb->aios[j].ret) {
> +            continue;
> +        }
> +        quorum = quorum_compare(acb, &acb->aios[i].qiov, &acb->aios[j].qiov);
> +        if (!quorum) {
> +            break;
> +       }
> +    }
> +
> +    /* Every successful read agrees and their count is higher or equal threshold
> +     * -> Quorum
> +     */
> +    if (quorum && acb->success_count >= s->threshold) {
> +        quorum_copy_qiov(acb->qiov, &acb->aios[i].qiov);
> +        return;
> +    }

For threshold == success_count == 1, the condition in the comment is
fulfilled, but the one in the code isn't.
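
A simplified model of the fast path shows why. This is only a sketch: rets[]
stands in for acb->aios[i].ret, the function name is made up, and every
comparison is assumed to report agreement. With a single successful read the
comparison loop never runs, so quorum keeps its initial value:

```c
#include <stdbool.h>

/* Simplified model of the fast path in quorum_vote(). All successful reads
 * are assumed to carry identical data, so the compare step always agrees. */
static bool fast_path_quorum(const int *rets, int total, int threshold)
{
    bool quorum = false;
    int i, j, success_count = 0;

    for (i = 0; i < total; i++) {
        if (!rets[i]) {
            success_count++;
        }
    }

    /* index of the first successful read */
    for (i = 0; i < total; i++) {
        if (!rets[i]) {
            break;
        }
    }

    /* compare it with every other successful read */
    for (j = i + 1; j < total; j++) {
        if (rets[j]) {
            continue;
        }
        quorum = true; /* stands in for quorum_compare() returning true */
    }

    return quorum && success_count >= threshold;
}
```

With rets = { 0, -EIO, -EIO } and threshold 1 this returns false, although
every successful read (the only one) trivially agrees with itself.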

> +
> +    /* compute hashs for each successful read, also store indexes */
> +    for (i = 0; i < s->total; i++) {
> +        if (acb->aios[i].ret) {
> +            continue;
> +        }
> +        ret = quorum_compute_hash(acb, i, &hash);
> +        /* if ever the hash computation failed */
> +        if (ret < 0) {
> +            acb->vote_ret = ret;
> +            goto free_exit;
> +        }
> +        quorum_count_vote(&acb->votes, &hash, i);
> +    }
> +
> +    /* vote to select the most represented version */
> +    winner = quorum_get_vote_winner(&acb->votes);
> +    /* every vote version are differents -> error */
> +    if (!winner) {

Can this happen? This means that there was no vote at all.

> +        quorum_report_failure(acb);
> +        acb->vote_ret = -EIO;
> +        goto free_exit;
> +    }
> +
> +    /* if the winner count is smaller than threshold the read fails */
> +    if (winner->vote_count < s->threshold) {
> +        quorum_report_failure(acb);
> +        acb->vote_ret = -EIO;
> +        goto free_exit;
> +    }
> +
> +    /* we have a winner: copy it */
> +    quorum_copy_qiov(acb->qiov, &acb->aios[winner->index].qiov);
> +
> +    /* some versions are bad print them */
> +    quorum_report_bad_versions(s, acb, &winner->value);
> +
> +free_exit:
> +    /* free lists */
> +    quorum_free_vote_list(&acb->votes);
> +}
> +
>  static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
>                                           int64_t sector_num,
>                                           QEMUIOVector *qiov,
> @@ -175,7 +557,7 @@ static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
>      }
>  
>      for (i = 0; i < s->total; i++) {
> -        bdrv_aio_readv(s->bs[i], sector_num, qiov, nb_sectors,
> +        bdrv_aio_readv(s->bs[i], sector_num, &acb->aios[i].qiov, nb_sectors,
>                         quorum_aio_cb, &acb->aios[i]);
>      }

Why don't you do this from the beginning?

Kevin


* Re: [Qemu-devel] [PATCH V15 09/13] quorum: Add quorum_co_get_block_status.
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 09/13] quorum: Add quorum_co_get_block_status Benoît Canet
@ 2014-02-04 15:49   ` Kevin Wolf
  2014-02-05 16:28     ` Benoît Canet
  0 siblings, 1 reply; 29+ messages in thread
From: Kevin Wolf @ 2014-02-04 15:49 UTC (permalink / raw)
  To: Benoît Canet; +Cc: Benoît Canet, qemu-devel, stefanha, mreitz

Am 03.02.2014 um 22:51 hat Benoît Canet geschrieben:
> From: Benoît Canet <benoit@irqsave.net>
> 
> Signed-off-by: Benoit Canet <benoit@irqsave.net>
> Reviewed-by: Max Reitz <mreitz@redhat.com>
> ---
>  block/quorum.c | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 51 insertions(+)
> 
> diff --git a/block/quorum.c b/block/quorum.c
> index cef4424..677a96d 100644
> --- a/block/quorum.c
> +++ b/block/quorum.c
> @@ -619,6 +619,56 @@ static void quorum_invalidate_cache(BlockDriverState *bs)
>      }
>  }
>  
> +static int64_t coroutine_fn quorum_co_get_block_status(BlockDriverState *bs,
> +                                                       int64_t sector_num,
> +                                                       int nb_sectors,
> +                                                       int *pnum)
> +{
> +    BDRVQuorumState *s = bs->opaque;
> +    QuorumVoteVersion *winner = NULL;
> +    QuorumVotes result_votes, num_votes;
> +    QuorumVoteValue result_value, num_value;
> +    int i, num;
> +    int64_t result = 0;
> +
> +    QLIST_INIT(&result_votes.vote_list);
> +    QLIST_INIT(&num_votes.vote_list);
> +    result_votes.compare = quorum_64bits_compare;
> +    num_votes.compare = quorum_64bits_compare;
> +
> +    for (i = 0; i < s->total; i++) {
> +        result = bdrv_get_block_status(s->bs[i], sector_num, nb_sectors, &num);
> +        /* skip failed requests */
> +        if (result < 0) {
> +            continue;
> +        }
> +        result_value.l = result & BDRV_BLOCK_DATA;
> +        num_value.l = num;
> +        quorum_count_vote(&result_votes, &result_value, i);
> +        quorum_count_vote(&num_votes, &num_value, i);
> +    }

This doesn't work. bdrv_get_block_status() doesn't guarantee that it
returns all consecutive blocks with the same status. You need to call it
in a loop here, or change bdrv_get_block_status() so that it loops
itself.
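
Roughly what I mean by looping, as a standalone sketch. The mock below is not
the real bdrv_get_block_status(): it returns a bare 0/1 status instead of the
flag word, and MOCK_MAX_SECTORS and both function names are made up for the
example.

```c
#include <stdint.h>

#define MOCK_MAX_SECTORS 8  /* hypothetical cap on sectors reported per call */

/* Mock status query: sectors below 20 are data (1), the rest are holes (0).
 * Like the real call, it may report fewer sectors than share the status. */
static int mock_get_block_status(int64_t sector_num, int nb_sectors, int *pnum)
{
    int is_data = sector_num < 20;
    int64_t n = nb_sectors;

    if (is_data && sector_num + n > 20) {
        n = 20 - sector_num;
    }
    if (n > MOCK_MAX_SECTORS) {
        n = MOCK_MAX_SECTORS;
    }
    *pnum = (int)n;
    return is_data;
}

/* Keep querying until the status changes or the range is exhausted, so that
 * *pnum covers all consecutive sectors sharing the first status. */
static int coalesced_block_status(int64_t sector_num, int nb_sectors,
                                  int *pnum)
{
    int status, next, n, done;

    status = mock_get_block_status(sector_num, nb_sectors, &done);
    while (status >= 0 && done > 0 && done < nb_sectors) {
        next = mock_get_block_status(sector_num + done, nb_sectors - done, &n);
        if (next != status || n == 0) {
            break;
        }
        done += n;
    }
    *pnum = done;
    return status;
}
```

Only the coalesced numbers are comparable between children; two children with
identical content may otherwise report different pnum values for the same
request.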

> +    winner = quorum_get_vote_winner(&result_votes);
> +    if (winner->vote_count < s->threshold) {
> +        result = -EIO;
> +        goto free_exit;
> +    }
> +    result = winner->value.l;
> +
> +    winner = quorum_get_vote_winner(&num_votes);
> +    if (winner->vote_count < s->threshold) {
> +        result = -EIO;
> +        goto free_exit;
> +    }
> +    *pnum = winner->value.l;

You can take the status from one group of devices and the number of
blocks that share this state from another group?!
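
One possible way to avoid that, sketched standalone (StatusReply and
vote_status_pair are hypothetical names, and the quadratic counting is only
for brevity): vote on the (status, num) pair as a single value, so the winner
is always something that at least threshold children reported together.

```c
#include <stdint.h>

/* What one child answered for the request. */
typedef struct StatusReply {
    int64_t status;
    int num;
} StatusReply;

/* Returns the index of a reply whose (status, num) pair is shared by at
 * least threshold children, or -1 if no pair reaches the threshold. */
static int vote_status_pair(const StatusReply *r, int total, int threshold)
{
    int i, j, count;

    for (i = 0; i < total; i++) {
        count = 0;
        for (j = 0; j < total; j++) {
            if (r[j].status == r[i].status && r[j].num == r[i].num) {
                count++;
            }
        }
        if (count >= threshold) {
            return i;
        }
    }
    return -1;
}
```

For replies (1, 8), (1, 16), (0, 16) and threshold 2, two separate votes would
yield status 1 with num 16, a combination only one child reported; the joint
vote fails instead.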

> +
> +free_exit:
> +    quorum_free_vote_list(&result_votes);
> +    quorum_free_vote_list(&num_votes);
> +
> +    return result;
> +}
> +
>  static BlockDriver bdrv_quorum = {
>      .format_name        = "quorum",
>      .protocol_name      = "quorum",
> @@ -630,6 +680,7 @@ static BlockDriver bdrv_quorum = {
>      .bdrv_aio_readv     = quorum_aio_readv,
>      .bdrv_aio_writev    = quorum_aio_writev,
>      .bdrv_invalidate_cache = quorum_invalidate_cache,
> +    .bdrv_co_get_block_status = quorum_co_get_block_status,
>  };
>  
>  static void bdrv_quorum_init(void)

Kevin


* Re: [Qemu-devel] [PATCH V15 12/13] quorum: Add quorum_open() and quorum_close().
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 12/13] quorum: Add quorum_open() and quorum_close() Benoît Canet
@ 2014-02-04 16:08   ` Kevin Wolf
  2014-02-05 16:11     ` Benoît Canet
  0 siblings, 1 reply; 29+ messages in thread
From: Kevin Wolf @ 2014-02-04 16:08 UTC (permalink / raw)
  To: Benoît Canet; +Cc: Benoît Canet, qemu-devel, stefanha, mreitz

Am 03.02.2014 um 22:51 hat Benoît Canet geschrieben:
> From: Benoît Canet <benoit@irqsave.net>
> 
> Example of command line:
> -drive if=virtio,file.driver=quorum,\
> file.children.0.file.filename=1.raw,\
> file.children.0.node-name=1.raw,\
> file.children.0.driver=raw,\
> file.children.1.file.filename=2.raw,\
> file.children.1.node-name=2.raw,\
> file.children.1.driver=raw,\
> file.children.2.file.filename=3.raw,\
> file.children.2.node-name=3.raw,\
> file.children.2.driver=raw,\
> file.vote_threshold=2
> 
> file.blkverify=on with file.vote_threshold=2 and two files can be passed to
> emulated blkverify.
> 
> Signed-off-by: Benoit Canet <benoit@irqsave.net>
> ---
>  block/quorum.c   | 171 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
>  qapi-schema.json |  21 ++++++-
>  2 files changed, 191 insertions(+), 1 deletion(-)
> 
> diff --git a/block/quorum.c b/block/quorum.c
> index 1e683f8..d2bea29 100644
> --- a/block/quorum.c
> +++ b/block/quorum.c
> @@ -17,8 +17,12 @@
>  #include <gnutls/crypto.h>
>  #include "block/block_int.h"
>  #include "qapi/qmp/qjson.h"
> +#include "qapi/qmp/types.h"
> +#include "qemu-common.h"
>  
>  #define HASH_LENGTH 32
> +#define KEY_PREFIX "children."
> +#define KEY_FILENAME_SUFFIX ".file.filename"
>  
>  /* This union holds a vote hash value */
>  typedef union QuorumVoteValue {
> @@ -712,12 +716,179 @@ static bool quorum_recurse_is_first_non_filter(BlockDriverState *bs,
>      return false;
>  }
>  
> +static int quorum_valid_threshold(int threshold,
> +                                  int total,
> +                                  Error **errp)
> +{
> +
> +    if (threshold < 1) {
> +        error_set(errp, QERR_INVALID_PARAMETER_VALUE,
> +                  "vote-threshold", "value >= 1");
> +        return -ERANGE;
> +    }
> +
> +    if (threshold > total) {
> +        error_setg(errp, "threshold may not exceed children count");
> +        return -ERANGE;
> +    }
> +
> +    return 0;
> +}
> +
> +static int quorum_open(BlockDriverState *bs,
> +                       QDict *options,
> +                       int flags,
> +                       Error **errp)
> +{
> +    BDRVQuorumState *s = bs->opaque;
> +    Error *local_err = NULL;
> +    bool *opened;
> +    QDict *sub = NULL;
> +    QList *list = NULL;
> +    const QListEntry *lentry;
> +    const QDictEntry *dentry;
> +    const char *value;
> +    char *next;
> +    int i;
> +    int ret = 0;
> +    unsigned long long threshold = 0;
> +
> +    qdict_flatten(options);
> +    qdict_extract_subqdict(options, &sub, "children.");
> +    qdict_array_split(sub, &list);
> +
> +    /* count how many different children are present and validate */
> +    s->total = !qlist_size(list) ? qdict_size(sub) : qlist_size(list);

Which case does qdict_size(sub) address?

> +    if (s->total < 2) {
> +        error_setg(&local_err,
> +                   "Number of provided children must be greater than 1");
> +        ret = -EINVAL;
> +        goto exit;
> +    }
> +
> +    ret = qdict_get_try_int(options, "vote-threshold", -1);
> +    /* from QMP */
> +    if (ret != -1) {
> +        qdict_del(options, "vote-threshold");
> +        s->threshold = ret;
> +    /* from command line */
> +    } else {
> +        /* retrieve the threshold option from the command line */
> +        value = qdict_get_try_str(options, "vote_threshold");
> +        if (!value) {
> +            error_setg(&local_err,
> +                       "vote_threshold must be provided");
> +            ret = -EINVAL;
> +            goto exit;
> +        }
> +        qdict_del(options, "vote_threshold");
> +
> +        ret = parse_uint(value, &threshold, &next, 10);
> +
> +        /* no int found -> scan fail */
> +        if (ret < 0) {
> +            error_setg(&local_err,
> +                       "invalid vote_threshold specified");
> +            ret = -EINVAL;
> +            goto exit;
> +        }
> +        s->threshold = threshold;
> +    }

This part looks seriously wrong. I think you should consider using a
QemuOpts like other drivers do (have a look at qcow2, for example); that
should parse the integer for you.

> +    /* and validate it against s->total */
> +    ret = quorum_valid_threshold(s->threshold, s->total, &local_err);
> +    if (ret < 0) {
> +        goto exit;
> +    }
> +
> +    /* is the driver in blkverify mode */
> +    value = qdict_get_try_str(options, "blkverify");
> +    if (value && !strcmp(value, "on")  &&
> +        s->total == 2 && s->threshold == 2) {
> +        s->is_blkverify = true;
> +    } else if (value && strcmp(value, "off")) {
> +        fprintf(stderr, "blkverify mode is set by setting blkverify=on "
> +                        "and using two files with vote_threshold=2\n");
> +    }
> +    qdict_del(options, "blkverify");

And the QemuOpts would also know how to parse a boolean.

> +
> +    /* allocate the children BlockDriverState array */
> +    s->bs = g_new0(BlockDriverState *, s->total);
> +    opened = g_new0(bool, s->total);
> +
> +    /* Open by file name or options dict (command line or QMP) */
> +    if (s->total == qlist_size(list)) {
> +        for (i = 0, lentry = qlist_first(list); lentry;
> +            lentry = qlist_next(lentry), i++) {
> +            ret = bdrv_open(&s->bs[i], NULL, NULL,
> +                            qobject_to_qdict(lentry->value),
> +                            flags, NULL, &local_err);
> +            if (ret < 0) {
> +                goto close_exit;
> +            }
> +            opened[i] = true;
> +        }
> +    /* Open by QMP references */
> +    } else {
> +        for (i = 0, dentry = qdict_first(sub); dentry;
> +             dentry = qdict_next(sub, dentry), i++) {
> +            ret = bdrv_open(&s->bs[i], NULL,
> +                            qstring_get_str(qobject_to_qstring(dentry->value)),
> +                            NULL, flags, NULL, &local_err);
> +            if (ret < 0) {
> +                goto close_exit;
> +            }
> +            opened[i] = true;
> +        }
> +    }

What does the reference case look like? (Should work on the command
line, too) I imagine something like this:

    -drive if=virtio,file.driver=quorum,\
    file.children.0=image0,\
    file.children.1=image1,\
    file.children.2=image2,\
    file.vote_threshold=2

Doesn't qdict_array_split() split this into a list of strings? The whole
thing with directly accessing QDict entries for references confuses me.

> +    g_free(opened);
> +    goto exit;
> +
> +close_exit:
> +    /* cleanup on error */
> +    for (i = 0; i < s->total; i++) {
> +        if (!opened[i]) {
> +            continue;
> +        }
> +        bdrv_unref(s->bs[i]);
> +    }
> +    g_free(s->bs);
> +    g_free(opened);
> +exit:
> +    /* propagate error */
> +    if (error_is_set(&local_err)) {
> +        error_propagate(errp, local_err);
> +    }
> +    QDECREF(sub);
> +    QDECREF(list);
> +    return ret;
> +}
> +
> +static void quorum_close(BlockDriverState *bs)
> +{
> +    BDRVQuorumState *s = bs->opaque;
> +    int i;
> +
> +    for (i = 0; i < s->total; i++) {
> +        /* Ensure writes reach stable storage */
> +        bdrv_flush(s->bs[i]);

Not necessary, block.c code will flush for us.

> +        /* close manually because we'll free s->bs */
> +        bdrv_unref(s->bs[i]);
> +    }
> +
> +    g_free(s->bs);
> +}

Kevin


* Re: [Qemu-devel] [PATCH V15 13/13] quorum: Add unit test.
  2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 13/13] quorum: Add unit test Benoît Canet
@ 2014-02-04 16:13   ` Kevin Wolf
  2014-02-04 21:53     ` Benoît Canet
  0 siblings, 1 reply; 29+ messages in thread
From: Kevin Wolf @ 2014-02-04 16:13 UTC (permalink / raw)
  To: Benoît Canet; +Cc: Benoit Canet, qemu-devel, stefanha, mreitz

Am 03.02.2014 um 22:51 hat Benoît Canet geschrieben:
> Signed-off-by: Benoit Canet <benoit@irqsave.net>
> Reviewed-by: Max Reitz <mreitz@redhat.com>
> ---
>  tests/qemu-iotests/075     | 95 ++++++++++++++++++++++++++++++++++++++++++++++
>  tests/qemu-iotests/075.out | 34 +++++++++++++++++
>  tests/qemu-iotests/group   |  1 +
>  3 files changed, 130 insertions(+)
>  create mode 100755 tests/qemu-iotests/075
>  create mode 100644 tests/qemu-iotests/075.out

I believe 081 is the next free number that doesn't cause conflicts.

Kevin


* Re: [Qemu-devel] [PATCH V15 13/13] quorum: Add unit test.
  2014-02-04 16:13   ` Kevin Wolf
@ 2014-02-04 21:53     ` Benoît Canet
  0 siblings, 0 replies; 29+ messages in thread
From: Benoît Canet @ 2014-02-04 21:53 UTC (permalink / raw)
  To: Kevin Wolf; +Cc: Benoît Canet, qemu-devel, stefanha, mreitz

Le Tuesday 04 Feb 2014 à 17:13:27 (+0100), Kevin Wolf a écrit :
> Am 03.02.2014 um 22:51 hat Benoît Canet geschrieben:
> > Signed-off-by: Benoit Canet <benoit@irqsave.net>
> > Reviewed-by: Max Reitz <mreitz@redhat.com>
> > ---
> >  tests/qemu-iotests/075     | 95 ++++++++++++++++++++++++++++++++++++++++++++++
> >  tests/qemu-iotests/075.out | 34 +++++++++++++++++
> >  tests/qemu-iotests/group   |  1 +
> >  3 files changed, 130 insertions(+)
> >  create mode 100755 tests/qemu-iotests/075
> >  create mode 100644 tests/qemu-iotests/075.out
> 
> I believe 081 is the next free number that doesn't cause conflicts.

Thanks for the review, I will iterate tomorrow.

Best regards

Benoît

> 
> Kevin
> 


* Re: [Qemu-devel] [PATCH V15 01/13] quorum: Create quorum.c, add QuorumSingleAIOCB and QuorumAIOCB.
  2014-02-04 12:57   ` Kevin Wolf
@ 2014-02-05 13:29     ` Benoît Canet
  0 siblings, 0 replies; 29+ messages in thread
From: Benoît Canet @ 2014-02-05 13:29 UTC (permalink / raw)
  To: Kevin Wolf; +Cc: Benoît Canet, qemu-devel, stefanha, mreitz

Le Tuesday 04 Feb 2014 à 13:57:07 (+0100), Kevin Wolf a écrit :
> Am 03.02.2014 um 22:51 hat Benoît Canet geschrieben:
> > From: Benoît Canet <benoit@irqsave.net>
> > 
> > Signed-off-by: Benoit Canet <benoit@irqsave.net>
> > Reviewed-by: Max Reitz <mreitz@redhat.com>
> > ---
> >  block/Makefile.objs |  1 +
> >  block/quorum.c      | 54 +++++++++++++++++++++++++++++++++++++++++++++++++++++
> >  2 files changed, 55 insertions(+)
> >  create mode 100644 block/quorum.c
> > 
> > diff --git a/block/Makefile.objs b/block/Makefile.objs
> > index 4e8c91e..a2650b9 100644
> > --- a/block/Makefile.objs
> > +++ b/block/Makefile.objs
> > @@ -3,6 +3,7 @@ block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-c
> >  block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
> >  block-obj-y += qed-check.o
> >  block-obj-$(CONFIG_VHDX) += vhdx.o vhdx-endian.o vhdx-log.o
> > +block-obj-y += quorum.o
> >  block-obj-y += parallels.o blkdebug.o blkverify.o
> >  block-obj-y += snapshot.o qapi.o
> >  block-obj-$(CONFIG_WIN32) += raw-win32.o win32-aio.o
> > diff --git a/block/quorum.c b/block/quorum.c
> > new file mode 100644
> > index 0000000..17695d6
> > --- /dev/null
> > +++ b/block/quorum.c
> > @@ -0,0 +1,54 @@
> > +/*
> > + * Quorum Block filter
> > + *
> > + * Copyright (C) 2012-2014 Nodalink, EURL.
> > + *
> > + * Author:
> > + *   Benoît Canet <benoit.canet@irqsave.net>
> > + *
> > + * Based on the design and code of blkverify.c (Copyright (C) 2010 IBM, Corp)
> > + * and blkmirror.c (Copyright (C) 2011 Red Hat, Inc).
> 
> I think you were planning to respin anyway, so I'll practice my
> nitpicking: The file is called mirror.c, not blkmirror.c.

It's a reference to blkmirror.c by Marcelo Tosatti, a patch that was never merged:
https://lists.gnu.org/archive/html/qemu-devel/2011-05/msg02521.html

> 
> > + *
> > + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> > + * See the COPYING file in the top-level directory.
> > + */
> > +
> > +#include "block/block_int.h"
> > +
> > +typedef struct QuorumAIOCB QuorumAIOCB;
> > +
> > +/* Quorum will create one instance of the following structure per operation it
> > + * performs on its children.
> > + * So for each read/write operation coming from the upper layer there will be
> > + * $children_count QuorumSingleAIOCB.
> > + */
> > +typedef struct QuorumSingleAIOCB {
> > +    BlockDriverAIOCB *aiocb;
> 
> So this isn't a real AIOCB, but it merely points to one. Perhaps
> something like QuorumChildRequest would be a more precise name?
> 
> > +    QEMUIOVector qiov;
> > +    uint8_t *buf;
> 
> The combination of a linear buffer and qiov is unusual. It looks like
> there may be a reason for it (otherwise qemu_iovec_clone() wouldn't
> exist), but I don't understand it yet at this point. Could hint at a
> lack of documentation.
> 
> I might come back to this later in the patch series.
> 
> > +    int ret;
> > +    QuorumAIOCB *parent;
> > +} QuorumSingleAIOCB;
> > +
> > +/* Quorum will use the following structure to track progress of each read/write
> > + * operation received by the upper layer.
> > + * This structure hold pointers to the QuorumSingleAIOCB structures instances
> > + * used to do operations on each children and track overall progress.
> > + */
> > +struct QuorumAIOCB {
> > +    BlockDriverAIOCB common;
> > +
> > +    /* Request metadata */
> > +    uint64_t sector_num;
> > +    int nb_sectors;
> > +
> > +    QEMUIOVector *qiov;         /* calling IOV */
> > +
> > +    QuorumSingleAIOCB *aios;    /* individual AIOs */
> > +    int count;                  /* number of completed AIOCB */
> > +    int success_count;          /* number of successfully completed AIOCB */
> > +    bool *finished;             /* completion signal for cancel */
> > +
> > +    bool is_read;
> > +    int vote_ret;
> > +};
> 
> Kevin


* Re: [Qemu-devel] [PATCH V15 03/13] quorum: Add quorum_aio_writev and its dependencies.
  2014-02-04 13:57   ` Kevin Wolf
@ 2014-02-05 14:14     ` Benoît Canet
  0 siblings, 0 replies; 29+ messages in thread
From: Benoît Canet @ 2014-02-05 14:14 UTC (permalink / raw)
  To: Kevin Wolf; +Cc: Benoît Canet, qemu-devel, stefanha, mreitz

Le Tuesday 04 Feb 2014 à 14:57:22 (+0100), Kevin Wolf a écrit :
> Am 03.02.2014 um 22:51 hat Benoît Canet geschrieben:
> > From: Benoît Canet <benoit@irqsave.net>
> > 
> > Signed-off-by: Benoit Canet <benoit@irqsave.net>
> > ---
> >  block/quorum.c | 104 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
> >  1 file changed, 104 insertions(+)
> 
> Starting with writes before the driver can even open an image is a weird
> order to do things in. It also doesn't make the review any easier when
> you don't know how things are initialised.
I did it this way for better git bisectability: as long as the driver cannot
open a quorum, each commit is preparatory work that should be excluded from the
git bisect commit range.

As for putting writes before reads: quorum writes are simpler.

Best regards

Benoît

> 
> > diff --git a/block/quorum.c b/block/quorum.c
> > index 157efdf..81bffdd 100644
> > --- a/block/quorum.c
> > +++ b/block/quorum.c
> > @@ -64,11 +64,115 @@ struct QuorumAIOCB {
> >      int vote_ret;
> >  };
> >  
> > +static void quorum_aio_cancel(BlockDriverAIOCB *blockacb)
> > +{
> > +    QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
> > +    BDRVQuorumState *s = acb->bqs;
> > +    int i;
> > +
> > +    /* cancel all callback */
> 
> "callbacks"
> 
> > +    for (i = 0; i < s->total; i++) {
> > +        bdrv_aio_cancel(acb->aios[i].aiocb);
> > +    }
> > +}
> 
> Don't you want to free acb and similar cleanup?
> 
> > +
> > +static AIOCBInfo quorum_aiocb_info = {
> > +    .aiocb_size         = sizeof(QuorumAIOCB),
> > +    .cancel             = quorum_aio_cancel,
> > +};
> > +
> > +static void quorum_aio_finalize(QuorumAIOCB *acb)
> > +{
> > +    BDRVQuorumState *s = acb->bqs;
> 
> block/quorum.c: In function 'quorum_aio_finalize':
> block/quorum.c:86:22: error: unused variable 's' [-Werror=unused-variable]
> 
> > +    int ret = 0;
> > +
> > +    acb->common.cb(acb->common.opaque, ret);
> > +    if (acb->finished) {
> > +        *acb->finished = true;
> > +    }
> > +    g_free(acb->aios);
> > +    qemu_aio_release(acb);
> > +}
> > +
> > +static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
> > +                                   BlockDriverState *bs,
> > +                                   QEMUIOVector *qiov,
> > +                                   uint64_t sector_num,
> > +                                   int nb_sectors,
> > +                                   BlockDriverCompletionFunc *cb,
> > +                                   void *opaque)
> > +{
> > +    QuorumAIOCB *acb = qemu_aio_get(&quorum_aiocb_info, bs, cb, opaque);
> > +    int i;
> > +
> > +    acb->bqs = s;
> 
> Noticed it only here, but it's really in patch 2 (and should be in
> patch 1):
> 
> What is acb->bqs good for? Isn't it always the same as
> acb->common.bs->opaque?
> 
> > +    acb->sector_num = sector_num;
> > +    acb->nb_sectors = nb_sectors;
> > +    acb->qiov = qiov;
> > +    acb->aios = g_new0(QuorumSingleAIOCB, s->total);
> > +    acb->count = 0;
> > +    acb->success_count = 0;
> > +    acb->finished = NULL;
> > +    acb->is_read = false;
> > +    acb->vote_ret = 0;
> > +
> > +    for (i = 0; i < s->total; i++) {
> > +        acb->aios[i].buf = NULL;
> > +        acb->aios[i].ret = 0;
> > +        acb->aios[i].parent = acb;
> > +    }
> >
> > +
> > +    return acb;
> > +}
> 
> Kevin


* Re: [Qemu-devel] [PATCH V15 06/13] quorum: Add quorum mechanism.
  2014-02-04 15:40   ` Kevin Wolf
@ 2014-02-05 15:14     ` Benoît Canet
  0 siblings, 0 replies; 29+ messages in thread
From: Benoît Canet @ 2014-02-05 15:14 UTC (permalink / raw)
  To: Kevin Wolf; +Cc: Benoît Canet, qemu-devel, stefanha, mreitz

Le Tuesday 04 Feb 2014 à 16:40:12 (+0100), Kevin Wolf a écrit :
> Am 03.02.2014 um 22:51 hat Benoît Canet geschrieben:
> > From: Benoît Canet <benoit@irqsave.net>
> > 
> > Use gnutls's SHA-256 to compare versions.
> > 
> > Signed-off-by: Benoit Canet <benoit@irqsave.net>
> > ---
> >  block/Makefile.objs       |   2 +-
> >  block/quorum.c            | 386 +++++++++++++++++++++++++++++++++++++++++++++-
> >  configure                 |  36 +++++
> >  docs/qmp/qmp-events.txt   |  33 ++++
> >  include/monitor/monitor.h |   2 +
> >  monitor.c                 |   2 +
> >  6 files changed, 458 insertions(+), 3 deletions(-)
> > 
> > diff --git a/block/Makefile.objs b/block/Makefile.objs
> > index a2650b9..4ca9d43 100644
> > --- a/block/Makefile.objs
> > +++ b/block/Makefile.objs
> > @@ -3,7 +3,7 @@ block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-c
> >  block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
> >  block-obj-y += qed-check.o
> >  block-obj-$(CONFIG_VHDX) += vhdx.o vhdx-endian.o vhdx-log.o
> > -block-obj-y += quorum.o
> > +block-obj-$(CONFIG_QUORUM) += quorum.o
> >  block-obj-y += parallels.o blkdebug.o blkverify.o
> >  block-obj-y += snapshot.o qapi.o
> >  block-obj-$(CONFIG_WIN32) += raw-win32.o win32-aio.o
> > diff --git a/block/quorum.c b/block/quorum.c
> > index 699b512..837d261 100644
> > --- a/block/quorum.c
> > +++ b/block/quorum.c
> > @@ -13,7 +13,43 @@
> >   * See the COPYING file in the top-level directory.
> >   */
> >  
> > +#include <gnutls/gnutls.h>
> > +#include <gnutls/crypto.h>
> >  #include "block/block_int.h"
> > +#include "qapi/qmp/qjson.h"
> > +
> > +#define HASH_LENGTH 32
> > +
> > +/* This union holds a vote hash value */
> > +typedef union QuorumVoteValue {
> > +    char h[HASH_LENGTH];       /* SHA-256 hash */
> > +    int64_t l;                 /* simpler 64 bits hash */
> > +} QuorumVoteValue;
> > +
> > +/* A vote item */
> > +typedef struct QuorumVoteItem {
> > +    int index;
> > +    QLIST_ENTRY(QuorumVoteItem) next;
> > +} QuorumVoteItem;
> > +
> > +/* this structure is a vote version. A version is the set of votes sharing the
> > + * same vote value.
> > + * The set of votes will be tracked with the items field and its cardinality is
> > + * vote_count.
> > + */
> > +typedef struct QuorumVoteVersion {
> > +    QuorumVoteValue value;
> > +    int index;
> > +    int vote_count;
> > +    QLIST_HEAD(, QuorumVoteItem) items;
> > +    QLIST_ENTRY(QuorumVoteVersion) next;
> > +} QuorumVoteVersion;
> > +
> > +/* this structure holds a group of vote versions together */
> > +typedef struct QuorumVotes {
> > +    QLIST_HEAD(, QuorumVoteVersion) vote_list;
> > +    int (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
> > +} QuorumVotes;
> >  
> >  /* the following structure holds the state of one quorum instance */
> >  typedef struct {
> > @@ -60,10 +96,14 @@ struct QuorumAIOCB {
> >      int success_count;          /* number of successfully completed AIOCB */
> >      bool *finished;             /* completion signal for cancel */
> >  
> > +    QuorumVotes votes;
> > +
> >      bool is_read;
> >      int vote_ret;
> >  };
> >  
> > +static void quorum_vote(QuorumAIOCB *acb);
> > +
> >  static void quorum_aio_cancel(BlockDriverAIOCB *blockacb)
> >  {
> >      QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
> > @@ -81,10 +121,12 @@ static AIOCBInfo quorum_aiocb_info = {
> >      .cancel             = quorum_aio_cancel,
> >  };
> >  
> > +static int quorum_vote_error(QuorumAIOCB *acb);
> > +
> 
> What's the reason for putting the forward declaration here? This is
> neither directly before the first user nor at the top.
> 
> In fact, the next occurrence of quorum_vote_error() is the implementation
> of the function, so the forward declaration is completely unnecessary.
> 
> >  static void quorum_aio_finalize(QuorumAIOCB *acb)
> >  {
> >      BDRVQuorumState *s = acb->bqs;
> > -    int ret = 0;
> > +    int i, ret = 0;
> >  
> >      for (i = 0; i < s->total; i++) {
> >          qemu_vfree(acb->aios[i].buf);
> > @@ -92,6 +134,10 @@ static void quorum_aio_finalize(QuorumAIOCB *acb)
> >          acb->aios[i].ret = 0;
> >      }
> >  
> > +    if (acb->vote_ret) {
> > +        ret = acb->vote_ret;
> > +    }
> > +
> >      acb->common.cb(acb->common.opaque, ret);
> >      if (acb->finished) {
> >          *acb->finished = true;
> > @@ -103,6 +149,27 @@ static void quorum_aio_finalize(QuorumAIOCB *acb)
> >      qemu_aio_release(acb);
> >  }
> >  
> > +static int quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
> > +{
> > +    return memcmp(a->h, b->h, HASH_LENGTH);
> > +}
> > +
> > +static int quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b)
> > +{
> > +    int64_t i = a->l;
> > +    int64_t j = b->l;
> > +
> > +    if (i < j) {
> > +        return -1;
> > +    }
> > +
> > +    if (i > j) {
> > +        return 1;
> > +    }
> > +
> > +    return 0;
> > +}
> 
> The usual way to implement this is 'return a->l - b->l;', because if you
> expect memcmp() to return a valid value for the compare function you
> can't assume that it's normalised to -1/0/1 anyway.
> 
> As you only ever use the result as a bool, you could alternatively
> even declare the function as such and do 'return a->l != b->l;'.
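
A minimal standalone sketch of the boolean variant suggested above. VoteValue
and the two vote_values_differ_*() helpers are hypothetical stand-ins for
QuorumVoteValue and the compare callbacks, not names from the patch. One caveat
that may be worth keeping in mind: returning a->l - b->l through an int can
truncate a 64-bit difference, so testing for inequality sidesteps that.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define HASH_LENGTH 32

/* Hypothetical stand-in for QuorumVoteValue from the patch. */
typedef union VoteValue {
    char h[HASH_LENGTH];   /* SHA-256 hash */
    int64_t l;             /* simpler 64-bit value */
} VoteValue;

/* Returns true when the two 64-bit values differ. */
static bool vote_values_differ_64(const VoteValue *a, const VoteValue *b)
{
    assert(a && b);
    return a->l != b->l;
}

/* Same contract for the hash case: memcmp() is only tested against zero,
 * so its exact return value never matters. */
static bool vote_values_differ_sha256(const VoteValue *a, const VoteValue *b)
{
    assert(a && b);
    return memcmp(a->h, b->h, HASH_LENGTH) != 0;
}
```

Callers that currently write !votes->compare(...) to mean "equal" would keep
the same shape with this contract.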
> 
> >  static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
> >                                     BlockDriverState *bs,
> >                                     QEMUIOVector *qiov,
> > @@ -122,6 +189,7 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
> >      acb->count = 0;
> >      acb->success_count = 0;
> >      acb->finished = NULL;
> > +    acb->votes.compare = quorum_sha256_compare;
> >      acb->is_read = false;
> >      acb->vote_ret = 0;
> >  
> 
> You need to initialise votes.vote_list as well.
> 
> > @@ -151,9 +219,323 @@ static void quorum_aio_cb(void *opaque, int ret)
> >          return;
> >      }
> >  
> > +    /* Do the vote on read */
> > +    if (acb->is_read) {
> > +        quorum_vote(acb);
> > +    }
> > +
> >      quorum_aio_finalize(acb);
> >  }
> >  
> > +static void quorum_report_bad(QuorumAIOCB *acb, char *node_name)
> > +{
> > +    QObject *data;
> > +    data = qobject_from_jsonf("{ 'node-name': \"%s\""
> > +                              ", 'sector-num': %" PRId64
> > +                              ", 'sectors-count': %i }",
> > +                              node_name,
> 
> Can't node_name be NULL here?

No, node_name is a member of BlockDriverState, so the only thing that could
happen is bs->node_name[0] == '\0'.
Still, I will add a safety check.
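
A rough sketch of what such a check could look like, written standalone. The
Node type, node_name_for_event() and the "(unnamed)" placeholder are all
assumptions made for illustration, not part of the patch or of QEMU.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the relevant part of BlockDriverState:
 * the name is an embedded array, so it can be empty but never NULL. */
typedef struct Node {
    char node_name[32];
} Node;

/* Returns the name to report in an event, or a placeholder when the node
 * was never named (empty array). The placeholder text is an assumption. */
static const char *node_name_for_event(const Node *node)
{
    assert(node != NULL);
    return node->node_name[0] != '\0' ? node->node_name : "(unnamed)";
}
```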

> 
> > +                              acb->sector_num,
> > +                              acb->nb_sectors);
> > +    monitor_protocol_event(QEVENT_QUORUM_REPORT_BAD, data);
> > +    qobject_decref(data);
> > +}
> > +
> > +static void quorum_report_failure(QuorumAIOCB *acb)
> > +{
> > +    QObject *data;
> > +    data = qobject_from_jsonf("{ 'sector-num': %" PRId64
> > +                              ", 'sectors-count': %i }",
> > +                              acb->sector_num,
> > +                              acb->nb_sectors);
> 
> If I have multiple quorum devices, this event doesn't tell me which one
> it is about.
> 
> > +    monitor_protocol_event(QEVENT_QUORUM_FAILURE, data);
> > +    qobject_decref(data);
> > +}
> > +
> > +static void quorum_report_bad_versions(BDRVQuorumState *s,
> > +                                       QuorumAIOCB *acb,
> > +                                       QuorumVoteValue *value)
> > +{
> > +    QuorumVoteVersion *version;
> > +    QuorumVoteItem *item;
> > +
> > +    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
> > +        if (!acb->votes.compare(&version->value, value)) {
> > +            continue;
> > +        }
> > +        QLIST_FOREACH(item, &version->items, next) {
> > +            quorum_report_bad(acb, s->bs[item->index]->node_name);
> > +        }
> > +    }
> > +}
> > +
> > +static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
> > +{
> > +    int i;
> > +    assert(dest->niov == source->niov);
> > +    assert(dest->size == source->size);
> > +    for (i = 0; i < source->niov; i++) {
> > +        assert(dest->iov[i].iov_len == source->iov[i].iov_len);
> > +        memcpy(dest->iov[i].iov_base,
> > +               source->iov[i].iov_base,
> > +               source->iov[i].iov_len);
> > +    }
> > +}
> > +
> > +static void quorum_count_vote(QuorumVotes *votes,
> > +                              QuorumVoteValue *value,
> > +                              int index)
> > +{
> > +    QuorumVoteVersion *v = NULL, *version = NULL;
> > +    QuorumVoteItem *item;
> > +
> > +    /* look if we have something with this hash */
> > +    QLIST_FOREACH(v, &votes->vote_list, next) {
> > +        if (!votes->compare(&v->value, value)) {
> > +            version = v;
> > +            break;
> > +        }
> > +    }
> > +
> > +    /* It's a version not yet in the list add it */
> > +    if (!version) {
> > +        version = g_new0(QuorumVoteVersion, 1);
> > +        QLIST_INIT(&version->items);
> > +        memcpy(&version->value, value, sizeof(version->value));
> > +        version->index = index;
> > +        version->vote_count = 0;
> > +        QLIST_INSERT_HEAD(&votes->vote_list, version, next);
> > +    }
> > +
> > +    version->vote_count++;
> > +
> > +    item = g_new0(QuorumVoteItem, 1);
> > +    item->index = index;
> > +    QLIST_INSERT_HEAD(&version->items, item, next);
> > +}
> > +
> > +static void quorum_free_vote_list(QuorumVotes *votes)
> > +{
> > +    QuorumVoteVersion *version, *next_version;
> > +    QuorumVoteItem *item, *next_item;
> > +
> > +    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
> > +        QLIST_REMOVE(version, next);
> > +        QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
> > +            QLIST_REMOVE(item, next);
> > +            g_free(item);
> > +        }
> > +        g_free(version);
> > +    }
> > +}
> > +
> > +static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
> > +{
> > +    int j, ret;
> > +    gnutls_hash_hd_t dig;
> > +    QEMUIOVector *qiov = &acb->aios[i].qiov;
> > +
> > +    ret = gnutls_hash_init(&dig, GNUTLS_DIG_SHA256);
> > +
> > +    if (ret < 0) {
> > +        return ret;
> > +    }
> > +
> > +    for (j = 0; j < qiov->niov; j++) {
> > +        ret = gnutls_hash(dig, qiov->iov[j].iov_base, qiov->iov[j].iov_len);
> > +        if (ret < 0) {
> > +            break;
> > +        }
> > +    }
> > +
> > +    gnutls_hash_deinit(dig, (void *) hash);
> > +    return ret;
> > +}
> > +
> > +static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
> > +{
> > +    int i = 0;
> 
> I like obvious variable names. This must be a loop counter.
> 
> > +    QuorumVoteVersion *candidate, *winner = NULL;
> > +
> > +    QLIST_FOREACH(candidate, &votes->vote_list, next) {
> > +        if (candidate->vote_count > i) {
> > +            i = candidate->vote_count;
> 
> Wait, what? This doesn't quite look like a loop.
> 
> > +            winner = candidate;
> > +        }
> > +    }
> > +
> > +    return winner;
> > +}
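
For comparison, a standalone sketch of the same selection with the running
maximum named as such. Version and vote_winner() are hypothetical names over a
plain array instead of the QLIST used in the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, reduced stand-in for QuorumVoteVersion. */
typedef struct Version {
    int vote_count;
} Version;

/* Returns the version with the most votes, or NULL when no version has a
 * positive vote count (which includes the empty array). Ties go to the
 * earliest entry. */
static const Version *vote_winner(const Version *versions, size_t n)
{
    const Version *winner = NULL;
    int max = 0;   /* highest vote count seen so far */
    size_t i;

    assert(versions != NULL || n == 0);
    for (i = 0; i < n; i++) {
        if (versions[i].vote_count > max) {
            max = versions[i].vote_count;
            winner = &versions[i];
        }
    }

    return winner;
}
```

In this form a NULL result can only mean that nothing was counted at all, not
that every version differs.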
> > +
> > +static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
> > +{
> > +    int i;
> > +    int result;
> > +
> > +    assert(a->niov == b->niov);
> > +    for (i = 0; i < a->niov; i++) {
> > +        assert(a->iov[i].iov_len == b->iov[i].iov_len);
> > +        result = memcmp(a->iov[i].iov_base,
> > +                        b->iov[i].iov_base,
> > +                        a->iov[i].iov_len);
> > +        if (result) {
> > +            return false;
> > +        }
> > +    }
> > +
> > +    return true;
> > +}
> 
> I thought we introduced qemu_iovec_compare() earlier in this series to
> do exactly this, except more generically?
> 
> I see that you call one or the other depending on whether we're running
> in blkverify mode, but what is the difference in the semantics? Either
> both are the same and there is no reason to have both, or one of them
> must have non-obvious semantics and lacks proper documentation.
> 
> > +static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
> > +                                          const char *fmt, ...)
> > +{
> > +    va_list ap;
> > +
> > +    va_start(ap, fmt);
> > +    fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ",
> > +            acb->sector_num, acb->nb_sectors);
> > +    vfprintf(stderr, fmt, ap);
> > +    fprintf(stderr, "\n");
> > +    va_end(ap);
> > +    exit(1);
> > +}
> > +
> > +static bool quorum_compare(QuorumAIOCB *acb,
> > +                           QEMUIOVector *a,
> > +                           QEMUIOVector *b)
> > +{
> > +    BDRVQuorumState *s = acb->bqs;
> > +    ssize_t offset;
> > +
> > +    /* This driver will replace blkverify in this particular case */
> > +    if (s->is_blkverify) {
> > +        offset = qemu_iovec_compare(a, b);
> > +        if (offset != -1) {
> > +            quorum_err(acb, "contents mismatch in sector %" PRId64,
> > +                       acb->sector_num +
> > +                       (uint64_t)(offset / BDRV_SECTOR_SIZE));
> > +        }
> > +        return true;
> > +    }
> > +
> > +    return quorum_iovec_compare(a, b);
> > +}
> > +
> > +/* Do a vote to get the error code */
> > +static int quorum_vote_error(QuorumAIOCB *acb)
> > +{
> > +    BDRVQuorumState *s = acb->bqs;
> > +    QuorumVoteVersion *winner = NULL;
> > +    QuorumVotes error_votes;
> > +    QuorumVoteValue result_value;
> > +    int i, ret = 0;
> > +    bool error = false;
> > +
> > +    QLIST_INIT(&error_votes.vote_list);
> > +    error_votes.compare = quorum_64bits_compare;
> > +
> > +    for (i = 0; i < s->total; i++) {
> > +        ret = acb->aios[i].ret;
> > +        if (ret) {
> > +            error = true;
> > +            result_value.l = ret;
> > +            quorum_count_vote(&error_votes, &result_value, i);
> > +        }
> > +    }
> > +
> > +    if (error) {
> > +        winner = quorum_get_vote_winner(&error_votes);
> > +        ret = winner->value.l;
> > +    }
> > +
> > +    quorum_free_vote_list(&error_votes);
> > +
> > +    return ret;
> > +}
> > +
> > +static void quorum_vote(QuorumAIOCB *acb)
> > +{
> > +    bool quorum = false;
> > +    int i, j, ret;
> > +    QuorumVoteValue hash;
> > +    BDRVQuorumState *s = acb->bqs;
> > +    QuorumVoteVersion *winner;
> > +
> > +    QLIST_INIT(&acb->votes.vote_list);
> > +
> > +    /* if we don't get enough successful read use the first error code */
> > +    if (acb->success_count < s->threshold) {
> > +        acb->vote_ret = quorum_vote_error(acb);
> > +        quorum_report_failure(acb);
> > +        return;
> > +    }
> > +
> > +    /* get the index of the first successful read (we are sure to find one) */
> > +    for (i = 0; i < s->total; i++) {
> > +        if (!acb->aios[i].ret) {
> > +            break;
> > +        }
> > +    }
> 
> "we are sure to find one" is spelt "assert(i < s->total);"
> 
> > +
> > +    /* compare this read with all other successful read looking for quorum */
> > +    for (j = i + 1; j < s->total; j++) {
> > +        if (acb->aios[j].ret) {
> > +            continue;
> > +        }
> > +        quorum = quorum_compare(acb, &acb->aios[i].qiov, &acb->aios[j].qiov);
> > +        if (!quorum) {
> > +            break;
> > +       }
> > +    }
> > +
> > +    /* Every successful read agrees and their count is higher or equal threshold
> > +     * -> Quorum
> > +     */
> > +    if (quorum && acb->success_count >= s->threshold) {
> > +        quorum_copy_qiov(acb->qiov, &acb->aios[i].qiov);
> > +        return;
> > +    }
> 
> For threshold == success_count == 1, the condition in the comment is
> fulfilled, but the one in the code isn't.
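
To illustrate the case above: in the quoted code quorum starts as false and the
comparison loop never runs when only one read succeeded. A standalone sketch of
the check with the flag starting as true follows; ReadResult and
all_successful_reads_agree() are hypothetical, and a single int stands in for
the iovec contents.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of one child's read: ret is 0 on success, data is a
 * one-int payload standing in for the buffer contents. */
typedef struct ReadResult {
    int ret;
    int data;
} ReadResult;

/* Returns true when every successful read has the same payload and there
 * are at least `threshold` successful reads. */
static bool all_successful_reads_agree(const ReadResult *r, size_t total,
                                       size_t threshold)
{
    size_t i, first = total, success = 0;
    bool agree = true;   /* vacuously true with a single successful read */

    assert(r != NULL || total == 0);
    for (i = 0; i < total; i++) {
        if (r[i].ret) {
            continue;               /* skip failed reads */
        }
        if (first == total) {
            first = i;              /* first successful read is the reference */
        } else if (r[i].data != r[first].data) {
            agree = false;
        }
        success++;
    }

    return agree && success > 0 && success >= threshold;
}
```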
> 
> > +
> > +    /* compute hashs for each successful read, also store indexes */
> > +    for (i = 0; i < s->total; i++) {
> > +        if (acb->aios[i].ret) {
> > +            continue;
> > +        }
> > +        ret = quorum_compute_hash(acb, i, &hash);
> > +        /* if ever the hash computation failed */
> > +        if (ret < 0) {
> > +            acb->vote_ret = ret;
> > +            goto free_exit;
> > +        }
> > +        quorum_count_vote(&acb->votes, &hash, i);
> > +    }
> > +
> > +    /* vote to select the most represented version */
> > +    winner = quorum_get_vote_winner(&acb->votes);
> > +    /* every vote version are differents -> error */
> > +    if (!winner) {
> 
> Can this happen? This means that there was no vote at all.
> 
> > +        quorum_report_failure(acb);
> > +        acb->vote_ret = -EIO;
> > +        goto free_exit;
> > +    }
> > +
> > +    /* if the winner count is smaller than threshold the read fails */
> > +    if (winner->vote_count < s->threshold) {
> > +        quorum_report_failure(acb);
> > +        acb->vote_ret = -EIO;
> > +        goto free_exit;
> > +    }
> > +
> > +    /* we have a winner: copy it */
> > +    quorum_copy_qiov(acb->qiov, &acb->aios[winner->index].qiov);
> > +
> > +    /* some versions are bad print them */
> > +    quorum_report_bad_versions(s, acb, &winner->value);
> > +
> > +free_exit:
> > +    /* free lists */
> > +    quorum_free_vote_list(&acb->votes);
> > +}
> > +
> >  static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
> >                                           int64_t sector_num,
> >                                           QEMUIOVector *qiov,
> > @@ -175,7 +557,7 @@ static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
> >      }
> >  
> >      for (i = 0; i < s->total; i++) {
> > -        bdrv_aio_readv(s->bs[i], sector_num, qiov, nb_sectors,
> > +        bdrv_aio_readv(s->bs[i], sector_num, &acb->aios[i].qiov, nb_sectors,
> >                         quorum_aio_cb, &acb->aios[i]);
> >      }
> 
> Why don't you do this from the beginning?
> 
> Kevin


* Re: [Qemu-devel] [PATCH V15 12/13] quorum: Add quorum_open() and quorum_close().
  2014-02-04 16:08   ` Kevin Wolf
@ 2014-02-05 16:11     ` Benoît Canet
  0 siblings, 0 replies; 29+ messages in thread
From: Benoît Canet @ 2014-02-05 16:11 UTC (permalink / raw)
  To: Kevin Wolf; +Cc: Benoît Canet, qemu-devel, stefanha, mreitz

On Tuesday 04 Feb 2014 at 17:08:07 (+0100), Kevin Wolf wrote:
> On 03.02.2014 at 22:51, Benoît Canet wrote:
> > From: Benoît Canet <benoit@irqsave.net>
> > 
> > Example of command line:
> > -drive if=virtio,file.driver=quorum,\
> > file.children.0.file.filename=1.raw,\
> > file.children.0.node-name=1.raw,\
> > file.children.0.driver=raw,\
> > file.children.1.file.filename=2.raw,\
> > file.children.1.node-name=2.raw,\
> > file.children.1.driver=raw,\
> > file.children.2.file.filename=3.raw,\
> > file.children.2.node-name=3.raw,\
> > file.children.2.driver=raw,\
> > file.vote_threshold=2
> > 
> > file.blkverify=on with file.vote_threshold=2 and two files can be passed to
> > emulate blkverify.
> > 
> > Signed-off-by: Benoit Canet <benoit@irqsave.net>
> > ---
> >  block/quorum.c   | 171 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
> >  qapi-schema.json |  21 ++++++-
> >  2 files changed, 191 insertions(+), 1 deletion(-)
> > 
> > diff --git a/block/quorum.c b/block/quorum.c
> > index 1e683f8..d2bea29 100644
> > --- a/block/quorum.c
> > +++ b/block/quorum.c
> > @@ -17,8 +17,12 @@
> >  #include <gnutls/crypto.h>
> >  #include "block/block_int.h"
> >  #include "qapi/qmp/qjson.h"
> > +#include "qapi/qmp/types.h"
> > +#include "qemu-common.h"
> >  
> >  #define HASH_LENGTH 32
> > +#define KEY_PREFIX "children."
> > +#define KEY_FILENAME_SUFFIX ".file.filename"
> >  
> >  /* This union holds a vote hash value */
> >  typedef union QuorumVoteValue {
> > @@ -712,12 +716,179 @@ static bool quorum_recurse_is_first_non_filter(BlockDriverState *bs,
> >      return false;
> >  }
> >  
> > +static int quorum_valid_threshold(int threshold,
> > +                                  int total,
> > +                                  Error **errp)
> > +{
> > +
> > +    if (threshold < 1) {
> > +        error_set(errp, QERR_INVALID_PARAMETER_VALUE,
> > +                  "vote-threshold", "value >= 1");
> > +        return -ERANGE;
> > +    }
> > +
> > +    if (threshold > total) {
> > +        error_setg(errp, "threshold may not exceed children count");
> > +        return -ERANGE;
> > +    }
> > +
> > +    return 0;
> > +}
> > +
> > +static int quorum_open(BlockDriverState *bs,
> > +                       QDict *options,
> > +                       int flags,
> > +                       Error **errp)
> > +{
> > +    BDRVQuorumState *s = bs->opaque;
> > +    Error *local_err = NULL;
> > +    bool *opened;
> > +    QDict *sub = NULL;
> > +    QList *list = NULL;
> > +    const QListEntry *lentry;
> > +    const QDictEntry *dentry;
> > +    const char *value;
> > +    char *next;
> > +    int i;
> > +    int ret = 0;
> > +    unsigned long long threshold = 0;
> > +
> > +    qdict_flatten(options);
> > +    qdict_extract_subqdict(options, &sub, "children.");
> > +    qdict_array_split(sub, &list);
> > +
> > +    /* count how many different children are present and validate */
> > +    s->total = !qlist_size(list) ? qdict_size(sub) : qlist_size(list);
> 
> Which case does qdict_size(sub) address?
> 
> > +    if (s->total < 2) {
> > +        error_setg(&local_err,
> > +                   "Number of provided children must be greater than 1");
> > +        ret = -EINVAL;
> > +        goto exit;
> > +    }
> > +
> > +    ret = qdict_get_try_int(options, "vote-threshold", -1);
> > +    /* from QMP */
> > +    if (ret != -1) {
> > +        qdict_del(options, "vote-threshold");
> > +        s->threshold = ret;
> > +    /* from command line */
> > +    } else {
> > +        /* retrieve the threshold option from the command line */
> > +        value = qdict_get_try_str(options, "vote_threshold");
> > +        if (!value) {
> > +            error_setg(&local_err,
> > +                       "vote_threshold must be provided");
> > +            ret = -EINVAL;
> > +            goto exit;
> > +        }
> > +        qdict_del(options, "vote_threshold");
> > +
> > +        ret = parse_uint(value, &threshold, &next, 10);
> > +
> > +        /* no int found -> scan fail */
> > +        if (ret < 0) {
> > +            error_setg(&local_err,
> > +                       "invalid vote_threshold specified");
> > +            ret = -EINVAL;
> > +            goto exit;
> > +        }
> > +        s->threshold = threshold;
> > +    }
> 
> This part looks seriously wrong. I think you should consider using an
> QemuOpts like other drivers do (have a look at qcow2, for example), that
> should parse the integer for you.
> 
> > +    /* and validate it against s->total */
> > +    ret = quorum_valid_threshold(s->threshold, s->total, &local_err);
> > +    if (ret < 0) {
> > +        goto exit;
> > +    }
> > +
> > +    /* is the driver in blkverify mode */
> > +    value = qdict_get_try_str(options, "blkverify");
> > +    if (value && !strcmp(value, "on")  &&
> > +        s->total == 2 && s->threshold == 2) {
> > +        s->is_blkverify = true;
> > +    } else if (value && strcmp(value, "off")) {
> > +        fprintf(stderr, "blkverify mode is set by setting blkverify=on "
> > +                        "and using two files with vote_threshold=2\n");
> > +    }
> > +    qdict_del(options, "blkverify");
> 
> And the QemuOpts would also know how to parse a boolean.
> 
> > +
> > +    /* allocate the children BlockDriverState array */
> > +    s->bs = g_new0(BlockDriverState *, s->total);
> > +    opened = g_new0(bool, s->total);
> > +
> > +    /* Open by file name or options dict (command line or QMP) */
> > +    if (s->total == qlist_size(list)) {
> > +        for (i = 0, lentry = qlist_first(list); lentry;
> > +            lentry = qlist_next(lentry), i++) {
> > +            ret = bdrv_open(&s->bs[i], NULL, NULL,
> > +                            qobject_to_qdict(lentry->value),
> > +                            flags, NULL, &local_err);
> > +            if (ret < 0) {
> > +                goto close_exit;
> > +            }
> > +            opened[i] = true;
> > +        }
> > +    /* Open by QMP references */
> > +    } else {
> > +        for (i = 0, dentry = qdict_first(sub); dentry;
> > +             dentry = qdict_next(sub, dentry), i++) {
> > +            ret = bdrv_open(&s->bs[i], NULL,
> > +                            qstring_get_str(qobject_to_qstring(dentry->value)),
> > +                            NULL, flags, NULL, &local_err);
> > +            if (ret < 0) {
> > +                goto close_exit;
> > +            }
> > +            opened[i] = true;
> > +        }
> > +    }
> 
> What does the reference case look like? (Should work on the command
> line, too) I imagine something like this:
> 
>     -drive if=virtio,file.driver=quorum,\
>     file.children.0=image0,\
>     file.children.1=image1,\
>     file.children.2=image2,\
>     file.vote_threshold=2
> 
> Doesn't qdict_array_split() split this into a list of strings? The whole
> thing with directly accessing QDict entries for references confuses me.

No, the references are left in sub.
Also, they are strings, which means that even if they ended up in list, the
code would have to discriminate on object types. It would not be much cleaner.

Best regards

Benoît

> 
> > +    g_free(opened);
> > +    goto exit;
> > +
> > +close_exit:
> > +    /* cleanup on error */
> > +    for (i = 0; i < s->total; i++) {
> > +        if (!opened[i]) {
> > +            continue;
> > +        }
> > +        bdrv_unref(s->bs[i]);
> > +    }
> > +    g_free(s->bs);
> > +    g_free(opened);
> > +exit:
> > +    /* propagate error */
> > +    if (error_is_set(&local_err)) {
> > +        error_propagate(errp, local_err);
> > +    }
> > +    QDECREF(sub);
> > +    QDECREF(list);
> > +    return ret;
> > +}
> > +
> > +static void quorum_close(BlockDriverState *bs)
> > +{
> > +    BDRVQuorumState *s = bs->opaque;
> > +    int i;
> > +
> > +    for (i = 0; i < s->total; i++) {
> > +        /* Ensure writes reach stable storage */
> > +        bdrv_flush(s->bs[i]);
> 
> Not necessary, block.c code will flush for us.
> 
> > +        /* close manually because we'll free s->bs */
> > +        bdrv_unref(s->bs[i]);
> > +    }
> > +
> > +    g_free(s->bs);
> > +}
> 
> Kevin
> 


* Re: [Qemu-devel] [PATCH V15 09/13] quorum: Add quorum_co_get_block_status.
  2014-02-04 15:49   ` Kevin Wolf
@ 2014-02-05 16:28     ` Benoît Canet
  0 siblings, 0 replies; 29+ messages in thread
From: Benoît Canet @ 2014-02-05 16:28 UTC (permalink / raw)
  To: Kevin Wolf; +Cc: Benoît Canet, qemu-devel, stefanha, mreitz

On Tuesday 04 Feb 2014 at 16:49:22 (+0100), Kevin Wolf wrote:
> On 03.02.2014 at 22:51, Benoît Canet wrote:
> > From: Benoît Canet <benoit@irqsave.net>
> > 
> > Signed-off-by: Benoit Canet <benoit@irqsave.net>
> > Reviewed-by: Max Reitz <mreitz@redhat.com>
> > ---
> >  block/quorum.c | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++
> >  1 file changed, 51 insertions(+)
> > 
> > diff --git a/block/quorum.c b/block/quorum.c
> > index cef4424..677a96d 100644
> > --- a/block/quorum.c
> > +++ b/block/quorum.c
> > @@ -619,6 +619,56 @@ static void quorum_invalidate_cache(BlockDriverState *bs)
> >      }
> >  }
> >  
> > +static int64_t coroutine_fn quorum_co_get_block_status(BlockDriverState *bs,
> > +                                                       int64_t sector_num,
> > +                                                       int nb_sectors,
> > +                                                       int *pnum)
> > +{
> > +    BDRVQuorumState *s = bs->opaque;
> > +    QuorumVoteVersion *winner = NULL;
> > +    QuorumVotes result_votes, num_votes;
> > +    QuorumVoteValue result_value, num_value;
> > +    int i, num;
> > +    int64_t result = 0;
> > +
> > +    QLIST_INIT(&result_votes.vote_list);
> > +    QLIST_INIT(&num_votes.vote_list);
> > +    result_votes.compare = quorum_64bits_compare;
> > +    num_votes.compare = quorum_64bits_compare;
> > +
> > +    for (i = 0; i < s->total; i++) {
> > +        result = bdrv_get_block_status(s->bs[i], sector_num, nb_sectors, &num);
> > +        /* skip failed requests */
> > +        if (result < 0) {
> > +            continue;
> > +        }
> > +        result_value.l = result & BDRV_BLOCK_DATA;
> > +        num_value.l = num;
> > +        quorum_count_vote(&result_votes, &result_value, i);
> > +        quorum_count_vote(&num_votes, &num_value, i);
> > +    }
> 
> This doesn't work. bdrv_get_block_status() doesn't guarantee that it
> returns all consecutive blocks with the same status. You need to call it
> in a loop here, or change bdrv_get_block_status() so that it loops
> itself.

I don't see what can be done with the results generated by the loop.
Does quorum really need this function?
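
One possible reading of the loop suggested above, as a standalone sketch rather
than QEMU code: keep querying a child until the status changes or the request
is exhausted, so that each child contributes one complete (status, count) pair.
status_fn, status_of_full_run() and the toy backend are hypothetical names
introduced for illustration; the real bdrv_get_block_status() is not used here.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical per-child query: returns the status at sector_num (negative
 * on error) and stores in *pnum how many sectors (1..nb_sectors) it covers.
 * It may stop early even if the following sectors share the same status. */
typedef int64_t (*status_fn)(void *opaque, int64_t sector_num,
                             int nb_sectors, int *pnum);

/* Repeats the query until the status changes, an error occurs, or the
 * request is exhausted. Returns the status of the first run and stores
 * its full length in *pnum. */
static int64_t status_of_full_run(status_fn fn, void *opaque,
                                  int64_t sector_num, int nb_sectors,
                                  int *pnum)
{
    int64_t first, next;
    int n = 0, done;

    first = fn(opaque, sector_num, nb_sectors, &n);
    if (first < 0) {
        return first;
    }
    assert(n > 0 && n <= nb_sectors);
    done = n;

    while (done < nb_sectors) {
        next = fn(opaque, sector_num + done, nb_sectors - done, &n);
        if (next != first) {
            break;      /* different status or error: the run ends here */
        }
        assert(n > 0 && n <= nb_sectors - done);
        done += n;
    }

    *pnum = done;
    return first;
}

/* Toy backend: sectors below data_end hold data (status 1), the rest are
 * holes (status 0). It deliberately reports at most 4 sectors per call. */
typedef struct ToyChild {
    int64_t data_end;
} ToyChild;

static int64_t toy_status(void *opaque, int64_t sector_num,
                          int nb_sectors, int *pnum)
{
    ToyChild *c = opaque;
    int64_t status = sector_num < c->data_end ? 1 : 0;
    int64_t run = status ? c->data_end - sector_num : nb_sectors;

    if (run > nb_sectors) {
        run = nb_sectors;
    }
    if (run > 4) {
        run = 4;
    }
    *pnum = (int)run;
    return status;
}
```

With complete runs per child, the status and the count could then be voted on
as a pair, which might also avoid taking them from different groups of devices.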

Best regards

Benoît

> 
> > +    winner = quorum_get_vote_winner(&result_votes);
> > +    if (winner->vote_count < s->threshold) {
> > +        result = -EIO;
> > +        goto free_exit;
> > +    }
> > +    result = winner->value.l;
> > +
> > +    winner = quorum_get_vote_winner(&num_votes);
> > +    if (winner->vote_count < s->threshold) {
> > +        result = -EIO;
> > +        goto free_exit;
> > +    }
> > +    *pnum = winner->value.l;
> 
> You can take the status from one group of devices and the number of
> blocks that share this state from another group?!
> 
> > +
> > +free_exit:
> > +    quorum_free_vote_list(&result_votes);
> > +    quorum_free_vote_list(&num_votes);
> > +
> > +    return result;
> > +}
> > +
> >  static BlockDriver bdrv_quorum = {
> >      .format_name        = "quorum",
> >      .protocol_name      = "quorum",
> > @@ -630,6 +680,7 @@ static BlockDriver bdrv_quorum = {
> >      .bdrv_aio_readv     = quorum_aio_readv,
> >      .bdrv_aio_writev    = quorum_aio_writev,
> >      .bdrv_invalidate_cache = quorum_invalidate_cache,
> > +    .bdrv_co_get_block_status = quorum_co_get_block_status,
> >  };
> >  
> >  static void bdrv_quorum_init(void)
> 
> Kevin

Thread overview: 29+ messages
2014-02-03 21:51 [Qemu-devel] [PATCH V15 00/13] Quorum block filter Benoît Canet
2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 01/13] quorum: Create quorum.c, add QuorumSingleAIOCB and QuorumAIOCB Benoît Canet
2014-02-04 12:57   ` Kevin Wolf
2014-02-05 13:29     ` Benoît Canet
2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 02/13] quorum: Create BDRVQuorumState and BlkDriver and do init Benoît Canet
2014-02-04 13:08   ` Kevin Wolf
2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 03/13] quorum: Add quorum_aio_writev and its dependencies Benoît Canet
2014-02-04 13:57   ` Kevin Wolf
2014-02-05 14:14     ` Benoît Canet
2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 04/13] blkverify: Extract qemu_iovec_clone() and qemu_iovec_compare() from blkverify Benoît Canet
2014-02-04 14:04   ` Kevin Wolf
2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 05/13] quorum: Add quorum_aio_readv Benoît Canet
2014-02-04 14:15   ` Kevin Wolf
2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 06/13] quorum: Add quorum mechanism Benoît Canet
2014-02-04 15:40   ` Kevin Wolf
2014-02-05 15:14     ` Benoît Canet
2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 07/13] quorum: Add quorum_getlength() Benoît Canet
2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 08/13] quorum: Add quorum_invalidate_cache() Benoît Canet
2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 09/13] quorum: Add quorum_co_get_block_status Benoît Canet
2014-02-04 15:49   ` Kevin Wolf
2014-02-05 16:28     ` Benoît Canet
2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 10/13] quorum: Add quorum_co_flush() Benoît Canet
2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 11/13] quorum: Implement recursive .bdrv_recurse_is_first_non_filter in quorum Benoît Canet
2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 12/13] quorum: Add quorum_open() and quorum_close() Benoît Canet
2014-02-04 16:08   ` Kevin Wolf
2014-02-05 16:11     ` Benoît Canet
2014-02-03 21:51 ` [Qemu-devel] [PATCH V15 13/13] quorum: Add unit test Benoît Canet
2014-02-04 16:13   ` Kevin Wolf
2014-02-04 21:53     ` Benoît Canet
