From mboxrd@z Thu Jan 1 00:00:00 1970 Received: from [140.186.70.92] (port=52938 helo=eggs.gnu.org) by lists.gnu.org with esmtp (Exim 4.43) id 1Pq6ob-00067G-Mh for qemu-devel@nongnu.org; Thu, 17 Feb 2011 11:39:39 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1Pq6oW-0001vg-57 for qemu-devel@nongnu.org; Thu, 17 Feb 2011 11:34:46 -0500 Received: from egg.sh.bytemark.co.uk ([212.110.161.171]:42248) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1Pq6oV-0001vc-Q0 for qemu-devel@nongnu.org; Thu, 17 Feb 2011 11:34:44 -0500 From: Nicholas Thomas In-Reply-To: <4D5BBC7B.9020807@redhat.com> References: <1297712422.12551.2.camel@den> <4D5A5ECD.7060701@redhat.com> <1297805193.12551.39.camel@den> <4D5BBC7B.9020807@redhat.com> Content-Type: text/plain; charset="UTF-8" Date: Thu, 17 Feb 2011 16:34:42 +0000 Message-ID: <1297960482.21300.62.camel@desk4.office.bytemark.co.uk> Mime-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: [Qemu-devel] [PATCH 3/3] block/nbd: Make the NBD block device use the AIO interface List-Id: qemu-devel.nongnu.org List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Kevin Wolf Cc: Stefan Hajnoczi , qemu-devel@nongnu.org Signed-off-by: Nick Thomas --- block/nbd.c | 549 ++++++++++++++++++++++++++++++++++++++++++++++++++--------- 1 files changed, 464 insertions(+), 85 deletions(-) diff --git a/block/nbd.c b/block/nbd.c index c8dc763..1387227 100644 --- a/block/nbd.c +++ b/block/nbd.c @@ -1,11 +1,12 @@ /* - * QEMU Block driver for NBD + * QEMU Block driver for NBD - asynchronous IO * * Copyright (C) 2008 Bull S.A.S. 
* Author: Laurent Vivier * * Some parts: * Copyright (C) 2007 Anthony Liguori + * Copyright (C) 2011 Nick Thomas * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -27,66 +28,132 @@ */ #include "qemu-common.h" +#include "qemu_socket.h" #include "nbd.h" #include "module.h" #include #include -#define EN_OPTSTR ":exportname=" +#define EN_OPTSTR ":exportname=" +#define SECTOR_SIZE 512 + +#define DEBUG_NBD + +#if defined(DEBUG_NBD) +#define logout(fmt, ...) \ + fprintf(stderr, "nbd\t%-24s" fmt, __func__, ##__VA_ARGS__) +#else +#define logout(fmt, ...) ((void)0) +#endif + + +typedef struct NBDAIOCB NBDAIOCB; + +typedef struct AIOReq { + NBDAIOCB *aiocb; + unsigned int iov_offset; + + off_t offset; + size_t data_len; + uint8_t flags; + uint64_t handle; + + QLIST_ENTRY(AIOReq) outstanding_aio_siblings; + QLIST_ENTRY(AIOReq) aioreq_siblings; +} AIOReq; + typedef struct BDRVNBDState { int sock; off_t size; size_t blocksize; + + /* Filled in by nbd_config. Store host_spec because DNS may change */ + bool tcp_conn; /* True, we use TCP. 
False, UNIX domain sockets */ + char *export_name; /* An NBD server may export several devices */ + char *host_spec; /* Path to socket (UNIX) or hostname/IP (TCP) */ + uint16_t tcp_port; + + /* We use these for asynchronous I/O */ + uint64_t aioreq_seq_num; + QLIST_HEAD(outstanding_aio_head, AIOReq) outstanding_aio_head; } BDRVNBDState; -static int nbd_open(BlockDriverState *bs, const char* filename, int flags) +enum AIOCBState { + AIOCB_WRITE_UDATA, + AIOCB_READ_UDATA, +}; + +struct NBDAIOCB { + BlockDriverAIOCB common; + + QEMUIOVector *qiov; + + int64_t sector_num; + int nb_sectors; + + int ret; + enum AIOCBState aiocb_type; + + QEMUBH *bh; + void (*aio_done_func)(NBDAIOCB *); + + int canceled; + + QLIST_HEAD(aioreq_head, AIOReq) aioreq_head; +}; + +static inline int free_aio_req(BDRVNBDState *s, AIOReq *aio_req) { - BDRVNBDState *s = bs->opaque; - uint32_t nbdflags; + NBDAIOCB *acb = aio_req->aiocb; + QLIST_REMOVE(aio_req, outstanding_aio_siblings); + QLIST_REMOVE(aio_req, aioreq_siblings); + qemu_free(aio_req); + + return !QLIST_EMPTY(&acb->aioreq_head); +} +static int nbd_config(BDRVNBDState *s, const char* filename, int flags) +{ char *file; - char *name; - const char *host; + char *export_name; + const char *host_spec; const char *unixpath; - int sock; - off_t size; - size_t blocksize; - int ret; int err = -EINVAL; file = qemu_strdup(filename); - name = strstr(file, EN_OPTSTR); - if (name) { - if (name[strlen(EN_OPTSTR)] == 0) { + export_name = strstr(file, EN_OPTSTR); + if (export_name) { + if (export_name[strlen(EN_OPTSTR)] == 0) { goto out; } - name[0] = 0; - name += strlen(EN_OPTSTR); + export_name[0] = 0; /* truncate 'file' */ + export_name += strlen(EN_OPTSTR); + s->export_name = qemu_strdup(export_name); } - if (!strstart(file, "nbd:", &host)) { + /* extract the host_spec - fail if it's not nbd:* */ + if (!strstart(file, "nbd:", &host_spec)) { goto out; } - if (strstart(host, "unix:", &unixpath)) { - - if (unixpath[0] != '/') { + /* are we a UNIX or 
TCP socket? */ + if (strstart(host_spec, "unix:", &unixpath)) { + if (unixpath[0] != '/') { /* We demand an absolute path */ goto out; } - - sock = unix_socket_outgoing(unixpath); - + s->tcp_conn = false; + s->host_spec = qemu_strdup(unixpath); } else { + /* We should have a host:port string to split up */ uint16_t port = NBD_DEFAULT_PORT; char *p, *r; char hostname[128]; - pstrcpy(hostname, 128, host); - - p = strchr(hostname, ':'); + pstrcpy(hostname, 128, host_spec); + p = strchr(hostname, ':'); /* FIXME: IPv6 */ if (p != NULL) { *p = '\0'; p++; @@ -96,121 +163,433 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags) goto out; } } + s->tcp_conn = true; + s->host_spec = qemu_strdup(hostname); + s->tcp_port = port; + } - sock = tcp_socket_outgoing(hostname, port); + err = 0; + +out: + qemu_free(file); + if (err != 0) { + if (s->export_name != NULL) { + qemu_free(s->export_name); + } + if (s->host_spec != NULL) { + qemu_free(s->host_spec); + } + } + return err; +} + +static void aio_read_response(void *opaque) +{ + BDRVNBDState *s = opaque; + struct nbd_reply reply; + + AIOReq *aio_req = NULL; + NBDAIOCB *acb; + int rest; + + if (QLIST_EMPTY(&s->outstanding_aio_head)) { + return; } + /* read the header */ + if (nbd_receive_reply(s->sock, &reply) == -1) { + logout("Failed to read response from socket\n"); + /* Having failed to read the reply header, we can't know which + * aio_req this corresponds to - so we can't signal a failure.
+ */ + return; + } + + /* find the right aio_req from the outstanding_aio list */ + QLIST_FOREACH(aio_req, &s->outstanding_aio_head, outstanding_aio_siblings) { + if (aio_req->handle == reply.handle) { + break; + } + } + + if (!aio_req) { + logout("cannot find aio_req for handle %lu\n", reply.handle); + return; + } + + acb = aio_req->aiocb; + + if (acb->aiocb_type == AIOCB_READ_UDATA) { + off_t offset = 0; + int ret = 0; + size_t total = aio_req->data_len; + + while (offset < total) { + ret = nbd_wr_aio(s->sock, acb->qiov->iov, total - offset, offset, + true); + + if (ret == -1) { + logout("Error reading from NBD server: %i (%s)\n", + errno, strerror(errno)); + return; + } + + offset += ret; + } + } + + if (reply.error != 0) { + acb->ret = -EIO; + logout("NBD request resulted in error %i\n", reply.error); + } + + rest = free_aio_req(s, aio_req); + if (!rest) { + acb->aio_done_func(acb); + } + + return; +} + +static int aio_flush_request(void *opaque) +{ + BDRVNBDState *s = opaque; + + return !QLIST_EMPTY(&s->outstanding_aio_head); +} + +/* + * Connect to the NBD server specified in the state object + */ +static int nbd_establish_connection(BlockDriverState *bs) +{ + BDRVNBDState *s = bs->opaque; + int sock; + int ret; + off_t size; + size_t blocksize; + uint32_t nbdflags; + + if (s->tcp_conn == true) { + sock = tcp_socket_outgoing(s->host_spec, s->tcp_port); + } else { + sock = unix_socket_outgoing(s->host_spec); + } + + /* Failed to establish connection */ if (sock == -1) { - err = -errno; - goto out; + logout("Failed to establish connection to NBD server\n"); + return -errno; } - ret = nbd_receive_negotiate(sock, name, &nbdflags, &size, &blocksize); + /* NBD handshake */ + ret = nbd_receive_negotiate(sock, s->export_name, &nbdflags, &size, + &blocksize); if (ret == -1) { - err = -errno; - goto out; + logout("Failed to negotiate with the NBD server\n"); + closesocket(sock); + return -errno; } + /* Now that we're connected, set the socket to be non-blocking */ + 
socket_set_nonblock(sock); + s->sock = sock; s->size = size; s->blocksize = blocksize; - err = 0; -out: - qemu_free(file); - return err; + /* Response handler. This is called when there is data to read */ + qemu_aio_set_fd_handler(sock, aio_read_response, NULL, aio_flush_request, + NULL, s); + logout("Established connection with NBD server\n"); + return 0; } -static int nbd_read(BlockDriverState *bs, int64_t sector_num, - uint8_t *buf, int nb_sectors) +static void nbd_teardown_connection(BlockDriverState *bs) { + /* Send the final packet to the NBD server and close the socket */ BDRVNBDState *s = bs->opaque; struct nbd_request request; - struct nbd_reply reply; - request.type = NBD_CMD_READ; + request.type = NBD_CMD_DISC; request.handle = (uint64_t)(intptr_t)bs; - request.from = sector_num * 512;; - request.len = nb_sectors * 512; + request.from = 0; + request.len = 0; + nbd_send_request(s->sock, &request); - if (nbd_send_request(s->sock, &request) == -1) - return -errno; + qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, NULL); + closesocket(s->sock); + logout("Connection to NBD server closed\n"); + return; +} - if (nbd_receive_reply(s->sock, &reply) == -1) - return -errno; +static int nbd_open(BlockDriverState *bs, const char* filename, int flags) +{ + BDRVNBDState *s = bs->opaque; + int result; - if (reply.error !=0) - return -reply.error; + /* Pop the config into our state object. Exit if invalid. */ + result = nbd_config(s, filename, flags); - if (reply.handle != request.handle) - return -EIO; + if (result != 0) { + return result; + } - if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len) - return -EIO; + QLIST_INIT(&s->outstanding_aio_head); + + /* establish TCP connection, return error if it fails + * TODO: Configurable retry-until-timeout behaviour. 
+ */ + result = nbd_establish_connection(bs); + if (result != 0) { + return result; + } return 0; } -static int nbd_write(BlockDriverState *bs, int64_t sector_num, - const uint8_t *buf, int nb_sectors) +static void nbd_close(BlockDriverState *bs) { + nbd_teardown_connection(bs); BDRVNBDState *s = bs->opaque; + + if (s->export_name != NULL) { + qemu_free(s->export_name); + } + if (s->host_spec != NULL) { + qemu_free(s->host_spec); + } + + return; +} + +static int add_aio_request(BDRVNBDState *s, AIOReq *aio_req, QEMUIOVector *qiov, + enum AIOCBState aiocb_type) +{ struct nbd_request request; - struct nbd_reply reply; - request.type = NBD_CMD_WRITE; - request.handle = (uint64_t)(intptr_t)bs; - request.from = sector_num * 512;; - request.len = nb_sectors * 512; + request.from = aio_req->offset; + request.len = aio_req->data_len; + request.handle = aio_req->handle; + + if (aiocb_type == AIOCB_READ_UDATA) { + request.type = NBD_CMD_READ; + } else { + request.type = NBD_CMD_WRITE; + } - if (nbd_send_request(s->sock, &request) == -1) + /* Write the request to the socket. Header first. */ + if (nbd_send_request(s->sock, &request) == -1) { + /* TODO: retry handling. 
This leads to -EIO and request cancellation */ + logout("writing request header to server failed\n"); return -errno; + } - if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len) - return -EIO; + /* If this is a write, send the data too */ + if (aiocb_type == AIOCB_WRITE_UDATA) { + int ret = 0; + off_t offset = 0; + size_t total = aio_req->data_len; + + while (offset < total) { + ret = nbd_wr_aio(s->sock, qiov->iov, total - offset, offset, false); + if (ret == -1) { + logout("Error writing request data to NBD server: %i (% s)\n", + errno, strerror(errno)); + return -EIO; + } - if (nbd_receive_reply(s->sock, &reply) == -1) - return -errno; + offset += ret; + } + } + + return 0; +} + +static inline AIOReq *alloc_aio_req(BDRVNBDState *s, NBDAIOCB *acb, + unsigned int data_len, + uint64_t offset, uint8_t flags, + unsigned int iov_offset) +{ + AIOReq *aio_req; + + aio_req = qemu_malloc(sizeof(*aio_req)); + aio_req->aiocb = acb; + aio_req->iov_offset = iov_offset; + aio_req->offset = offset; + aio_req->data_len = data_len; + aio_req->flags = flags; + aio_req->handle = s->aioreq_seq_num++; /* FIXME: Trivially guessable */ + + QLIST_INSERT_HEAD(&s->outstanding_aio_head, aio_req, + outstanding_aio_siblings); + QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings); + + return aio_req; +} + +static void nbd_finish_aiocb(NBDAIOCB *acb) +{ + if (!acb->canceled) { + acb->common.cb(acb->common.opaque, acb->ret); + } + qemu_aio_release(acb); +} + + +static void nbd_aio_cancel(BlockDriverAIOCB *blockacb) +{ + NBDAIOCB *acb = (NBDAIOCB *)blockacb; + + /* + * We cannot cancel the requests which are already sent to + * the servers, so we just complete the request with -EIO here. 
+ */ + acb->common.cb(acb->common.opaque, -EIO); + acb->canceled = 1; +} + +static AIOPool nbd_aio_pool = { + .aiocb_size = sizeof(NBDAIOCB), + .cancel = nbd_aio_cancel, +}; + +static NBDAIOCB *nbd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov, + int64_t sector_num, int nb_sectors, + BlockDriverCompletionFunc *cb, void *opaque) +{ + NBDAIOCB *acb; + + acb = qemu_aio_get(&nbd_aio_pool, bs, cb, opaque); - if (reply.error !=0) - return -reply.error; + acb->qiov = qiov; + + acb->sector_num = sector_num; + acb->nb_sectors = nb_sectors; + + acb->aio_done_func = NULL; + acb->canceled = 0; + acb->bh = NULL; + acb->ret = 0; + QLIST_INIT(&acb->aioreq_head); + return acb; +} - if (reply.handle != request.handle) +static int nbd_schedule_bh(QEMUBHFunc *cb, NBDAIOCB *acb) +{ + if (acb->bh) { + logout("bug: %d %d\n", acb->aiocb_type, acb->aiocb_type); + return -EIO; + } + + acb->bh = qemu_bh_new(cb, acb); + if (!acb->bh) { + logout("oom: %d %d\n", acb->aiocb_type, acb->aiocb_type); return -EIO; + } + + qemu_bh_schedule(acb->bh); return 0; } -static void nbd_close(BlockDriverState *bs) +/* + * Send I/O requests to the server. + * + * This function sends requests to the server, links the requests to + * the outstanding_list in BDRVNBDState, and exits without waiting for + * the response. The responses are received in the `aio_read_response' + * function which is called from the main loop as a fd handler. 
+ */ +static void nbd_readv_writev_bh_cb(void *p) { - BDRVNBDState *s = bs->opaque; - struct nbd_request request; + NBDAIOCB *acb = p; + int ret = 0; + unsigned long len, done = 0, total = acb->nb_sectors * SECTOR_SIZE; + unsigned long idx = acb->sector_num; - request.type = NBD_CMD_DISC; - request.handle = (uint64_t)(intptr_t)bs; - request.from = 0; - request.len = 0; - nbd_send_request(s->sock, &request); + uint64_t offset = acb->sector_num * SECTOR_SIZE; + BDRVNBDState *s = acb->common.bs->opaque; + + AIOReq *aio_req; - close(s->sock); + qemu_bh_delete(acb->bh); + acb->bh = NULL; + + while (done != total) { + uint8_t flags = 0; + + len = total - done; + + aio_req = alloc_aio_req(s, acb, len, offset, flags, done); + + ret = add_aio_request(s, aio_req, acb->qiov, acb->aiocb_type); + + if (ret < 0) { + free_aio_req(s, aio_req); + acb->ret = -EIO; + goto out; + } + + offset = 0; + idx++; + done += len; + } +out: + if (QLIST_EMPTY(&acb->aioreq_head)) { + nbd_finish_aiocb(acb); + } } +static BlockDriverAIOCB *nbd_aio_readv(BlockDriverState *bs, + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, + BlockDriverCompletionFunc *cb, void *opaque) +{ + NBDAIOCB *acb; + int i; + + acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque); + acb->aiocb_type = AIOCB_READ_UDATA; + acb->aio_done_func = nbd_finish_aiocb; + + for (i = 0; i < qiov->niov; i++) { + memset(qiov->iov[i].iov_base, 0, qiov->iov[i].iov_len); + } + + nbd_schedule_bh(nbd_readv_writev_bh_cb, acb); + return &acb->common; +} + +static BlockDriverAIOCB *nbd_aio_writev(BlockDriverState *bs, + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, + BlockDriverCompletionFunc *cb, void *opaque) +{ + NBDAIOCB *acb; + + acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque); + acb->aiocb_type = AIOCB_WRITE_UDATA; + acb->aio_done_func = nbd_finish_aiocb; + + nbd_schedule_bh(nbd_readv_writev_bh_cb, acb); + return &acb->common; +} + + static int64_t nbd_getlength(BlockDriverState *bs) { 
BDRVNBDState *s = bs->opaque; - return s->size; } static BlockDriver bdrv_nbd = { - .format_name = "nbd", - .instance_size = sizeof(BDRVNBDState), - .bdrv_file_open = nbd_open, - .bdrv_read = nbd_read, - .bdrv_write = nbd_write, - .bdrv_close = nbd_close, - .bdrv_getlength = nbd_getlength, - .protocol_name = "nbd", + .format_name = "nbd", + .instance_size = sizeof(BDRVNBDState), + .bdrv_file_open = nbd_open, + .bdrv_aio_readv = nbd_aio_readv, + .bdrv_aio_writev = nbd_aio_writev, + .bdrv_close = nbd_close, + .bdrv_getlength = nbd_getlength, + .protocol_name = "nbd" }; static void bdrv_nbd_init(void) -- 1.7.0.4