From mboxrd@z Thu Jan 1 00:00:00 1970 Received: from eggs.gnu.org ([140.186.70.92]:34394) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1R1wBg-000245-QO for qemu-devel@nongnu.org; Fri, 09 Sep 2011 04:11:50 -0400 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1R1wBe-0002Cq-SG for qemu-devel@nongnu.org; Fri, 09 Sep 2011 04:11:48 -0400 Received: from mail-vx0-f173.google.com ([209.85.220.173]:63219) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1R1wBe-0002Cm-KT for qemu-devel@nongnu.org; Fri, 09 Sep 2011 04:11:46 -0400 Received: by vxj15 with SMTP id 15so563224vxj.4 for ; Fri, 09 Sep 2011 01:11:46 -0700 (PDT) Sender: Paolo Bonzini From: Paolo Bonzini Date: Fri, 9 Sep 2011 10:11:38 +0200 Message-Id: <1315555898-16957-1-git-send-email-pbonzini@redhat.com> In-Reply-To: <871uvquxi0.wl%morita.kazutaka@lab.ntt.co.jp> References: <871uvquxi0.wl%morita.kazutaka@lab.ntt.co.jp> Subject: [Qemu-devel] [PATCH v2 09/12] sheepdog: move coroutine send/recv function to generic code List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: qemu-devel@nongnu.org Cc: MORITA Kazutaka Outside coroutines, avoid busy waiting on EAGAIN by temporarily making the socket blocking. The API of qemu_recvv/qemu_sendv is slightly different from do_readv/do_writev because they do not handle coroutines. It returns the number of bytes written before encountering an EAGAIN. The specificity of yielding on EAGAIN is entirely in qemu-coroutine.c. Cc: MORITA Kazutaka Signed-off-by: Paolo Bonzini --- Thanks for the review. I checked with qemu-io that all of readv -v 0 524288 (x8) readv -v 0 262144 (x16) readv -v 0 1024 (x4096) readv -v 0 1536 (x2730) 1024 readv -v 0 1024 512 (x2730) 1024 work and produce the same output, while previously they would fail. Looks like it's hard to trigger the code just with qemu. block/sheepdog.c | 225 ++++++------------------------------------------------ cutils.c | 103 +++++++++++++++++++++++++ qemu-common.h | 3 + qemu-coroutine.c | 70 +++++++++++++++++ qemu-coroutine.h | 26 ++++++ 5 files changed, 225 insertions(+), 202 deletions(-) diff --git a/block/sheepdog.c b/block/sheepdog.c index af696a5..94e62a3 100644 --- a/block/sheepdog.c +++ b/block/sheepdog.c @@ -443,129 +443,6 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov, return acb; } -#ifdef _WIN32 - -struct msghdr { - struct iovec *msg_iov; - size_t msg_iovlen; -}; - -static ssize_t sendmsg(int s, const struct msghdr *msg, int flags) -{ - size_t size = 0; - char *buf, *p; - int i, ret; - - /* count the msg size */ - for (i = 0; i < msg->msg_iovlen; i++) { - size += msg->msg_iov[i].iov_len; - } - buf = g_malloc(size); - - p = buf; - for (i = 0; i < msg->msg_iovlen; i++) { - memcpy(p, msg->msg_iov[i].iov_base, msg->msg_iov[i].iov_len); - p += msg->msg_iov[i].iov_len; - } - - ret = send(s, buf, size, flags); - - g_free(buf); - return ret; -} - -static ssize_t recvmsg(int s, struct msghdr *msg, int flags) -{ - size_t size = 0; - char *buf, *p; - int i, ret; - - /* count the msg size */ - for (i = 0; i < msg->msg_iovlen; i++) { - size += msg->msg_iov[i].iov_len; - } - buf = g_malloc(size); - - ret = qemu_recv(s, buf, size, flags); - if (ret < 0) { - goto out; - } - - p = buf; - for (i = 0; i < msg->msg_iovlen; i++) { - memcpy(msg->msg_iov[i].iov_base, p, msg->msg_iov[i].iov_len); - p += msg->msg_iov[i].iov_len; - } -out: - g_free(buf); - return ret; -} - -#endif - -/* - * Send/recv data with iovec buffers - * - * This function send/recv data from/to the iovec buffer directly. - * The first `offset' bytes in the iovec buffer are skipped and next - * `len' bytes are used. - * - * For example, - * - * do_send_recv(sockfd, iov, len, offset, 1); - * - * is equals to - * - * char *buf = malloc(size); - * iov_to_buf(iov, iovcnt, buf, offset, size); - * send(sockfd, buf, size, 0); - * free(buf); - */ -static int do_send_recv(int sockfd, struct iovec *iov, int len, int offset, - int write) -{ - struct msghdr msg; - int ret, diff; - - memset(&msg, 0, sizeof(msg)); - msg.msg_iov = iov; - msg.msg_iovlen = 1; - - len += offset; - - while (iov->iov_len < len) { - len -= iov->iov_len; - - iov++; - msg.msg_iovlen++; - } - - diff = iov->iov_len - len; - iov->iov_len -= diff; - - while (msg.msg_iov->iov_len <= offset) { - offset -= msg.msg_iov->iov_len; - - msg.msg_iov++; - msg.msg_iovlen--; - } - - msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base + offset; - msg.msg_iov->iov_len -= offset; - - if (write) { - ret = sendmsg(sockfd, &msg, 0); - } else { - ret = recvmsg(sockfd, &msg, 0); - } - - msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base - offset; - msg.msg_iov->iov_len += offset; - - iov->iov_len += diff; - return ret; -} - static int connect_to_sdog(const char *addr, const char *port) { char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; @@ -618,65 +495,6 @@ success: return fd; } -static int do_readv_writev(int sockfd, struct iovec *iov, int len, - int iov_offset, int write) -{ - int ret; -again: - ret = do_send_recv(sockfd, iov, len, iov_offset, write); - if (ret < 0) { - if (errno == EINTR) { - goto again; - } - if (errno == EAGAIN) { - if (qemu_in_coroutine()) { - qemu_coroutine_yield(); - } - goto again; - } - error_report("failed to recv a rsp, %s", strerror(errno)); - return 1; - } - - iov_offset += ret; - len -= ret; - if (len) { - goto again; - } - - return 0; -} - -static int do_readv(int sockfd, struct iovec *iov, int len, int iov_offset) -{ - return do_readv_writev(sockfd, iov, len, iov_offset, 0); -} - -static int do_writev(int sockfd, struct iovec *iov, int len, int iov_offset) -{ - return do_readv_writev(sockfd, iov, len, iov_offset, 1); -} - -static int do_read_write(int sockfd, void *buf, int len, int write) -{ - struct iovec iov; - - iov.iov_base = buf; - iov.iov_len = len; - - return do_readv_writev(sockfd, &iov, len, 0, write); -} - -static int do_read(int sockfd, void *buf, int len) -{ - return do_read_write(sockfd, buf, len, 0); -} - -static int do_write(int sockfd, void *buf, int len) -{ - return do_read_write(sockfd, buf, len, 1); -} - static int send_req(int sockfd, SheepdogReq *hdr, void *data, unsigned int *wlen) { @@ -691,10 +509,9 @@ static int send_req(int sockfd, SheepdogReq *hdr, void *data, iov[1].iov_len = *wlen; } - ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen, 0); - if (ret) { + ret = qemu_sendv(sockfd, iov, sizeof(*hdr) + *wlen, 0); + if (ret < 0) { error_report("failed to send a req, %s", strerror(errno)); - ret = -1; } return ret; @@ -704,17 +521,19 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data, unsigned int *wlen, unsigned int *rlen) { int ret; + struct iovec iov; + socket_set_block(sockfd); ret = send_req(sockfd, hdr, data, wlen); - if (ret) { - ret = -1; + if (ret < 0) { goto out; } - ret = do_read(sockfd, hdr, sizeof(*hdr)); - if (ret) { + iov.iov_base = hdr; + iov.iov_len = sizeof(*hdr); + ret = qemu_recvv(sockfd, &iov, sizeof(*hdr), 0); + if (ret < 0) { error_report("failed to get a rsp, %s", strerror(errno)); - ret = -1; goto out; } @@ -723,15 +542,17 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data, } if (*rlen) { - ret = do_read(sockfd, data, *rlen); - if (ret) { + iov.iov_base = data; + iov.iov_len = *rlen; + ret = qemu_recvv(sockfd, &iov, *rlen, 0); + if (ret < 0) { error_report("failed to get the data, %s", strerror(errno)); - ret = -1; goto out; } } ret = 0; out: + socket_set_nonblock(sockfd); return ret; } @@ -793,8 +614,8 @@ static void coroutine_fn aio_read_response(void *opaque) } /* read a header */ - ret = do_read(fd, &rsp, sizeof(rsp)); - if (ret) { + ret = qemu_co_recv(fd, &rsp, sizeof(rsp)); + if (ret < 0) { error_report("failed to get the header, %s", strerror(errno)); goto out; } @@ -839,9 +660,9 @@ static void coroutine_fn aio_read_response(void *opaque) } break; case AIOCB_READ_UDATA: - ret = do_readv(fd, acb->qiov->iov, rsp.data_length, - aio_req->iov_offset); - if (ret) { + ret = qemu_co_recvv(fd, acb->qiov->iov, rsp.data_length, + aio_req->iov_offset); + if (ret < 0) { error_report("failed to get the data, %s", strerror(errno)); goto out; } @@ -1114,15 +935,15 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, set_cork(s->fd, 1); /* send a header */ - ret = do_write(s->fd, &hdr, sizeof(hdr)); - if (ret) { + ret = qemu_co_send(s->fd, &hdr, sizeof(hdr)); + if (ret < 0) { error_report("failed to send a req, %s", strerror(errno)); return -EIO; } if (wlen) { - ret = do_writev(s->fd, iov, wlen, aio_req->iov_offset); - if (ret) { + ret = qemu_co_sendv(s->fd, iov, wlen, aio_req->iov_offset); + if (ret < 0) { error_report("failed to send a data, %s", strerror(errno)); return -EIO; } diff --git a/cutils.c b/cutils.c index c91f887..229794e 100644 --- a/cutils.c +++ b/cutils.c @@ -25,6 +25,8 @@ #include "host-utils.h" #include +#include "qemu_socket.h" + void pstrcpy(char *buf, int buf_size, const char *str) { int c; @@ -415,3 +417,108 @@ int64_t strtosz(const char *nptr, char **end) { return strtosz_suffix(nptr, end, STRTOSZ_DEFSUFFIX_MB); } + +/* + * Send/recv data with iovec buffers + * + * This function send/recv data from/to the iovec buffer directly. + * The first `offset' bytes in the iovec buffer are skipped and next + * `len' bytes are used. + * + * For example, + * + * do_sendv_recvv(sockfd, iov, len, offset, 1); + * + * is equal to + * + * char *buf = malloc(size); + * iov_to_buf(iov, iovcnt, buf, offset, size); + * send(sockfd, buf, size, 0); + * free(buf); + */ +static int do_sendv_recvv(int sockfd, struct iovec *iov, int len, int offset, + int do_sendv) +{ + int ret, diff, iovlen; + struct iovec *last_iov; + + /* last_iov is inclusive, so count from one. */ + iovlen = 1; + last_iov = iov; + len += offset; + + while (last_iov->iov_len < len) { + len -= last_iov->iov_len; + + last_iov++; + iovlen++; + } + + diff = last_iov->iov_len - len; + last_iov->iov_len -= diff; + + while (iov->iov_len <= offset) { + offset -= iov->iov_len; + + iov++; + iovlen--; + } + + iov->iov_base = (char *) iov->iov_base + offset; + iov->iov_len -= offset; + + { +#ifdef CONFIG_IOVEC + struct msghdr msg; + memset(&msg, 0, sizeof(msg)); + msg.msg_iov = iov; + msg.msg_iovlen = iovlen; + + do { + if (do_sendv) { + ret = sendmsg(sockfd, &msg, 0); + } else { + ret = recvmsg(sockfd, &msg, 0); + } + } while (ret == -1 && errno == EINTR); +#else + struct iovec *p = iov; + ret = 0; + while (iovlen > 0) { + int rc; + if (do_sendv) { + rc = send(sockfd, p->iov_base, p->iov_len, 0); + } else { + rc = qemu_recv(sockfd, p->iov_base, p->iov_len, 0); + } + if (rc == -1) { + if (errno == EINTR) { + continue; + } + if (ret == 0) { + ret = -1; + } + break; + } + iovlen--, p++; + ret += rc; + } +#endif + } + + /* Undo the changes above */ + iov->iov_base = (char *) iov->iov_base - offset; + iov->iov_len += offset; + last_iov->iov_len += diff; + return ret; +} + +int qemu_recvv(int sockfd, struct iovec *iov, int len, int iov_offset) +{ + return do_sendv_recvv(sockfd, iov, len, iov_offset, 0); +} + +int qemu_sendv(int sockfd, struct iovec *iov, int len, int iov_offset) +{ + return do_sendv_recvv(sockfd, iov, len, iov_offset, 1); +} diff --git a/qemu-common.h b/qemu-common.h index 404c421..fc921cc 100644 --- a/qemu-common.h +++ b/qemu-common.h @@ -203,6 +203,9 @@ int qemu_pipe(int pipefd[2]); #define qemu_recv(sockfd, buf, len, flags) recv(sockfd, buf, len, flags) #endif +int qemu_recvv(int sockfd, struct iovec *iov, int len, int iov_offset); +int qemu_sendv(int sockfd, struct iovec *iov, int len, int iov_offset); + /* Error handling. */ void QEMU_NORETURN hw_error(const char *fmt, ...) GCC_FMT_ATTR(1, 2); diff --git a/qemu-coroutine.c b/qemu-coroutine.c index 600be26..f5abbb9 100644 --- a/qemu-coroutine.c +++ b/qemu-coroutine.c @@ -73,3 +73,69 @@ void coroutine_fn qemu_coroutine_yield(void) self->caller = NULL; coroutine_swap(self, to); } + +int coroutine_fn qemu_co_recvv(int sockfd, struct iovec *iov, + int len, int iov_offset) +{ + int total = 0; + int ret; + while (len) { + ret = qemu_recvv(sockfd, iov, len, iov_offset + total); + if (ret < 0) { + if (errno == EAGAIN) { + qemu_coroutine_yield(); + continue; + } + if (total == 0) { + total = -1; + } + break; + } + total += ret, len -= ret; + } + + return total; +} + +int coroutine_fn qemu_co_sendv(int sockfd, struct iovec *iov, + int len, int iov_offset) +{ + int total = 0; + int ret; + while (len) { + ret = qemu_sendv(sockfd, iov, len, iov_offset + total); + if (ret < 0) { + if (errno == EAGAIN) { + qemu_coroutine_yield(); + continue; + } + if (total == 0) { + total = -1; + } + break; + } + total += ret, len -= ret; + } + + return total; +} + +int coroutine_fn qemu_co_recv(int sockfd, void *buf, int len) +{ + struct iovec iov; + + iov.iov_base = buf; + iov.iov_len = len; + + return qemu_co_recvv(sockfd, &iov, len, 0); +} + +int coroutine_fn qemu_co_send(int sockfd, void *buf, int len) +{ + struct iovec iov; + + iov.iov_base = buf; + iov.iov_len = len; + + return qemu_co_sendv(sockfd, &iov, len, 0); +} diff --git a/qemu-coroutine.h b/qemu-coroutine.h index b8fc4f4..a1a41f6 100644 --- a/qemu-coroutine.h +++ b/qemu-coroutine.h @@ -188,4 +188,30 @@ void qemu_co_rwlock_wrlock(CoRwlock *lock); */ void qemu_co_rwlock_unlock(CoRwlock *lock); +/** + * Sends an iovec (or optionally a part of it) down a socket, yielding + * when the socket is full. + */ +int coroutine_fn qemu_co_sendv(int sockfd, struct iovec *iov, + int len, int iov_offset); + +/** + * Receives data into an iovec (or optionally into a part of it) from + * a socket, yielding when there is no data in the socket. + */ +int coroutine_fn qemu_co_recvv(int sockfd, struct iovec *iov, + int len, int iov_offset); + + +/** + * Sends a buffer down a socket, yielding when the socket is full. + */ +int coroutine_fn qemu_co_send(int sockfd, void *buf, int len); + +/** + * Receives data into a buffer from a socket, yielding when there + * is no data in the socket. + */ +int coroutine_fn qemu_co_recv(int sockfd, void *buf, int len); + #endif /* QEMU_COROUTINE_H */ -- 1.7.6