From mboxrd@z Thu Jan 1 00:00:00 1970 Received: from eggs.gnu.org ([2001:4830:134:3::10]:45536) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1aIxNc-0001gy-0C for qemu-devel@nongnu.org; Tue, 12 Jan 2016 06:44:53 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1aIxNX-0004Ce-Ap for qemu-devel@nongnu.org; Tue, 12 Jan 2016 06:44:51 -0500 Received: from mx1.redhat.com ([209.132.183.28]:42132) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1aIxNX-0004CP-3Y for qemu-devel@nongnu.org; Tue, 12 Jan 2016 06:44:47 -0500 Received: from int-mx11.intmail.prod.int.phx2.redhat.com (int-mx11.intmail.prod.int.phx2.redhat.com [10.5.11.24]) by mx1.redhat.com (Postfix) with ESMTPS id B357DAA5 for ; Tue, 12 Jan 2016 11:44:46 +0000 (UTC) From: "Daniel P. Berrange" Date: Tue, 12 Jan 2016 11:44:07 +0000 Message-Id: <1452599056-27357-14-git-send-email-berrange@redhat.com> In-Reply-To: <1452599056-27357-1-git-send-email-berrange@redhat.com> References: <1452599056-27357-1-git-send-email-berrange@redhat.com> Subject: [Qemu-devel] [PATCH v1 13/22] migration: convert RDMA to use QIOChannel interface List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: qemu-devel@nongnu.org Cc: Amit Shah , "Dr. David Alan Gilbert" , Juan Quintela This converts the RDMA code to provide a subclass of QIOChannel that uses RDMA for the data transport. The RDMA code would be much better off if it could be split up in a generic RDMA layer, a QIOChannel impl based on RDMA, and then the RDMA migration glue. This is left as a future exercise for the brave. Signed-off-by: Daniel P. 
Berrange --- migration/rdma.c | 260 ++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 161 insertions(+), 99 deletions(-) diff --git a/migration/rdma.c b/migration/rdma.c index bffbfaf..3e961cb 100644 --- a/migration/rdma.c +++ b/migration/rdma.c @@ -374,14 +374,19 @@ typedef struct RDMAContext { GHashTable *blockmap; } RDMAContext; -/* - * Interface to the rest of the migration call stack. - */ -typedef struct QEMUFileRDMA { +#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma" +#define QIO_CHANNEL_RDMA(obj) \ + OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA) + +typedef struct QIOChannelRDMA QIOChannelRDMA; + + +struct QIOChannelRDMA { + QIOChannel parent; RDMAContext *rdma; + QEMUFile *file; size_t len; - void *file; -} QEMUFileRDMA; +}; /* * Main structure for IB Send/Recv control messages. @@ -2518,15 +2523,19 @@ static void *qemu_rdma_data_init(const char *host_port, Error **errp) * SEND messages for control only. * VM's ram is handled with regular RDMA messages. 
*/ -static ssize_t qemu_rdma_put_buffer(void *opaque, const uint8_t *buf, - int64_t pos, size_t size) -{ - QEMUFileRDMA *r = opaque; - QEMUFile *f = r->file; - RDMAContext *rdma = r->rdma; - size_t remaining = size; - uint8_t * data = (void *) buf; +static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, + const struct iovec *iov, + size_t niov, + int *fds, + size_t nfds, + Error **errp) +{ + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); + QEMUFile *f = rioc->file; + RDMAContext *rdma = rioc->rdma; int ret; + ssize_t done = 0; + size_t i; CHECK_ERROR_STATE(); @@ -2540,27 +2549,31 @@ static ssize_t qemu_rdma_put_buffer(void *opaque, const uint8_t *buf, return ret; } - while (remaining) { - RDMAControlHeader head; + for (i = 0; i < niov; i++) { + size_t remaining = iov[i].iov_len; + uint8_t * data = (void *)iov[i].iov_base; + while (remaining) { + RDMAControlHeader head; - r->len = MIN(remaining, RDMA_SEND_INCREMENT); - remaining -= r->len; + rioc->len = MIN(remaining, RDMA_SEND_INCREMENT); + remaining -= rioc->len; - /* Guaranteed to fit due to RDMA_SEND_INCREMENT MIN above */ - head.len = (uint32_t)r->len; - head.type = RDMA_CONTROL_QEMU_FILE; + head.len = rioc->len; + head.type = RDMA_CONTROL_QEMU_FILE; - ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL); + ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL); - if (ret < 0) { - rdma->error_state = ret; - return ret; - } + if (ret < 0) { + rdma->error_state = ret; + return ret; + } - data += r->len; + data += rioc->len; + done += rioc->len; + } } - return size; + return done; } static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf, @@ -2585,41 +2598,65 @@ static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf, * RDMA links don't use bytestreams, so we have to * return bytes to QEMUFile opportunistically. 
*/ -static ssize_t qemu_rdma_get_buffer(void *opaque, uint8_t *buf, - int64_t pos, size_t size) -{ - QEMUFileRDMA *r = opaque; - RDMAContext *rdma = r->rdma; +static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, + const struct iovec *iov, + size_t niov, + int **fds, + size_t *nfds, + Error **errp) +{ + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); + RDMAContext *rdma = rioc->rdma; RDMAControlHeader head; int ret = 0; + ssize_t i; + size_t done = 0; CHECK_ERROR_STATE(); - /* - * First, we hold on to the last SEND message we - * were given and dish out the bytes until we run - * out of bytes. - */ - r->len = qemu_rdma_fill(r->rdma, buf, size, 0); - if (r->len) { - return r->len; - } + for (i = 0; i < niov; i++) { + size_t want = iov[i].iov_len; + uint8_t *data = (void *)iov[i].iov_base; - /* - * Once we run out, we block and wait for another - * SEND message to arrive. - */ - ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE); + /* + * First, we hold on to the last SEND message we + * were given and dish out the bytes until we run + * out of bytes. + */ + ret = qemu_rdma_fill(rioc->rdma, data, want, 0); + if (ret > 0) { + done += ret; + if (ret < want) { + break; + } else { + continue; + } + } - if (ret < 0) { - rdma->error_state = ret; - return ret; - } + /* + * Once we run out, we block and wait for another + * SEND message to arrive. + */ + ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE); - /* - * SEND was received with new bytes, now try again. - */ - return qemu_rdma_fill(r->rdma, buf, size, 0); + if (ret < 0) { + rdma->error_state = ret; + return ret; + } + + /* + * SEND was received with new bytes, now try again. 
+ */ + ret = qemu_rdma_fill(rioc->rdma, data, want, 0); + if (ret > 0) { + done += ret; + if (ret < want) { + break; + } + } + } + rioc->len = done; + return rioc->len; } /* @@ -2646,15 +2683,16 @@ static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma) return 0; } -static int qemu_rdma_close(void *opaque) +static int qio_channel_rdma_close(QIOChannel *ioc, + Error **errp) { + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); trace_qemu_rdma_close(); - QEMUFileRDMA *r = opaque; - if (r->rdma) { - qemu_rdma_cleanup(r->rdma); - g_free(r->rdma); + if (rioc->rdma) { + qemu_rdma_cleanup(rioc->rdma); + g_free(rioc->rdma); + rioc->rdma = NULL; } - g_free(r); return 0; } @@ -2696,8 +2734,8 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque, ram_addr_t block_offset, ram_addr_t offset, size_t size, uint64_t *bytes_sent) { - QEMUFileRDMA *rfile = opaque; - RDMAContext *rdma = rfile->rdma; + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); + RDMAContext *rdma = rioc->rdma; int ret; CHECK_ERROR_STATE(); @@ -2951,8 +2989,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque) }; RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT, .repeat = 1 }; - QEMUFileRDMA *rfile = opaque; - RDMAContext *rdma = rfile->rdma; + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); + RDMAContext *rdma = rioc->rdma; RDMALocalBlocks *local = &rdma->local_ram_blocks; RDMAControlHeader head; RDMARegister *reg, *registers; @@ -3207,9 +3245,10 @@ out: * We've already built our local RAMBlock list, but not yet sent the list to * the source. 
*/ -static int rdma_block_notification_handle(QEMUFileRDMA *rfile, const char *name) +static int +rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name) { - RDMAContext *rdma = rfile->rdma; + RDMAContext *rdma = rioc->rdma; int curr; int found = -1; @@ -3251,8 +3290,8 @@ static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data) static int qemu_rdma_registration_start(QEMUFile *f, void *opaque, uint64_t flags, void *data) { - QEMUFileRDMA *rfile = opaque; - RDMAContext *rdma = rfile->rdma; + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); + RDMAContext *rdma = rioc->rdma; CHECK_ERROR_STATE(); @@ -3271,8 +3310,8 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, uint64_t flags, void *data) { Error *local_err = NULL, **errp = &local_err; - QEMUFileRDMA *rfile = opaque; - RDMAContext *rdma = rfile->rdma; + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); + RDMAContext *rdma = rioc->rdma; RDMAControlHeader head = { .len = 0, .repeat = 1 }; int ret = 0; @@ -3368,55 +3407,78 @@ err: return ret; } -static int qemu_rdma_get_fd(void *opaque) -{ - QEMUFileRDMA *rfile = opaque; - RDMAContext *rdma = rfile->rdma; - - return rdma->comp_channel->fd; -} - -static const QEMUFileOps rdma_read_ops = { - .get_buffer = qemu_rdma_get_buffer, - .get_fd = qemu_rdma_get_fd, - .close = qemu_rdma_close, -}; - static const QEMUFileHooks rdma_read_hooks = { .hook_ram_load = rdma_load_hook, }; -static const QEMUFileOps rdma_write_ops = { - .put_buffer = qemu_rdma_put_buffer, - .close = qemu_rdma_close, -}; - static const QEMUFileHooks rdma_write_hooks = { .before_ram_iterate = qemu_rdma_registration_start, .after_ram_iterate = qemu_rdma_registration_stop, .save_page = qemu_rdma_save_page, }; -static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode) + +static void qio_channel_rdma_finalize(Object *obj) +{ + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj); + if (rioc->rdma) { + qemu_rdma_cleanup(rioc->rdma); + g_free(rioc->rdma); + 
rioc->rdma = NULL; + } +} + +static void qio_channel_rdma_class_init(ObjectClass *klass, + void *class_data G_GNUC_UNUSED) +{ + QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass); + + ioc_klass->io_writev = qio_channel_rdma_writev; + ioc_klass->io_readv = qio_channel_rdma_readv; + /* XXX + ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking; + */ + ioc_klass->io_close = qio_channel_rdma_close; + /* XXX + ioc_klass->io_create_watch = qio_channel_rdma_create_watch; + */ +} + +static const TypeInfo qio_channel_rdma_info = { + .parent = TYPE_QIO_CHANNEL, + .name = TYPE_QIO_CHANNEL_RDMA, + .instance_size = sizeof(QIOChannelRDMA), + .instance_finalize = qio_channel_rdma_finalize, + .class_init = qio_channel_rdma_class_init, +}; + +static void qio_channel_rdma_register_types(void) +{ + type_register_static(&qio_channel_rdma_info); +} + +type_init(qio_channel_rdma_register_types); + +static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode) { - QEMUFileRDMA *r; + QIOChannelRDMA *rioc; if (qemu_file_mode_is_not_valid(mode)) { return NULL; } - r = g_new0(QEMUFileRDMA, 1); - r->rdma = rdma; + rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA)); + rioc->rdma = rdma; if (mode[0] == 'w') { - r->file = qemu_fopen_ops(r, &rdma_write_ops); - qemu_file_set_hooks(r->file, &rdma_write_hooks); + rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc)); + qemu_file_set_hooks(rioc->file, &rdma_write_hooks); } else { - r->file = qemu_fopen_ops(r, &rdma_read_ops); - qemu_file_set_hooks(r->file, &rdma_read_hooks); + rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc)); + qemu_file_set_hooks(rioc->file, &rdma_read_hooks); } - return r->file; + return rioc->file; } static void rdma_accept_incoming_migration(void *opaque) -- 2.5.0