From: Sheng Qiu

1. The virtio-scsi initiator detects I/O timeouts and checks the liveness
   of the vhost connection.
2. If the current vhost connection is dead, reconnect to vhost and resend
   the pending I/O.

Signed-off-by: Sheng Qiu
---
 include/spdk/io_channel.h          |  30 +++++++
 include/spdk_internal/bdev.h       |  17 ++++
 include/spdk_internal/virtio.h     |  23 +++++
 lib/bdev/bdev.c                    |   2 +-
 lib/bdev/virtio/bdev_virtio_scsi.c | 268 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++----
 lib/util/io_channel.c              |  51 ++++++++---
 lib/vhost/vhost.c                  |   1 +
 lib/virtio/virtio.c                |   2 +-
 lib/virtio/virtio_user.c           | 155 +++++++++++++++++++++++++++++++++++
 9 files changed, 514 insertions(+), 35 deletions(-)

diff --git a/include/spdk/io_channel.h b/include/spdk/io_channel.h
--- a/include/spdk/io_channel.h
+++ b/include/spdk/io_channel.h
@@ -70,6 +70,21 @@ typedef void (*spdk_io_device_unregister_cb)(void *io_device);
 typedef void (*spdk_channel_msg)(struct spdk_io_channel_iter *i);
 typedef void (*spdk_channel_for_each_cpl)(struct spdk_io_channel_iter *i, int status);
 
+struct spdk_io_channel {
+	struct spdk_thread		*thread;
+	struct io_device		*dev;
+	uint32_t			ref;
+	TAILQ_ENTRY(spdk_io_channel)	tailq;
+	spdk_io_channel_destroy_cb	destroy_cb;
+
+	/*
+	 * Modules will allocate extra memory off the end of this structure
+	 *  to store references to hardware-specific references (i.e. NVMe queue
+	 *  pairs, or references to child device spdk_io_channels (i.e.
+	 *  virtual bdevs).
+	 */
+};
+
 /**
  * \brief Initializes the calling thread for I/O channel allocation.
  *
@@ -185,6 +200,21 @@ void spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unr
  */
 struct spdk_io_channel *spdk_get_io_channel(void *io_device);
 
+/**
+ * \brief Collects the I/O channels currently allocated for an I/O device on
+ * every registered thread.
+ *
+ * At most \c size channels are stored; channels beyond that are not reported.
+ * No reference is taken on the returned channels.
+ *
+ * \param io_device I/O device whose channels are collected.
+ * \param ch_array Caller-provided array that receives the channel pointers.
+ * \param size Capacity of \c ch_array, in entries.
+ * \param cnt Set to the number of entries stored in \c ch_array.
+ */
+void spdk_get_all_io_channel(void *io_device, struct spdk_io_channel **ch_array, int size,
+			     int *cnt);
+
 /**
  * \brief Releases a reference to an I/O channel. This happens asynchronously.
  *
diff --git a/include/spdk_internal/bdev.h b/include/spdk_internal/bdev.h
--- a/include/spdk_internal/bdev.h
+++ b/include/spdk_internal/bdev.h
@@ -374,6 +374,15 @@ struct spdk_bdev_io {
 	/** It may be used by modules to put the bdev_io into its own list. */
 	TAILQ_ENTRY(spdk_bdev_io) module_link;
 
+	/** Link in the list of outstanding requests kept by the module the I/O was sent to. */
+	TAILQ_ENTRY(spdk_bdev_io) outstanding_req_link;
+
+	/** Tick count taken when the I/O was handed to the module. */
+	uint64_t submit_tick;
+
+	/** The I/O channel that this was submitted on. */
+	struct spdk_io_channel *io_ch;
+
 	/**
 	 * Per I/O context for use by the bdev module.
 	 */
@@ -415,6 +424,14 @@ void spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb
 void spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io,
 			   enum spdk_bdev_io_status status);
 
+/**
+ * Submit a bdev_io to its bdev. Exported so that bdev modules can resubmit an
+ * I/O that is still in the pending state.
+ *
+ * \param bdev_io I/O to submit.
+ */
+void spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io);
+
 /**
  * Complete a bdev_io with an NVMe status code.
  *
diff --git a/include/spdk_internal/virtio.h b/include/spdk_internal/virtio.h
--- a/include/spdk_internal/virtio.h
+++ b/include/spdk_internal/virtio.h
@@ -46,6 +46,7 @@
 #include "spdk/json.h"
 #include "spdk/io_channel.h"
 #include "spdk/pci_ids.h"
+#include "spdk_internal/bdev.h"
 
 /**
  * The maximum virtqueue size is 2^15. Use that value as the end of
@@ -295,6 +296,10 @@ int virtio_dev_start(struct virtio_dev *vdev, uint16_t max_queues,
  */
 void virtio_dev_stop(struct virtio_dev *vdev);
 
