From: Yehuda Sadeh Weinraub <yehuda@hq.newdream.net>
To: Stefan Hajnoczi <stefanha@gmail.com>
Cc: Anthony Liguori <anthony@codemonkey.ws>, Kevin Wolf <kwolf@redhat.com>,
	kvm@vger.kernel.org, qemu-devel@nongnu.org, Sage Weil <sage@newdream.net>,
	ceph-devel@vger.kernel.org, Christian Brunner <chb@muc.de>
Subject: Re: [Qemu-devel] [PATCH -v5] ceph/rbd block driver for qemu-kvm
Date: Sat, 9 Oct 2010 08:53:31 -0700
Message-ID: <AANLkTikU=X9yKCWX1k6gFUZz9CH2FFh8tUmWfrqmiFRz@mail.gmail.com>
In-Reply-To: <AANLkTim35HsnQ=VJ7BAiqQtt6sZiNr6azC967ywocR7w@mail.gmail.com>

On Sat, Oct 9, 2010 at 2:21 AM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
> On Fri, Oct 8, 2010 at 8:00 PM, Yehuda Sadeh <yehuda@hq.newdream.net> wrote:
> No flush operation is supported. Can the guest be sure written data
> is on stable storage when it receives completion?
>

That's part of the consistency that rados provides.

>> +/*
>> + * This aio completion is being called from rbd_aio_event_reader() and
>> + * runs in qemu context. It schedules a bh, but just in case the aio
>> + * was not cancelled before.
>
> Cancellation looks unsafe to me because acb is freed for cancel but
> then accessed here!  Also see my comment on aio_cancel() below.

Right. Added a ->cancelled field instead of releasing the acb.

>
>> +/*
>> + * Cancel aio. Since we don't reference acb in a non qemu threads,
>> + * it is safe to access it here.
>> + */
>> +static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
>> +{
>> +    RBDAIOCB *acb = (RBDAIOCB *) blockacb;
>> +    qemu_bh_delete(acb->bh);
>> +    acb->bh = NULL;
>> +    qemu_aio_release(acb);
>
> Any pending librados completions are still running here and will then
> cause acb to be accessed after they complete.  If there is no safe way
> to cancel then wait for the request to complete.

Yeah, we'll keep the acb alive now and just set the ->cancelled. When
the last rados cb arrives for this acb we'll clean it up.

>
>> +}
>> +
>> +static AIOPool rbd_aio_pool = {
>> +    .aiocb_size = sizeof(RBDAIOCB),
>> +    .cancel = rbd_aio_cancel,
>> +};
>> +
>> +/*
>> + * This is the callback function for rados_aio_read and _write
>> + *
>> + * Note: this function is being called from a non qemu thread so
>> + * we need to be careful about what we do here. Generally we only
>> + * write to the block notification pipe, and do the rest of the
>> + * io completion handling from rbd_aio_event_reader() which
>> + * runs in a qemu context.
>
> Do librados threads have all signals blocked?  QEMU uses signals so it
> is important that this signal not get sent to a librados thread and
> discarded.  I have seen this issue in the past when using threaded
> libraries in QEMU.

We're no longer blocking threads. We've hit that before too.

>
>> + */
>> +static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
>> +{
>> +    rcb->ret = rados_aio_get_return_value(c);
>> +    rados_aio_release(c);
>> +    if (write(rcb->s->fds[RBD_FD_WRITE], (void *)&rcb, sizeof(&rcb)) < 0) {
>
> You are writing RADOSCB* so sizeof(rcb) should be used.

Oops.

>
>> +        error_report("failed writing to acb->s->fds\n");
>> +        qemu_free(rcb);
>> +    }
>> +}
>> +
>> +/* Callback when all queued rados_aio requests are complete */
>> +
>> +static void rbd_aio_bh_cb(void *opaque)
>> +{
>> +    RBDAIOCB *acb = opaque;
>> +
>> +    if (!acb->write) {
>> +        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
>> +    }
>> +    qemu_vfree(acb->bounce);
>> +    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
>> +    qemu_bh_delete(acb->bh);
>> +    acb->bh = NULL;
>> +
>> +    qemu_aio_release(acb);
>> +}
>> +
>> +static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
>> +                                           int64_t sector_num,
>> +                                           QEMUIOVector *qiov,
>> +                                           int nb_sectors,
>> +                                           BlockDriverCompletionFunc *cb,
>> +                                           void *opaque, int write)
>> +{
>> +    RBDAIOCB *acb;
>> +    RADOSCB *rcb;
>> +    rados_completion_t c;
>> +    char n[RBD_MAX_SEG_NAME_SIZE];
>> +    int64_t segnr, segoffs, segsize, last_segnr;
>> +    int64_t off, size;
>> +    char *buf;
>> +
>> +    BDRVRBDState *s = bs->opaque;
>> +
>> +    acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
>> +    acb->write = write;
>> +    acb->qiov = qiov;
>> +    acb->bounce = qemu_blockalign(bs, qiov->size);
>> +    acb->aiocnt = 0;
>> +    acb->ret = 0;
>> +    acb->error = 0;
>> +    acb->s = s;
>> +
>> +    if (!acb->bh) {
>> +        acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
>> +    }
>
> When do you expect acb->bh to be non-NULL?

I guess this part was copied from somewhere else. We're not expecting
it, as we always clean it up, so the check is removed.

>
>> +
>> +    if (write) {
>> +        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
>> +    }
>> +
>> +    buf = acb->bounce;
>> +
>> +    off = sector_num * BDRV_SECTOR_SIZE;
>> +    size = nb_sectors * BDRV_SECTOR_SIZE;
>> +    segnr = off / s->objsize;
>> +    segoffs = off % s->objsize;
>> +    segsize = s->objsize - segoffs;
>> +
>> +    last_segnr = ((off + size - 1) / s->objsize);
>> +    acb->aiocnt = (last_segnr - segnr) + 1;
>> +
>> +    s->qemu_aio_count += acb->aiocnt; /* All the RADOSCB */
>> +
>> +    if (write && s->read_only) {
>> +        acb->ret = -EROFS;
>> +        return NULL;
>> +    }
>
> block.c:bdrv_aio_writev() will reject writes to read-only block
> devices.  This check can be eliminated and it also prevents leaking
> acb here.

Removed.

Thanks,
Yehuda


diff --git a/block/rbd.c b/block/rbd.c
index a51fc36..575e481 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -51,6 +51,7 @@ typedef struct RBDAIOCB {
     int aiocnt;
     int error;
     struct BDRVRBDState *s;
+    int cancelled;
 } RBDAIOCB;
 
 typedef struct RADOSCB {
@@ -325,8 +326,18 @@ static void rbd_complete_aio(RADOSCB *rcb)
     int64_t r;
     int i;
 
-    r = rcb->ret;
     acb->aiocnt--;
+
+    if (acb->cancelled) {
+        if (!acb->aiocnt) {
+            qemu_vfree(acb->bounce);
+            qemu_aio_release(acb);
+        }
+        goto done;
+    }
+
+    r = rcb->ret;
+
     if (acb->write) {
         if (r < 0) {
             acb->ret = r;
@@ -356,6 +367,7 @@ static void rbd_complete_aio(RADOSCB *rcb)
     if (!acb->aiocnt && acb->bh) {
         qemu_bh_schedule(acb->bh);
     }
+done:
     qemu_free(rcb);
     i = 0;
 }
@@ -585,7 +597,7 @@ static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
     RBDAIOCB *acb = (RBDAIOCB *) blockacb;
     qemu_bh_delete(acb->bh);
     acb->bh = NULL;
-    qemu_aio_release(acb);
+    acb->cancelled = 1;
 }
 
 static AIOPool rbd_aio_pool = {
@@ -606,7 +618,7 @@ static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
 {
     rcb->ret = rados_aio_get_return_value(c);
     rados_aio_release(c);
-    if (write(rcb->s->fds[RBD_FD_WRITE], (void *)&rcb, sizeof(&rcb)) < 0) {
+    if (write(rcb->s->fds[RBD_FD_WRITE], (void *)&rcb, sizeof(rcb)) < 0) {
         error_report("failed writing to acb->s->fds\n");
         qemu_free(rcb);
     }
@@ -654,10 +666,8 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
     acb->ret = 0;
     acb->error = 0;
     acb->s = s;
-
-    if (!acb->bh) {
-        acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
-    }
+    acb->cancelled = 0;
+    acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
 
     if (write) {
         qemu_iovec_to_buffer(acb->qiov, acb->bounce);
@@ -676,11 +686,6 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
 
     s->qemu_aio_count += acb->aiocnt; /* All the RADOSCB */
 
-    if (write && s->read_only) {
-        acb->ret = -EROFS;
-        return NULL;
-    }
-
     while (size > 0) {
         if (size < segsize) {
             segsize = size;
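
A side note on the signal question above: QEMU depends on signals being
delivered to its own threads, so worker threads spawned by an external
library such as librados should not be able to consume them. One common
way to arrange that is to mask all signals in the calling thread before
the library creates its threads (new threads inherit their creator's
signal mask) and to restore the mask afterwards. The sketch below is
illustrative only; the wrapper name and the idea of calling it around
the library's initialization are assumptions, not taken from this patch.

/*
 * Illustrative sketch, not part of the rbd driver: run a library
 * initialization function with all signals blocked so that any threads
 * it spawns inherit a fully blocked signal mask and cannot steal
 * signals meant for QEMU's own threads.
 */
#include <pthread.h>
#include <signal.h>

static int call_with_signals_masked(int (*init_fn)(void *), void *arg)
{
    sigset_t all, old;
    int ret;

    sigfillset(&all);
    pthread_sigmask(SIG_BLOCK, &all, &old);    /* threads created from here inherit this mask */
    ret = init_fn(arg);                        /* e.g. the call that makes the library spawn threads */
    pthread_sigmask(SIG_SETMASK, &old, NULL);  /* restore the caller's original mask */

    return ret;
}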
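
The sizeof(&rcb) fix is easier to see in a stripped-down version of the
notification hand-off: the librados callback only writes the pointer
value into a pipe, and the QEMU side reads the pointer back out, so both
ends must use the size of the pointer (sizeof(c)), never the size of its
address (sizeof(&c)). The Completion type and function names below are
illustrative assumptions, not the driver's actual code.

#include <stdio.h>
#include <unistd.h>
#include <errno.h>

typedef struct Completion Completion;

/* Runs on a library thread: touch nothing but the pipe. */
static void notify_completion(int write_fd, Completion *c)
{
    ssize_t n;

    do {
        n = write(write_fd, &c, sizeof(c));    /* sizeof(c) == sizeof(Completion *) */
    } while (n < 0 && errno == EINTR);

    if (n != sizeof(c)) {
        fprintf(stderr, "failed writing completion pointer to pipe\n");
    }
}

/* Runs in QEMU context once the read end of the pipe becomes readable. */
static Completion *read_completion(int read_fd)
{
    Completion *c = NULL;

    if (read(read_fd, &c, sizeof(c)) != sizeof(c)) {
        return NULL;    /* short read or error; caller decides how to handle it */
    }
    return c;           /* finish the request here, e.g. by scheduling a bottom half */
}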