The chr_out chardev is connected to a filter-redirector running in the main loop. qemu_chr_fe_write_all might block here in compare_chr_send if the (socket-)buffer is full. If another filter-redirector in the main loop want's to send data to chr_pri_in it might also block if the buffer is full. This leads to a deadlock because both event loops get blocked. Fix this by converting compare_chr_send to a coroutine and putting the packets in a send queue. Also create a new function notify_chr_send, since that should be independend. Signed-off-by: Lukas Straub --- net/colo-compare.c | 173 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 130 insertions(+), 43 deletions(-) diff --git a/net/colo-compare.c b/net/colo-compare.c index 1de4220fe2..ff6a740284 100644 --- a/net/colo-compare.c +++ b/net/colo-compare.c @@ -32,6 +32,9 @@ #include "migration/migration.h" #include "util.h" +#include "block/aio-wait.h" +#include "qemu/coroutine.h" + #define TYPE_COLO_COMPARE "colo-compare" #define COLO_COMPARE(obj) \ OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) @@ -77,6 +80,20 @@ static int event_unhandled_count; * |packet | |packet + |packet | |packet + * +--------+ +--------+ +--------+ +--------+ */ + +typedef struct SendCo { + Coroutine *co; + GQueue send_list; + bool done; + int ret; +} SendCo; + +typedef struct SendEntry { + uint32_t size; + uint32_t vnet_hdr_len; + uint8_t buf[]; +} SendEntry; + typedef struct CompareState { Object parent; @@ -91,6 +108,7 @@ typedef struct CompareState { SocketReadState pri_rs; SocketReadState sec_rs; SocketReadState notify_rs; + SendCo sendco; bool vnet_hdr; uint32_t compare_timeout; uint32_t expired_scan_cycle; @@ -126,8 +144,11 @@ enum { static int compare_chr_send(CompareState *s, const uint8_t *buf, uint32_t size, - uint32_t vnet_hdr_len, - bool notify_remote_frame); + uint32_t vnet_hdr_len); + +static int notify_chr_send(CompareState *s, + const uint8_t *buf, + uint32_t size); static bool packet_matches_str(const char *str, const uint8_t *buf, @@ -145,7 +166,7 @@ static void notify_remote_frame(CompareState *s) char msg[] = "DO_CHECKPOINT"; int ret = 0; - ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true); + ret = notify_chr_send(s, (uint8_t *)msg, strlen(msg)); if (ret < 0) { error_report("Notify Xen COLO-frame failed"); } @@ -271,8 +292,7 @@ static void colo_release_primary_pkt(CompareState *s, Packet *pkt) ret = compare_chr_send(s, pkt->data, pkt->size, - pkt->vnet_hdr_len, - false); + pkt->vnet_hdr_len); if (ret < 0) { error_report("colo send primary packet failed"); } @@ -699,63 +719,123 @@ static void colo_compare_connection(void *opaque, void *user_data) } } -static int compare_chr_send(CompareState *s, - const uint8_t *buf, - uint32_t size, - uint32_t vnet_hdr_len, - bool notify_remote_frame) +static void coroutine_fn _compare_chr_send(void *opaque) { + CompareState *s = opaque; + SendCo *sendco = &s->sendco; int ret = 0; - uint32_t len = htonl(size); - if (!size) { - return 0; - } + while (!g_queue_is_empty(&sendco->send_list)) { + SendEntry *entry = g_queue_pop_tail(&sendco->send_list); + uint32_t len = htonl(entry->size); - if (notify_remote_frame) { - ret = qemu_chr_fe_write_all(&s->chr_notify_dev, - (uint8_t *)&len, - sizeof(len)); - } else { ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len)); - } - if (ret != sizeof(len)) { - goto err; - } + if (ret != sizeof(len)) { + g_free(entry); + goto err; + } - if (s->vnet_hdr) { - /* - * We send vnet header len make other module(like filter-redirector) - * know how to parse net packet correctly. - */ - len = htonl(vnet_hdr_len); + if (s->vnet_hdr) { + /* + * We send vnet header len make other module(like filter-redirector) + * know how to parse net packet correctly. + */ + len = htonl(entry->vnet_hdr_len); - if (!notify_remote_frame) { ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len)); + + if (ret != sizeof(len)) { + g_free(entry); + goto err; + } } - if (ret != sizeof(len)) { + ret = qemu_chr_fe_write_all(&s->chr_out, + (uint8_t *)entry->buf, + entry->size); + + if (ret != entry->size) { + g_free(entry); goto err; } + + g_free(entry); } - if (notify_remote_frame) { - ret = qemu_chr_fe_write_all(&s->chr_notify_dev, - (uint8_t *)buf, - size); - } else { - ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size); + sendco->ret = 0; + goto out; + +err: + while (!g_queue_is_empty(&sendco->send_list)) { + SendEntry *entry = g_queue_pop_tail(&sendco->send_list); + g_free(entry); } + sendco->ret = ret < 0 ? ret : -EIO; +out: + sendco->co = NULL; + sendco->done = true; + aio_wait_kick(); +} + +static int compare_chr_send(CompareState *s, + const uint8_t *buf, + uint32_t size, + uint32_t vnet_hdr_len) +{ + SendCo *sendco = &s->sendco; + SendEntry *entry; + + if (!size) { + return 0; + } + + entry = g_malloc(sizeof(SendEntry) + size); + entry->size = size; + entry->vnet_hdr_len = vnet_hdr_len; + memcpy(entry->buf, buf, size); + g_queue_push_head(&sendco->send_list, entry); + + if (sendco->done) { + sendco->co = qemu_coroutine_create(_compare_chr_send, s); + sendco->done = false; + qemu_coroutine_enter(sendco->co); + if (sendco->done) { + /* report early errors */ + return sendco->ret; + } + } + + /* assume success */ + return 0; +} + +static int notify_chr_send(CompareState *s, + const uint8_t *buf, + uint32_t size) +{ + int ret = 0; + uint32_t len = htonl(size); + + ret = qemu_chr_fe_write_all(&s->chr_notify_dev, + (uint8_t *)&len, + sizeof(len)); + + if (ret != sizeof(len)) { + goto err; + } + + ret = qemu_chr_fe_write_all(&s->chr_notify_dev, + (uint8_t *)buf, + size); if (ret != size) { goto err; } return 0; - err: return ret < 0 ? ret : -EIO; } @@ -1062,8 +1142,7 @@ static void compare_pri_rs_finalize(SocketReadState *pri_rs) compare_chr_send(s, pri_rs->buf, pri_rs->packet_len, - pri_rs->vnet_hdr_len, - false); + pri_rs->vnet_hdr_len); } else { /* compare packet in the specified connection */ colo_compare_connection(conn, s); @@ -1093,7 +1172,7 @@ static void compare_notify_rs_finalize(SocketReadState *notify_rs) if (packet_matches_str("COLO_USERSPACE_PROXY_INIT", notify_rs->buf, notify_rs->packet_len)) { - ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true); + ret = notify_chr_send(s, (uint8_t *)msg, strlen(msg)); if (ret < 0) { error_report("Notify Xen COLO-frame INIT failed"); } @@ -1199,6 +1278,9 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) QTAILQ_INSERT_TAIL(&net_compares, s, next); + s->sendco.done = true; + g_queue_init(&s->sendco.send_list); + g_queue_init(&s->conn_list); qemu_mutex_init(&event_mtx); @@ -1224,8 +1306,7 @@ static void colo_flush_packets(void *opaque, void *user_data) compare_chr_send(s, pkt->data, pkt->size, - pkt->vnet_hdr_len, - false); + pkt->vnet_hdr_len); packet_destroy(pkt, NULL); } while (!g_queue_is_empty(&conn->secondary_list)) { @@ -1281,6 +1362,11 @@ static void colo_compare_finalize(Object *obj) CompareState *s = COLO_COMPARE(obj); CompareState *tmp = NULL; + AioContext *ctx = iothread_get_aio_context(s->iothread); + aio_context_acquire(ctx); + AIO_WAIT_WHILE(ctx, !s->sendco.done); + aio_context_release(ctx); + qemu_chr_fe_deinit(&s->chr_pri_in, false); qemu_chr_fe_deinit(&s->chr_sec_in, false); qemu_chr_fe_deinit(&s->chr_out, false); @@ -1305,6 +1391,7 @@ static void colo_compare_finalize(Object *obj) g_queue_foreach(&s->conn_list, colo_flush_packets, s); g_queue_clear(&s->conn_list); + g_queue_clear(&s->sendco.send_list); if (s->connection_track_table) { g_hash_table_destroy(s->connection_track_table); -- 2.20.1