+/** Free the queues allocated for \c dev. Exported for the virtio-user reconnect path. */
+void
+virtio_free_queues(struct virtio_dev *dev);
+
 /**
  * Destruct a virtio device. Note that it must be in the stopped state.
  * The virtio_dev should be manually freed afterwards.
@@ -453,6 +458,24 @@ int virtio_pci_dev_enumerate(virtio_pci_create_cb enum_cb, uint16_t pci_device_i
 int virtio_user_dev_init(struct virtio_dev *vdev, const char *name, const char *path,
 			 uint32_t queue_size);
 
+/**
+ * Check whether the vhost-user target behind a virtio-user device still
+ * answers requests.
+ *
+ * \param vdev virtio-user device to probe.
+ * \return 0 if the probe request succeeded, non-zero otherwise.
+ */
+int virtio_user_check_vhost_alive(struct virtio_dev *vdev);
+
+/**
+ * Re-establish the connection of a virtio-user device to its vhost-user
+ * target. Blocks the calling thread, retrying periodically, until the
+ * target answers again.
+ *
+ * \param vdev virtio-user device to reconnect.
+ */
+void virtio_user_dev_reconnect(struct virtio_dev *vdev);
+
 /**
  * Initialize virtio_dev for a given PCI device.
  * The virtio_dev has to be freed with \c virtio_dev_destruct.
diff --git a/lib/bdev/bdev.c b/lib/bdev/bdev.c
index c3aa063..3d34e44 100644
--- a/lib/bdev/bdev.c
+++ b/lib/bdev/bdev.c
@@ -767,7 +767,7 @@ spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
 	}
 }
 
-static void
+void
 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
 {
 	struct spdk_bdev *bdev = bdev_io->bdev;
diff --git a/lib/bdev/virtio/bdev_virtio_scsi.c b/lib/bdev/virtio/bdev_virtio_scsi.c
--- a/lib/bdev/virtio/bdev_virtio_scsi.c
+++ b/lib/bdev/virtio/bdev_virtio_scsi.c
@@ -66,6 +66,12 @@
 #define VIRTIO_SCSI_EVENTQ	1
 #define VIRTIO_SCSI_REQUESTQ	2
 
+/* I/O timeout, in seconds; an older outstanding request triggers a vhost liveness check */
+#define VIRTIO_SCSI_IO_TIMEOUT (1)
+
+/* Capacity of the on-stack array used to collect the I/O channels of a device */
+#define VIRTIO_SCSI_MAX_IO_CHANNELS 1024
+
 static int bdev_virtio_initialize(void);
 static void bdev_virtio_finish(void);
 
