From mboxrd@z Thu Jan 1 00:00:00 1970 Received: from eggs.gnu.org ([2001:4830:134:3::10]:37647) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1aQh8z-0000fJ-2u for qemu-devel@nongnu.org; Tue, 02 Feb 2016 15:01:46 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1aQh8v-0000BX-QW for qemu-devel@nongnu.org; Tue, 02 Feb 2016 15:01:45 -0500 Received: from mx1.redhat.com ([209.132.183.28]:39277) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1aQh8v-0000BT-Ig for qemu-devel@nongnu.org; Tue, 02 Feb 2016 15:01:41 -0500 Received: from int-mx11.intmail.prod.int.phx2.redhat.com (int-mx11.intmail.prod.int.phx2.redhat.com [10.5.11.24]) by mx1.redhat.com (Postfix) with ESMTPS id D6E9B9C0DF for ; Tue, 2 Feb 2016 20:01:40 +0000 (UTC) Date: Tue, 2 Feb 2016 20:01:36 +0000 From: "Dr. David Alan Gilbert" Message-ID: <20160202200136.GF4498@work-vm> References: <1452599056-27357-1-git-send-email-berrange@redhat.com> <1452599056-27357-14-git-send-email-berrange@redhat.com> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <1452599056-27357-14-git-send-email-berrange@redhat.com> Subject: Re: [Qemu-devel] [PATCH v1 13/22] migration: convert RDMA to use QIOChannel interface List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: "Daniel P. Berrange" Cc: Amit Shah , qemu-devel@nongnu.org, Juan Quintela * Daniel P. Berrange (berrange@redhat.com) wrote: > This converts the RDMA code to provide a subclass of > QIOChannel that uses RDMA for the data transport. > > The RDMA code would be much better off it it could > be split up in a generic RDMA layer, a QIOChannel > impl based on RMDA, and then the RMDA migration > glue. This is left as a future exercise for the brave. > > Signed-off-by: Daniel P. Berrange > --- > migration/rdma.c | 260 ++++++++++++++++++++++++++++++++++--------------------- > 1 file changed, 161 insertions(+), 99 deletions(-) > > diff --git a/migration/rdma.c b/migration/rdma.c > index bffbfaf..3e961cb 100644 > --- a/migration/rdma.c > +++ b/migration/rdma.c > @@ -374,14 +374,19 @@ typedef struct RDMAContext { > GHashTable *blockmap; > } RDMAContext; > > -/* > - * Interface to the rest of the migration call stack. > - */ > -typedef struct QEMUFileRDMA { > +#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma" > +#define QIO_CHANNEL_RDMA(obj) \ > + OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA) > + > +typedef struct QIOChannelRDMA QIOChannelRDMA; > + > + > +struct QIOChannelRDMA { > + QIOChannel parent; > RDMAContext *rdma; > + QEMUFile *file; > size_t len; > - void *file; > -} QEMUFileRDMA; > +}; > > /* > * Main structure for IB Send/Recv control messages. > @@ -2518,15 +2523,19 @@ static void *qemu_rdma_data_init(const char *host_port, Error **errp) > * SEND messages for control only. > * VM's ram is handled with regular RDMA messages. > */ > -static ssize_t qemu_rdma_put_buffer(void *opaque, const uint8_t *buf, > - int64_t pos, size_t size) > -{ > - QEMUFileRDMA *r = opaque; > - QEMUFile *f = r->file; > - RDMAContext *rdma = r->rdma; > - size_t remaining = size; > - uint8_t * data = (void *) buf; > +static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, > + const struct iovec *iov, > + size_t niov, > + int *fds, > + size_t nfds, > + Error **errp) > +{ > + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); > + QEMUFile *f = rioc->file; > + RDMAContext *rdma = rioc->rdma; > int ret; > + ssize_t done = 0; > + size_t i; > > CHECK_ERROR_STATE(); > > @@ -2540,27 +2549,31 @@ static ssize_t qemu_rdma_put_buffer(void *opaque, const uint8_t *buf, > return ret; > } > > - while (remaining) { > - RDMAControlHeader head; > + for (i = 0; i < niov; i++) { > + size_t remaining = iov[i].iov_len; > + uint8_t * data = (void *)iov[i].iov_base; > + while (remaining) { > + RDMAControlHeader head; > > - r->len = MIN(remaining, RDMA_SEND_INCREMENT); > - remaining -= r->len; > + rioc->len = MIN(remaining, RDMA_SEND_INCREMENT); > + remaining -= rioc->len; > > - /* Guaranteed to fit due to RDMA_SEND_INCREMENT MIN above */ > - head.len = (uint32_t)r->len; > - head.type = RDMA_CONTROL_QEMU_FILE; > + head.len = rioc->len; > + head.type = RDMA_CONTROL_QEMU_FILE; > > - ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL); > + ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL); > > - if (ret < 0) { > - rdma->error_state = ret; > - return ret; > - } > + if (ret < 0) { > + rdma->error_state = ret; > + return ret; > + } > > - data += r->len; > + data += rioc->len; > + done += rioc->len; > + } > } > > - return size; > + return done; > } > > static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf, > @@ -2585,41 +2598,65 @@ static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf, > * RDMA links don't use bytestreams, so we have to > * return bytes to QEMUFile opportunistically. > */ > -static ssize_t qemu_rdma_get_buffer(void *opaque, uint8_t *buf, > - int64_t pos, size_t size) > -{ > - QEMUFileRDMA *r = opaque; > - RDMAContext *rdma = r->rdma; > +static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, > + const struct iovec *iov, > + size_t niov, > + int **fds, > + size_t *nfds, > + Error **errp) > +{ > + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); > + RDMAContext *rdma = rioc->rdma; > RDMAControlHeader head; > int ret = 0; > + ssize_t i; > + size_t done = 0; > > CHECK_ERROR_STATE(); > > - /* > - * First, we hold on to the last SEND message we > - * were given and dish out the bytes until we run > - * out of bytes. > - */ > - r->len = qemu_rdma_fill(r->rdma, buf, size, 0); > - if (r->len) { > - return r->len; > - } > + for (i = 0; i < niov; i++) { > + size_t want = iov[i].iov_len; > + uint8_t *data = (void *)iov[i].iov_base; > > - /* > - * Once we run out, we block and wait for another > - * SEND message to arrive. > - */ > - ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE); > + /* > + * First, we hold on to the last SEND message we > + * were given and dish out the bytes until we run > + * out of bytes. > + */ > + ret = qemu_rdma_fill(rioc->rdma, data, want, 0); > + if (ret > 0) { > + done += ret; > + if (ret < want) { > + break; > + } else { > + continue; > + } > + } > > - if (ret < 0) { > - rdma->error_state = ret; > - return ret; > - } > + /* > + * Once we run out, we block and wait for another > + * SEND message to arrive. > + */ > + ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE); > > - /* > - * SEND was received with new bytes, now try again. > - */ > - return qemu_rdma_fill(r->rdma, buf, size, 0); > + if (ret < 0) { > + rdma->error_state = ret; > + return ret; > + } > + > + /* > + * SEND was received with new bytes, now try again. > + */ > + ret = qemu_rdma_fill(rioc->rdma, data, want, 0); > + if (ret > 0) { > + done += ret; > + if (ret < want) { > + break; > + } > + } I don't quite understand the behaviour of this loop. If rdma_fill returns less than you wanted for the first iov we break. If it returns 0 then we try and get some more. The weird thing to me is if we have two iov entries; if the amount returned by the qemu_rdma_fill happens to match the size of the 1st iov then I think we end up doing the exchange_recv and waiting for more. Is that what we want? Why? Dave > + } > + rioc->len = done; > + return rioc->len; > } > > /* > @@ -2646,15 +2683,16 @@ static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma) > return 0; > } > > -static int qemu_rdma_close(void *opaque) > +static int qio_channel_rdma_close(QIOChannel *ioc, > + Error **errp) > { > + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); > trace_qemu_rdma_close(); > - QEMUFileRDMA *r = opaque; > - if (r->rdma) { > - qemu_rdma_cleanup(r->rdma); > - g_free(r->rdma); > + if (rioc->rdma) { > + qemu_rdma_cleanup(rioc->rdma); > + g_free(rioc->rdma); > + rioc->rdma = NULL; > } > - g_free(r); > return 0; > } > > @@ -2696,8 +2734,8 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque, > ram_addr_t block_offset, ram_addr_t offset, > size_t size, uint64_t *bytes_sent) > { > - QEMUFileRDMA *rfile = opaque; > - RDMAContext *rdma = rfile->rdma; > + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); > + RDMAContext *rdma = rioc->rdma; > int ret; > > CHECK_ERROR_STATE(); > @@ -2951,8 +2989,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque) > }; > RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT, > .repeat = 1 }; > - QEMUFileRDMA *rfile = opaque; > - RDMAContext *rdma = rfile->rdma; > + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); > + RDMAContext *rdma = rioc->rdma; > RDMALocalBlocks *local = &rdma->local_ram_blocks; > RDMAControlHeader head; > RDMARegister *reg, *registers; > @@ -3207,9 +3245,10 @@ out: > * We've already built our local RAMBlock list, but not yet sent the list to > * the source. > */ > -static int rdma_block_notification_handle(QEMUFileRDMA *rfile, const char *name) > +static int > +rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name) > { > - RDMAContext *rdma = rfile->rdma; > + RDMAContext *rdma = rioc->rdma; > int curr; > int found = -1; > > @@ -3251,8 +3290,8 @@ static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data) > static int qemu_rdma_registration_start(QEMUFile *f, void *opaque, > uint64_t flags, void *data) > { > - QEMUFileRDMA *rfile = opaque; > - RDMAContext *rdma = rfile->rdma; > + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); > + RDMAContext *rdma = rioc->rdma; > > CHECK_ERROR_STATE(); > > @@ -3271,8 +3310,8 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, > uint64_t flags, void *data) > { > Error *local_err = NULL, **errp = &local_err; > - QEMUFileRDMA *rfile = opaque; > - RDMAContext *rdma = rfile->rdma; > + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); > + RDMAContext *rdma = rioc->rdma; > RDMAControlHeader head = { .len = 0, .repeat = 1 }; > int ret = 0; > > @@ -3368,55 +3407,78 @@ err: > return ret; > } > > -static int qemu_rdma_get_fd(void *opaque) > -{ > - QEMUFileRDMA *rfile = opaque; > - RDMAContext *rdma = rfile->rdma; > - > - return rdma->comp_channel->fd; > -} > - > -static const QEMUFileOps rdma_read_ops = { > - .get_buffer = qemu_rdma_get_buffer, > - .get_fd = qemu_rdma_get_fd, > - .close = qemu_rdma_close, > -}; > - > static const QEMUFileHooks rdma_read_hooks = { > .hook_ram_load = rdma_load_hook, > }; > > -static const QEMUFileOps rdma_write_ops = { > - .put_buffer = qemu_rdma_put_buffer, > - .close = qemu_rdma_close, > -}; > - > static const QEMUFileHooks rdma_write_hooks = { > .before_ram_iterate = qemu_rdma_registration_start, > .after_ram_iterate = qemu_rdma_registration_stop, > .save_page = qemu_rdma_save_page, > }; > > -static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode) > + > +static void qio_channel_rdma_finalize(Object *obj) > +{ > + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj); > + if (rioc->rdma) { > + qemu_rdma_cleanup(rioc->rdma); > + g_free(rioc->rdma); > + rioc->rdma = NULL; > + } > +} > + > +static void qio_channel_rdma_class_init(ObjectClass *klass, > + void *class_data G_GNUC_UNUSED) > +{ > + QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass); > + > + ioc_klass->io_writev = qio_channel_rdma_writev; > + ioc_klass->io_readv = qio_channel_rdma_readv; > + /* XXX > + ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking; > + */ > + ioc_klass->io_close = qio_channel_rdma_close; > + /* XXX > + ioc_klass->io_create_watch = qio_channel_rdma_create_watch; > + */ > +} > + > +static const TypeInfo qio_channel_rdma_info = { > + .parent = TYPE_QIO_CHANNEL, > + .name = TYPE_QIO_CHANNEL_RDMA, > + .instance_size = sizeof(QIOChannelRDMA), > + .instance_finalize = qio_channel_rdma_finalize, > + .class_init = qio_channel_rdma_class_init, > +}; > + > +static void qio_channel_rdma_register_types(void) > +{ > + type_register_static(&qio_channel_rdma_info); > +} > + > +type_init(qio_channel_rdma_register_types); > + > +static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode) > { > - QEMUFileRDMA *r; > + QIOChannelRDMA *rioc; > > if (qemu_file_mode_is_not_valid(mode)) { > return NULL; > } > > - r = g_new0(QEMUFileRDMA, 1); > - r->rdma = rdma; > + rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA)); > + rioc->rdma = rdma; > > if (mode[0] == 'w') { > - r->file = qemu_fopen_ops(r, &rdma_write_ops); > - qemu_file_set_hooks(r->file, &rdma_write_hooks); > + rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc)); > + qemu_file_set_hooks(rioc->file, &rdma_write_hooks); > } else { > - r->file = qemu_fopen_ops(r, &rdma_read_ops); > - qemu_file_set_hooks(r->file, &rdma_read_hooks); > + rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc)); > + qemu_file_set_hooks(rioc->file, &rdma_read_hooks); > } > > - return r->file; > + return rioc->file; > } > > static void rdma_accept_incoming_migration(void *opaque) > -- > 2.5.0 > -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK