On Thu, Jun 18, 2015 at 04:49:20PM +0800, Wen Congyang wrote: CCing Alberto Garcia for the quorum block driver. > If the child is not ready, read/write/getlength/flush will > return -errno. It is not critical error, and can be ignored: > 1. read/write: > Just not report the error event. > 2. getlength: > just ignore it. If all children's getlength return -errno, > and be ignored, return -EIO. > 3. flush: > Just ignore it. If all children's getlength return -errno, > and be ignored, return 0. > > Usage: children.x.ignore-errors=true > > Signed-off-by: Wen Congyang > Signed-off-by: zhanghailiang > Signed-off-by: Gonglei > --- > block/quorum.c | 84 +++++++++++++++++++++++++++++++++++++++++++++++++++++----- > 1 file changed, 77 insertions(+), 7 deletions(-) > > diff --git a/block/quorum.c b/block/quorum.c > index 01cfac0..c5dbb69 100644 > --- a/block/quorum.c > +++ b/block/quorum.c > @@ -30,6 +30,7 @@ > #define QUORUM_OPT_BLKVERIFY "blkverify" > #define QUORUM_OPT_REWRITE "rewrite-corrupted" > #define QUORUM_OPT_READ_PATTERN "read-pattern" > +#define QUORUM_CHILDREN_OPT_IGNORE_ERRORS "ignore-errors" > > /* This union holds a vote hash value */ > typedef union QuorumVoteValue { > @@ -65,6 +66,7 @@ typedef struct QuorumVotes { > /* the following structure holds the state of one quorum instance */ > typedef struct BDRVQuorumState { > BlockDriverState **bs; /* children BlockDriverStates */ > + bool *ignore_errors; /* ignore children's error? */ > int num_children; /* children count */ > int threshold; /* if less than threshold children reads gave the > * same result a quorum error occurs. > @@ -99,6 +101,7 @@ typedef struct QuorumChildRequest { > uint8_t *buf; > int ret; > QuorumAIOCB *parent; > + int index; > } QuorumChildRequest; > > /* Quorum will use the following structure to track progress of each read/write > @@ -211,6 +214,7 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s, > acb->qcrs[i].buf = NULL; > acb->qcrs[i].ret = 0; > acb->qcrs[i].parent = acb; > + acb->qcrs[i].index = i; > } > > return acb; > @@ -304,7 +308,7 @@ static void quorum_aio_cb(void *opaque, int ret) > acb->count++; > if (ret == 0) { > acb->success_count++; > - } else { > + } else if (!s->ignore_errors[sacb->index]) { > quorum_report_bad(acb, sacb->aiocb->bs->node_name, ret); > } > assert(acb->count <= s->num_children); > @@ -719,19 +723,31 @@ static BlockAIOCB *quorum_aio_writev(BlockDriverState *bs, > static int64_t quorum_getlength(BlockDriverState *bs) > { > BDRVQuorumState *s = bs->opaque; > - int64_t result; > + int64_t result = -EIO; > int i; > > /* check that all file have the same length */ > - result = bdrv_getlength(s->bs[0]); > - if (result < 0) { > - return result; > - } > - for (i = 1; i < s->num_children; i++) { > + for (i = 0; i < s->num_children; i++) { > int64_t value = bdrv_getlength(s->bs[i]); > + > if (value < 0) { > return value; > } > + > + if (value == 0 && s->ignore_errors[i]) { > + /* > + * If the child is not ready, it cannot return -errno, > + * otherwise refresh_total_sectors() will fail when > + * we open the child. > + */ > + continue; > + } > + > + if (result == -EIO) { > + result = value; > + continue; > + } > + > if (value != result) { > return -EIO; > } > @@ -769,6 +785,9 @@ static coroutine_fn int quorum_co_flush(BlockDriverState *bs) > > for (i = 0; i < s->num_children; i++) { > result = bdrv_co_flush(s->bs[i]); > + if (result < 0 && s->ignore_errors[i]) { > + result = 0; > + } > result_value.l = result; > quorum_count_vote(&error_votes, &result_value, i); > } > @@ -843,6 +862,19 @@ static QemuOptsList quorum_runtime_opts = { > }, > }; > > +static QemuOptsList quorum_children_common_opts = { > + .name = "quorum children", > + .head = QTAILQ_HEAD_INITIALIZER(quorum_children_common_opts.head), > + .desc = { > + { > + .name = QUORUM_CHILDREN_OPT_IGNORE_ERRORS, > + .type = QEMU_OPT_BOOL, > + .help = "ignore child I/O error", > + }, > + { /* end of list */ } > + }, > +}; > + > static int parse_read_pattern(const char *opt) > { > int i; > @@ -861,6 +893,37 @@ static int parse_read_pattern(const char *opt) > return -EINVAL; > } > > +static int parse_children_options(BDRVQuorumState *s, QDict *options, > + const char *indexstr, int index, > + Error **errp) > +{ > + QemuOpts *children_opts = NULL; > + Error *local_err = NULL; > + int ret = 0; > + bool value; > + > + children_opts = qemu_opts_create(&quorum_children_common_opts, NULL, 0, > + &error_abort); > + qemu_opts_absorb_qdict_by_index(children_opts, options, indexstr, > + &local_err); > + if (local_err) { > + ret = -EINVAL; > + goto out; > + } > + > + value = qemu_opt_get_bool(children_opts, QUORUM_CHILDREN_OPT_IGNORE_ERRORS, > + false); > + s->ignore_errors[index] = value; > + > +out: > + qemu_opts_del(children_opts); > + /* propagate error */ > + if (local_err) { > + error_propagate(errp, local_err); > + } > + return ret; > +} > + > static int quorum_open(BlockDriverState *bs, QDict *options, int flags, > Error **errp) > { > @@ -931,12 +994,18 @@ static int quorum_open(BlockDriverState *bs, QDict *options, int flags, > /* allocate the children BlockDriverState array */ > s->bs = g_new0(BlockDriverState *, s->num_children); > opened = g_new0(bool, s->num_children); > + s->ignore_errors = g_new0(bool, s->num_children); > > for (i = 0; i < s->num_children; i++) { > char indexstr[32]; > ret = snprintf(indexstr, 32, "children.%d", i); > assert(ret < 32); > > + ret = parse_children_options(s, options, indexstr, i, &local_err); > + if (ret < 0) { > + goto close_exit; > + } > + > ret = bdrv_open_image(&s->bs[i], NULL, options, indexstr, bs, > &child_format, false, &local_err); > if (ret < 0) { > @@ -979,6 +1048,7 @@ static void quorum_close(BlockDriverState *bs) > } > > g_free(s->bs); > + g_free(s->ignore_errors); > } > > static void quorum_detach_aio_context(BlockDriverState *bs) > -- > 2.4.3 > >