@@ -90,6 +96,9 @@ struct virtio_scsi_dev {
 
 	/** Device marked for removal. */
 	bool				removed;
+
+	/** Serializes reconnect attempts issued from different channels. */
+	pthread_mutex_t			reconn_mutex;
 };
 
 struct virtio_scsi_io_ctx {
@@ -165,6 +174,18 @@ struct bdev_virtio_io_channel {
 
 	/** Virtio response poller. */
 	struct spdk_poller		*poller;
+
+	/** Requests sent or parked on this channel that have not completed yet, oldest first. */
+	TAILQ_HEAD(virtio_outstanding_req_head, spdk_bdev_io) outstanding_req;
+
+	/** Guards the fields below and the replacement of \c vq by a reconnect. */
+	pthread_mutex_t			lock;
+
+	/** True while the device is being reconnected; new I/O is parked instead of sent. */
+	bool				reconnecting;
+
+	/** Set by a reconnect; the outstanding requests still have to be resent. */
+	bool				need_resend;
 };
 
 /** Module finish in progress */
@@ -268,6 +289,7 @@ virtio_scsi_dev_init(struct virtio_scsi_dev *svdev, uint16_t max_queues)
 	TAILQ_INIT(&svdev->luns);
 	svdev->scan_ctx = NULL;
 	svdev->removed = false;
+	pthread_mutex_init(&svdev->reconn_mutex, NULL);
 
 	spdk_io_device_register(svdev, bdev_virtio_scsi_ch_create_cb,
 				bdev_virtio_scsi_ch_destroy_cb,
@@ -445,29 +467,67 @@ bdev_virtio_send_io(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 	struct bdev_virtio_io_channel *virtio_channel = spdk_io_channel_get_ctx(ch);
-	struct virtqueue *vq = virtio_channel->vq;
+	struct virtqueue *vq;
 	struct virtio_scsi_io_ctx *io_ctx = (struct virtio_scsi_io_ctx *)bdev_io->driver_ctx;
 	int rc;
 
-	rc = virtqueue_req_start(vq, bdev_io, bdev_io->u.bdev.iovcnt + 2);
-	if (rc == -ENOMEM) {
-		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
-		return;
-	} else if (rc != 0) {
-		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
-		return;
+	/* Per-channel lock; it is only contended while a reconnect is in progress. */
+	pthread_mutex_lock(&virtio_channel->lock);
+
+	/*
+	 * Read the queue under the lock: a reconnect replaces it, so a value
+	 * read before the lock was taken could be stale.
+	 */
+	vq = virtio_channel->vq;
+
+	/* With nothing outstanding there is nothing to resend after a reconnect. */
+	if (virtio_channel->need_resend && TAILQ_EMPTY(&virtio_channel->outstanding_req)) {
+		virtio_channel->need_resend = false;
+	}
+
+	/*
+	 * Only hand the I/O to the virtqueue when no reconnect is in progress and
+	 * no resend is pending; otherwise it is just parked on the outstanding
+	 * list and sent together with the other requests by the resend.
+	 */
+	if (!virtio_channel->reconnecting && !virtio_channel->need_resend) {
+		rc = virtqueue_req_start(vq, bdev_io, bdev_io->u.bdev.iovcnt + 2);
+		if (rc != 0) {
+			/* Complete outside the lock so the completion path never runs under it. */
+			pthread_mutex_unlock(&virtio_channel->lock);
+			if (rc == -ENOMEM) {
+				SPDK_ERRLOG("no memory to submit io\n");
+				spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
+			} else {
+				SPDK_ERRLOG("io submit failed\n");
+				spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
+			}
+			return;
+		}
+
+		virtqueue_req_add_iovs(vq, &io_ctx->iov_req, 1, SPDK_VIRTIO_DESC_RO);
+		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
+			virtqueue_req_add_iovs(vq, &io_ctx->iov_resp, 1, SPDK_VIRTIO_DESC_WR);
+			virtqueue_req_add_iovs(vq, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
+					       SPDK_VIRTIO_DESC_WR);
+		} else {
+			virtqueue_req_add_iovs(vq, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
+					       SPDK_VIRTIO_DESC_RO);
+			virtqueue_req_add_iovs(vq, &io_ctx->iov_resp, 1, SPDK_VIRTIO_DESC_WR);
+		}
+
+		virtqueue_req_flush(vq);
 	}
 
-	virtqueue_req_add_iovs(vq, &io_ctx->iov_req, 1, SPDK_VIRTIO_DESC_RO);
-	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
-		virtqueue_req_add_iovs(vq, &io_ctx->iov_resp, 1, SPDK_VIRTIO_DESC_WR);
-		virtqueue_req_add_iovs(vq, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
-				       SPDK_VIRTIO_DESC_WR);
-	} else {
-		virtqueue_req_add_iovs(vq, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
-				       SPDK_VIRTIO_DESC_RO);
-		virtqueue_req_add_iovs(vq, &io_ctx->iov_resp, 1, SPDK_VIRTIO_DESC_WR);
-	}
+	/*
+	 * Track the I/O as outstanding, whether it was sent or only parked;
+	 * failed submissions have already returned above.
+	 */
+	bdev_io->submit_tick = spdk_get_ticks();
+	bdev_io->io_ch = ch;
+	assert(TAILQ_LAST(&virtio_channel->outstanding_req, virtio_outstanding_req_head) !=
+	       bdev_io);
+	TAILQ_INSERT_TAIL(&virtio_channel->outstanding_req, bdev_io, outstanding_req_link);
 
-	virtqueue_req_flush(vq);
+	pthread_mutex_unlock(&virtio_channel->lock);
 }
 
 static void
@@ -700,6 +760,145 @@ bdev_virtio_io_cpl(struct spdk_bdev_io *bdev_io)
 	spdk_bdev_io_complete_scsi_status(bdev_io, io_ctx->resp.status, sk, asc, ascq);
 }
 
