On 2017-09-18 08:02, Fam Zheng wrote:
> On Wed, 09/13 20:18, Max Reitz wrote:
>> In order to talk to the source BDS (and maybe in the future to the
>> target BDS as well) directly, we need to convert our existing AIO
>> requests into coroutine I/O requests.
>>
>> Signed-off-by: Max Reitz
>> ---
>>  block/mirror.c | 134 +++++++++++++++++++++++++++++++++------------------------
>>  1 file changed, 78 insertions(+), 56 deletions(-)
>>
>> diff --git a/block/mirror.c b/block/mirror.c
>> index 4664b0516f..2b3297aa61 100644
>> --- a/block/mirror.c
>> +++ b/block/mirror.c
>> @@ -80,6 +80,9 @@ typedef struct MirrorOp {
>>      QEMUIOVector qiov;
>>      int64_t offset;
>>      uint64_t bytes;
>> +
>> +    /* Set by mirror_co_read() before yielding for the first time */
>> +    uint64_t bytes_copied;
>>  } MirrorOp;
>>
>>  typedef enum MirrorMethod {
>> @@ -101,7 +104,7 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
>>      }
>>  }
>>
>> -static void mirror_iteration_done(MirrorOp *op, int ret)
>> +static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret)
>>  {
>>      MirrorBlockJob *s = op->s;
>>      struct iovec *iov;
>> @@ -138,9 +141,8 @@ static void mirror_iteration_done(MirrorOp *op, int ret)
>>      }
>>  }
>>
>> -static void mirror_write_complete(void *opaque, int ret)
>> +static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret)
>>  {
>> -    MirrorOp *op = opaque;
>>      MirrorBlockJob *s = op->s;
>>
>>      aio_context_acquire(blk_get_aio_context(s->common.blk));
>> @@ -158,9 +160,8 @@ static void mirror_write_complete(void *opaque, int ret)
>>      aio_context_release(blk_get_aio_context(s->common.blk));
>>  }
>>
>> -static void mirror_read_complete(void *opaque, int ret)
>> +static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret)
>>  {
>> -    MirrorOp *op = opaque;
>>      MirrorBlockJob *s = op->s;
>>
>>      aio_context_acquire(blk_get_aio_context(s->common.blk));
>> @@ -176,8 +177,11 @@ static void mirror_read_complete(void *opaque, int ret)
>>
>>          mirror_iteration_done(op, ret);
>>      } else {
>> -        blk_aio_pwritev(s->target, op->offset, &op->qiov,
>> -                        0, mirror_write_complete, op);
>> +        int ret;
>> +
>> +        ret = blk_co_pwritev(s->target, op->offset,
>> +                             op->qiov.size, &op->qiov, 0);
>> +        mirror_write_complete(op, ret);
>>      }
>>      aio_context_release(blk_get_aio_context(s->common.blk));
>>  }
>> @@ -242,53 +246,49 @@ static inline void mirror_wait_for_io(MirrorBlockJob *s)
>>   * (new_end - offset) if tail is rounded up or down due to
>>   * alignment or buffer limit.
>>   */
>> -static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset,
>> -                               uint64_t bytes)
>> +static void coroutine_fn mirror_co_read(void *opaque)
>>  {
>> +    MirrorOp *op = opaque;
>> +    MirrorBlockJob *s = op->s;
>>      BlockBackend *source = s->common.blk;
>>      int nb_chunks;
>>      uint64_t ret;
>> -    MirrorOp *op;
>>      uint64_t max_bytes;
>>
>>      max_bytes = s->granularity * s->max_iov;
>>
>>      /* We can only handle as much as buf_size at a time. */
>> -    bytes = MIN(s->buf_size, MIN(max_bytes, bytes));
>> -    assert(bytes);
>> -    assert(bytes < BDRV_REQUEST_MAX_BYTES);
>> -    ret = bytes;
>> +    op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes));
>> +    assert(op->bytes);
>> +    assert(op->bytes < BDRV_REQUEST_MAX_BYTES);
>> +    op->bytes_copied = op->bytes;
>>
>>      if (s->cow_bitmap) {
>> -        ret += mirror_cow_align(s, &offset, &bytes);
>> +        op->bytes_copied += mirror_cow_align(s, &op->offset, &op->bytes);
>>      }
>> -    assert(bytes <= s->buf_size);
>> +    /* Cannot exceed BDRV_REQUEST_MAX_BYTES + INT_MAX */
>> +    assert(op->bytes_copied <= UINT_MAX);
>> +    assert(op->bytes <= s->buf_size);
>>      /* The offset is granularity-aligned because:
>>       * 1) Caller passes in aligned values;
>>       * 2) mirror_cow_align is used only when target cluster is larger. */
>> -    assert(QEMU_IS_ALIGNED(offset, s->granularity));
>> +    assert(QEMU_IS_ALIGNED(op->offset, s->granularity));
>>      /* The range is sector-aligned, since bdrv_getlength() rounds up. */
>> -    assert(QEMU_IS_ALIGNED(bytes, BDRV_SECTOR_SIZE));
>> -    nb_chunks = DIV_ROUND_UP(bytes, s->granularity);
>> +    assert(QEMU_IS_ALIGNED(op->bytes, BDRV_SECTOR_SIZE));
>> +    nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity);
>>
>>      while (s->buf_free_count < nb_chunks) {
>> -        trace_mirror_yield_in_flight(s, offset, s->in_flight);
>> +        trace_mirror_yield_in_flight(s, op->offset, s->in_flight);
>>          mirror_wait_for_io(s);
>>      }
>>
>> -    /* Allocate a MirrorOp that is used as an AIO callback. */
>> -    op = g_new(MirrorOp, 1);
>> -    op->s = s;
>> -    op->offset = offset;
>> -    op->bytes = bytes;
>> -
>>      /* Now make a QEMUIOVector taking enough granularity-sized chunks
>>       * from s->buf_free.
>>       */
>>      qemu_iovec_init(&op->qiov, nb_chunks);
>>      while (nb_chunks-- > 0) {
>>          MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free);
>> -        size_t remaining = bytes - op->qiov.size;
>> +        size_t remaining = op->bytes - op->qiov.size;
>>
>>          QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next);
>>          s->buf_free_count--;
>> @@ -297,53 +297,75 @@ static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset,
>>
>>      /* Copy the dirty cluster. */
>>      s->in_flight++;
>> -    s->bytes_in_flight += bytes;
>> -    trace_mirror_one_iteration(s, offset, bytes);
>> +    s->bytes_in_flight += op->bytes;
>> +    trace_mirror_one_iteration(s, op->offset, op->bytes);
>>
>> -    blk_aio_preadv(source, offset, &op->qiov, 0, mirror_read_complete, op);
>> -    return ret;
>> +    ret = blk_co_preadv(source, op->offset, op->bytes, &op->qiov, 0);
>> +    mirror_read_complete(op, ret);
>>  }
>>
>> -static void mirror_do_zero_or_discard(MirrorBlockJob *s,
>> -                                      int64_t offset,
>> -                                      uint64_t bytes,
>> -                                      bool is_discard)
>> +static void coroutine_fn mirror_co_zero(void *opaque)
>>  {
>> -    MirrorOp *op;
>> +    MirrorOp *op = opaque;
>> +    int ret;
>>
>> -    /* Allocate a MirrorOp that is used as an AIO callback. The qiov is zeroed
>> -     * so the freeing in mirror_iteration_done is nop. */
>> -    op = g_new0(MirrorOp, 1);
>> -    op->s = s;
>> -    op->offset = offset;
>> -    op->bytes = bytes;
>> +    op->s->in_flight++;
>> +    op->s->bytes_in_flight += op->bytes;
>>
>> -    s->in_flight++;
>> -    s->bytes_in_flight += bytes;
>> -    if (is_discard) {
>> -        blk_aio_pdiscard(s->target, offset,
>> -                         op->bytes, mirror_write_complete, op);
>> -    } else {
>> -        blk_aio_pwrite_zeroes(s->target, offset,
>> -                              op->bytes, s->unmap ? BDRV_REQ_MAY_UNMAP : 0,
>> -                              mirror_write_complete, op);
>> -    }
>> +    ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes,
>> +                               op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0);
>> +    mirror_write_complete(op, ret);
>> +}
>> +
>> +static void coroutine_fn mirror_co_discard(void *opaque)
>> +{
>> +    MirrorOp *op = opaque;
>> +    int ret;
>> +
>> +    op->s->in_flight++;
>> +    op->s->bytes_in_flight += op->bytes;
>> +
>> +    ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes);
>> +    mirror_write_complete(op, ret);
>>  }
>>
>>  static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset,
>>                                 unsigned bytes, MirrorMethod mirror_method)
>>  {
>> +    MirrorOp *op;
>> +    Coroutine *co;
>> +    unsigned ret = bytes;
>> +
>> +    op = g_new(MirrorOp, 1);
>> +    *op = (MirrorOp){
>> +        .s = s,
>> +        .offset = offset,
>> +        .bytes = bytes,
>> +    };
>> +
>>      switch (mirror_method) {
>>      case MIRROR_METHOD_COPY:
>> -        return mirror_do_read(s, offset, bytes);
>> +        co = qemu_coroutine_create(mirror_co_read, op);
>> +        break;
>>      case MIRROR_METHOD_ZERO:
>> +        co = qemu_coroutine_create(mirror_co_zero, op);
>> +        break;
>>      case MIRROR_METHOD_DISCARD:
>> -        mirror_do_zero_or_discard(s, offset, bytes,
>> -                                  mirror_method == MIRROR_METHOD_DISCARD);
>> -        return bytes;
>> +        co = qemu_coroutine_create(mirror_co_discard, op);
>> +        break;
>>      default:
>>          abort();
>>      }
>> +
>> +    qemu_coroutine_enter(co);
>> +
>> +    if (mirror_method == MIRROR_METHOD_COPY) {
>> +        /* Same assertion as in mirror_co_read() */
>> +        assert(op->bytes_copied <= UINT_MAX);
>> +        ret = op->bytes_copied;
>> +    }
>
> This special casing is a bit ugly. Can you just make mirror_co_zero and
> mirror_co_discard set op->bytes_copied too? (and perhaps rename to
> op->bytes_handled) If so the comment in MirrorOp needs an update too.

Sure.

> And is it better to initialize it to -1 before entering coroutine, then assert
> it is != -1 afterwards?

Sounds good, will do.

Max
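
For reference, applying both suggestions to the patch as posted might look
roughly like the sketch below: the field is renamed to bytes_handled, all
three mirror_co_*() entry points set it before yielding, and mirror_perform()
poisons it with -1 before entering the coroutine. This is only an
illustration; the names are tentative, only mirror_co_zero() is shown for the
entry points, and it keeps the posted patch's assumption that op can still be
read after qemu_coroutine_enter() returns.

/* In MirrorOp (replacing bytes_copied): */
    /* Set by mirror_co_read(), mirror_co_zero() and mirror_co_discard()
     * before yielding for the first time */
    uint64_t bytes_handled;

/* mirror_co_zero() and mirror_co_discard() report the whole request, e.g.: */
static void coroutine_fn mirror_co_zero(void *opaque)
{
    MirrorOp *op = opaque;
    int ret;

    op->s->in_flight++;
    op->s->bytes_in_flight += op->bytes;
    op->bytes_handled = op->bytes;    /* new: no COW alignment to add here */

    ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes,
                               op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0);
    mirror_write_complete(op, ret);
}

/* mirror_perform() then loses the MIRROR_METHOD_COPY special case: */
static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset,
                               unsigned bytes, MirrorMethod mirror_method)
{
    MirrorOp *op;
    Coroutine *co;

    op = g_new(MirrorOp, 1);
    *op = (MirrorOp){
        .s             = s,
        .offset        = offset,
        .bytes         = bytes,
        .bytes_handled = (uint64_t)-1, /* poison until the coroutine sets it */
    };

    switch (mirror_method) {
    case MIRROR_METHOD_COPY:
        co = qemu_coroutine_create(mirror_co_read, op);
        break;
    case MIRROR_METHOD_ZERO:
        co = qemu_coroutine_create(mirror_co_zero, op);
        break;
    case MIRROR_METHOD_DISCARD:
        co = qemu_coroutine_create(mirror_co_discard, op);
        break;
    default:
        abort();
    }

    qemu_coroutine_enter(co);

    /* Every mirror_co_*() entry point must have set bytes_handled before
     * yielding for the first time (op is assumed to still be valid here,
     * as in the patch above) */
    assert(op->bytes_handled != (uint64_t)-1);
    assert(op->bytes_handled <= UINT_MAX);
    return op->bytes_handled;
}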