+/*
+ * Check whether the oldest outstanding request of a channel has timed out.
+ * Requests are appended in submission order, so only the head is examined.
+ *
+ * Returns 1 if the head request was submitted at least VIRTIO_SCSI_IO_TIMEOUT
+ * seconds ago, 0 otherwise (including when nothing is outstanding).
+ */
+static int
+bdev_virtio_check_timeout(struct bdev_virtio_io_channel *ch)
+{
+	struct spdk_bdev_io *oldest = TAILQ_FIRST(&ch->outstanding_req);
+	uint64_t timeout_ticks = VIRTIO_SCSI_IO_TIMEOUT * spdk_get_ticks_hz();
+
+	if (oldest == NULL) {
+		return 0;
+	}
+
+	return spdk_get_ticks() - oldest->submit_tick >= timeout_ticks;
+}
+
+/*
+ * Set the reconnecting flag of every I/O channel of the device.
+ *
+ * NOTE(review): the channels are owned by other threads and are looked up
+ * without taking a reference - confirm that none of them can be destroyed
+ * while a reconnect is in progress.
+ */
+static void
+bdev_virtio_update_all_channel_status(struct virtio_scsi_dev *svdev, bool is_reconnecting)
+{
+	struct spdk_io_channel *channels[VIRTIO_SCSI_MAX_IO_CHANNELS] = {NULL};
+	struct bdev_virtio_io_channel *virtio_channel;
+	int cnt = 0, i;
+
+	spdk_get_all_io_channel(svdev, channels, VIRTIO_SCSI_MAX_IO_CHANNELS, &cnt);
+	for (i = 0; i < cnt; i++) {
+		virtio_channel = spdk_io_channel_get_ctx(channels[i]);
+		pthread_mutex_lock(&virtio_channel->lock);
+		virtio_channel->reconnecting = is_reconnecting;
+		pthread_mutex_unlock(&virtio_channel->lock);
+	}
+}
+
+/*
+ * Give every I/O channel of the device a request queue of the re-created
+ * virtio device and flag the channel so that its poller resends the
+ * requests that are still outstanding on it.
+ */
+static void
+bdev_virtio_update_all_io_channel_vq(struct virtio_scsi_dev *svdev)
+{
+	struct spdk_io_channel *channels[VIRTIO_SCSI_MAX_IO_CHANNELS] = {NULL};
+	struct bdev_virtio_io_channel *virtio_channel;
+	int32_t queue_idx;
+	int cnt = 0, i;
+
+	spdk_get_all_io_channel(svdev, channels, VIRTIO_SCSI_MAX_IO_CHANNELS, &cnt);
+	for (i = 0; i < cnt; i++) {
+		virtio_channel = spdk_io_channel_get_ctx(channels[i]);
+		queue_idx = virtio_dev_find_and_acquire_queue(&svdev->vdev, VIRTIO_SCSI_REQUESTQ);
+		if (queue_idx < 0) {
+			/*
+			 * The channel would keep the queue it used before the
+			 * reconnect; assert() is compiled out in release builds, so
+			 * fail hard instead of indexing vqs[] with a negative value.
+			 */
+			SPDK_ERRLOG("Couldn't get an unused queue for the io_channel.\n");
+			abort();
+		}
+
+		pthread_mutex_lock(&virtio_channel->lock);
+		virtio_channel->vq = svdev->vdev.vqs[queue_idx];
+		virtio_channel->svdev = svdev;
+		virtio_channel->need_resend = true;
+		pthread_mutex_unlock(&virtio_channel->lock);
+	}
+}
+
+/*
+ * Reconnect the device to vhost if the vhost connection is dead.
+ *
+ * Only one thread at a time performs the reconnect, and the call blocks
+ * until the connection is re-established. Returns true if this call
+ * performed the reconnect, false if vhost was found alive.
+ *
+ * NOTE(review): only I/O submission is gated by the reconnecting flag;
+ * confirm that the pollers of the other channels cannot use a queue while
+ * it is being torn down and re-created.
+ */
+static bool
+bdev_virtio_reconnect(struct virtio_scsi_dev *svdev)
+{
+	bool reconnected = false;
+
+	pthread_mutex_lock(&svdev->reconn_mutex);
+	if (virtio_user_check_vhost_alive(&svdev->vdev) != 0) {
+		/* Park new I/O on every channel while the queues are replaced. */
+		bdev_virtio_update_all_channel_status(svdev, true);
+		virtio_user_dev_reconnect(&svdev->vdev);
+		/* Point every channel at a queue of the re-created device. */
+		bdev_virtio_update_all_io_channel_vq(svdev);
+		/* Let the channels submit to their queues again. */
+		bdev_virtio_update_all_channel_status(svdev, false);
+		reconnected = true;
+	}
+	pthread_mutex_unlock(&svdev->reconn_mutex);
+
+	return reconnected;
+}
+
+/*
+ * Resubmit every request that is outstanding on the channel.
+ *
+ * The list is emptied first; resubmitted requests are presumably appended
+ * to it again by bdev_virtio_send_io().
+ *
+ * NOTE(review): confirm that spdk_bdev_io_submit() can be called a second
+ * time for the same I/O without upsetting the bdev layer's bookkeeping.
+ */
+static void
+bdev_virtio_resend_pending_req(struct virtio_scsi_dev *svdev, struct bdev_virtio_io_channel *ch)
+{
+	struct virtio_outstanding_req_head requests;
+	struct spdk_bdev_io *bdev_io;
+
+	TAILQ_INIT(&requests);
+	TAILQ_SWAP(&ch->outstanding_req, &requests, spdk_bdev_io, outstanding_req_link);
+
+	while ((bdev_io = TAILQ_FIRST(&requests)) != NULL) {
+		TAILQ_REMOVE(&requests, bdev_io, outstanding_req_link);
+
+		/* Only a request that is still pending may be submitted again. */
+		if (bdev_io->status != SPDK_BDEV_IO_STATUS_PENDING) {
+			abort();
+		}
+		spdk_bdev_io_submit(bdev_io);
+	}
+}
+
 static void
 bdev_virtio_poll(void *arg)
 {
@@ -730,6 +929,9 @@ bdev_virtio_poll(void *arg)
 			continue;
 		}
 
+		/* The request completed; stop tracking it as outstanding. */
+		TAILQ_REMOVE(&ch->outstanding_req, (struct spdk_bdev_io *)io[i], outstanding_req_link);
+
 		bdev_virtio_io_cpl(io[i]);
 	}
 
@@ -750,6 +952,30 @@ bdev_virtio_poll(void *arg)
 				_virtio_scsi_dev_scan_finish(scan_ctx, rc);
 			}
 		}
+
+		return;
+	}
+
+	if (cnt == 0 && bdev_virtio_check_timeout(ch)) {
+		bool resend;
+
+		/* Blocks until vhost is reachable again if the connection is dead. */
+		bdev_virtio_reconnect(svdev);
+
+		/*
+		 * Resend after every reconnect, not only after one performed by
+		 * this call: when another thread reconnected the device, the
+		 * requests of this channel still have to be submitted to its
+		 * new queue.
+		 */
+		pthread_mutex_lock(&ch->lock);
+		resend = ch->need_resend;
+		ch->need_resend = false;
+		pthread_mutex_unlock(&ch->lock);
+
+		if (resend) {
+			bdev_virtio_resend_pending_req(svdev, ch);
+		}
 	}
 }
 
@@ -913,6 +1139,10 @@ bdev_virtio_scsi_ch_create_cb(void *io_device, void *ctx_buf)
 
 	ch->svdev = svdev;
 	ch->vq = vq;
+	ch->reconnecting = false;
+	ch->need_resend = false;
+	pthread_mutex_init(&ch->lock, NULL);
+	TAILQ_INIT(&ch->outstanding_req);
 
 	ch->poller = spdk_poller_register(bdev_virtio_poll, ch, 0);
 
diff --git a/lib/util/io_channel.c b/lib/util/io_channel.c
--- a/lib/util/io_channel.c
+++ b/lib/util/io_channel.c
@@ -60,20 +60,6 @@ struct io_device {
 
 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);
 
-struct spdk_io_channel {
-	struct spdk_thread		*thread;
-	struct io_device		*dev;
-	uint32_t			ref;
-	TAILQ_ENTRY(spdk_io_channel)	tailq;
-	spdk_io_channel_destroy_cb	destroy_cb;
-
-	/*
-	 * Modules will allocate extra memory off the end of this structure
-	 *  to store references to hardware-specific references (i.e. NVMe queue
-	 *  pairs, or references to child device spdk_io_channels (i.e.
-	 *  virtual bdevs).
-	 */
-};
 
 struct spdk_thread {
 	pthread_t thread_id;
@@ -396,6 +382,43 @@ spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregist
 	_spdk_io_device_attempt_free(dev);
 }
 
+/*
+ * Collect the I/O channels that the registered threads hold for io_device.
+ *
+ * At most size entries are written to ch_array; *cnt receives the number
+ * of entries written. No reference is taken on the channels.
+ *
+ * NOTE(review): the thread and channel lists are walked without any locking
+ * in this function, and the channels are owned by other threads - confirm
+ * that the caller can guarantee they stay valid.
+ */
+void
+spdk_get_all_io_channel(void *io_device, struct spdk_io_channel **ch_array, int size, int *cnt)
+{
+	struct spdk_thread *thread;
+	struct spdk_io_channel *ch;
+	int i = 0;
+
+	TAILQ_FOREACH(thread, &g_threads, tailq) {
+		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
+			if (ch->dev->io_device != io_device) {
+				continue;
+			}
+
+			/* Check the capacity before storing; stop instead of overflowing. */
+			assert(i < size);
+			if (i >= size) {
+				goto out;
+			}
+
+			ch_array[i++] = ch;
+		}
+	}
+
+out:
+	*cnt = i;
+}
+
 struct spdk_io_channel *
 spdk_get_io_channel(void *io_device)
 {
diff --git a/lib/vhost/vhost.c b/lib/vhost/vhost.c
index bd3473e..79a2a21 100644
--- a/lib/vhost/vhost.c
+++ b/lib/vhost/vhost.c
@@ -1138,6 +1138,7 @@ destroy_connection(int vid)
 
 	/* since pollers are not running it safe not to use spdk_event here */
 	vdev->vid = -1;
+	vdev->status = 0;
 	pthread_mutex_unlock(&g_spdk_vhost_mutex);
 }
 
diff --git a/lib/virtio/virtio.c b/lib/virtio/virtio.c
index 3e22d9e..f4c0e5d 100644
--- a/lib/virtio/virtio.c
+++ b/lib/virtio/virtio.c
@@ -233,7 +233,7 @@ fail_q_alloc:
 	return ret;
 }
 
-static void
+void
 virtio_free_queues(struct virtio_dev *dev)
 {
 	uint16_t nr_vq = dev->max_queues;
diff --git a/lib/virtio/virtio_user.c b/lib/virtio/virtio_user.c
--- a/lib/virtio/virtio_user.c
+++ b/lib/virtio/virtio_user.c
@@ -46,6 +46,9 @@
 
 #include "spdk_internal/virtio.h"
 
+/* Interval between two reconnect attempts, in seconds */
+#define RECONNECT_INTERVAL (3)
+
 static int
 virtio_user_create_queue(struct virtio_dev *vdev, uint32_t queue_sel)
 {
@@ -400,6 +403,158 @@ static const struct virtio_dev_ops virtio_user_ops = {
 	.dump_json_config = virtio_user_dump_json_config,
 };
 
+/*
+ * Probe the vhost-user target by sending it a request.
+ *
+ * Returns 0 if the request succeeded, otherwise the non-zero result of the
+ * failed request.
+ */
+int
+virtio_user_check_vhost_alive(struct virtio_dev *vdev)
+{
+	struct virtio_user_dev *dev = vdev->ctx;
+	uint64_t host_max_queues;
+
+	/*
+	 * Ignore SIGPIPE so that a broken connection does not terminate the
+	 * process. Note that this changes the signal disposition of the whole
+	 * process, not only of the calling thread.
+	 */
+	signal(SIGPIPE, SIG_IGN);
+
+	/* Hack: VHOST_USER_GET_QUEUE_NUM only serves as a probe; its reply is not used. */
+	return dev->ops->send_request(dev, VHOST_USER_GET_QUEUE_NUM, &host_max_queues);
+}
+
+/*
+ * Check whether a connection to the vhost-user socket at path can be
+ * established. The probe connection is closed again right away.
+ *
+ * Returns 0 if the connect succeeded, -1 otherwise.
+ */
+static int
+vhost_user_check_vhost_socket(const char *path)
+{
+	struct sockaddr_un un;
+	int fd, rc;
+
+	memset(&un, 0, sizeof(un));
+	un.sun_family = AF_UNIX;
+	rc = snprintf(un.sun_path, sizeof(un.sun_path), "%s", path);
+	if (rc < 0 || (size_t)rc >= sizeof(un.sun_path)) {
+		return -1;
+	}
+
+	fd = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (fd < 0) {
+		return -1;
+	}
+
+	rc = connect(fd, (struct sockaddr *)&un, sizeof(un));
+	close(fd);
+
+	return rc < 0 ? -1 : 0;
+}
+
+/*
+ * Per-queue callback for virtio_user_queue_setup(): close the call and kick
+ * descriptors of one queue and mark them as unused.
+ */
+static int
+_virtio_user_del_queue(struct virtio_dev *vdev, uint32_t queue_sel)
+{
+	struct virtio_user_dev *dev = vdev->ctx;
+
+	close(dev->callfds[queue_sel]);
+	close(dev->kickfds[queue_sel]);
+	dev->callfds[queue_sel] = -1;
+	dev->kickfds[queue_sel] = -1;
+
+	return 0;
+}
+
+/*
+ * Release the queues and the backend of a device whose vhost connection
+ * is dead, so that the device can be initialized from scratch.
+ */
+static void
+virtio_user_dev_teardown(struct virtio_dev *vdev)
+{
+	virtio_user_queue_setup(vdev, _virtio_user_del_queue);
+	virtio_free_queues(vdev);
+	virtio_dev_destruct(vdev);
+}
+
+/*
+ * Re-establish the connection to the vhost-user target.
+ *
+ * Blocks the calling thread, retrying every RECONNECT_INTERVAL seconds,
+ * until the target answers a request again. Whenever the target's socket
+ * accepts a connection, the device is torn down (if it still exists) and
+ * initialized and started from scratch; a failed attempt is retried.
+ */
+void
+virtio_user_dev_reconnect(struct virtio_dev *vdev)
+{
+	struct virtio_user_dev *dev = vdev->ctx;
+	uint32_t queue_size = dev->queue_size;
+	char path[PATH_MAX];
+	char *name;
+	bool constructed = true;	/* vdev currently has a backend to tear down */
+	bool usable = true;		/* the last init + start sequence succeeded */
+	int rc;
+
+	/*
+	 * Copy everything needed to re-create the device up front. The teardown
+	 * presumably releases vdev->ctx (which holds the path) as well as
+	 * vdev->name - TODO confirm against virtio_dev_destruct().
+	 */
+	snprintf(path, sizeof(path), "%s", dev->path);
+	name = strdup(vdev->name);
+	if (name == NULL) {
+		SPDK_ERRLOG("Reconnecting: cannot allocate memory for the device name\n");
+		abort();
+	}
+
+	for (;;) {
+		/* Only probe a device that is fully set up. */
+		if (usable && virtio_user_check_vhost_alive(vdev) == 0) {
+			break;
+		}
+
+		if (vhost_user_check_vhost_socket(path) == 0) {
+			SPDK_NOTICELOG("Reconnecting: start reconnect %s\n", path);
+
+			if (constructed) {
+				virtio_user_dev_teardown(vdev);
+				constructed = false;
+			}
+
+			/*
+			 * NOTE(review): assumes that a failed virtio_user_dev_init()
+			 * leaves nothing behind that needs a teardown - confirm.
+			 */
+			rc = virtio_user_dev_init(vdev, name, path, queue_size);
+			if (rc == 0) {
+				constructed = true;
+				rc = virtio_dev_start(vdev, vdev->max_queues - vdev->fixed_queues_num,
+						      vdev->fixed_queues_num);
+			}
+
+			usable = (rc == 0);
+			if (!usable) {
+				SPDK_ERRLOG("Reconnecting: attempt failed (rc = %d), will retry\n", rc);
+			}
+		}
+
+		/* Wait for the socket to appear, or for the new connection to settle. */
+		sleep(RECONNECT_INTERVAL);
+	}
+
+	free(name);
+	SPDK_NOTICELOG("Reconnecting: done\n");
+}
+
 int
 virtio_user_dev_init(struct virtio_dev *vdev, const char *name, const char *path,
 		     uint32_t queue_size)
-- 
1.9.1