On Wed, 29 Apr 2020 05:37:17 +0000 "Zhang, Chen" wrote: > > -----Original Message----- > > From: Lukas Straub > > Sent: Monday, April 27, 2020 3:22 PM > > To: Zhang, Chen > > Cc: qemu-devel ; Li Zhijian > > ; Jason Wang ; Marc- > > André Lureau ; Paolo Bonzini > > > > Subject: Re: [PATCH v3 3/6] net/colo-compare.c: Fix deadlock in > > compare_chr_send > > > > On Mon, 27 Apr 2020 03:36:57 +0000 > > "Zhang, Chen" wrote: > > > > > > -----Original Message----- > > > > From: Lukas Straub > > > > Sent: Monday, April 27, 2020 5:19 AM > > > > To: qemu-devel > > > > Cc: Zhang, Chen ; Li Zhijian > > > > ; Jason Wang ; Marc- > > > > André Lureau ; Paolo Bonzini > > > > > > > > Subject: [PATCH v3 3/6] net/colo-compare.c: Fix deadlock in > > > > compare_chr_send > > > > > > > > The chr_out chardev is connected to a filter-redirector running in > > > > the main loop. qemu_chr_fe_write_all might block here in > > > > compare_chr_send if the (socket-)buffer is full. > > > > If another filter-redirector in the main loop wants to send data to > > > > chr_pri_in it might also block if the buffer is full. This leads to > > > > a deadlock because both event loops get blocked. > > > > > > > > Fix this by converting compare_chr_send to a coroutine and putting > > > > the packets in a send queue. Also create a new function > > > > notify_chr_send, since that should be independent. 
> > > > > > > > Signed-off-by: Lukas Straub > > > > --- > > > > net/colo-compare.c | 173 ++++++++++++++++++++++++++++++++++--- > > ---- > > > > ---- > > > > 1 file changed, 130 insertions(+), 43 deletions(-) > > > > > > > > diff --git a/net/colo-compare.c b/net/colo-compare.c index > > > > 1de4220fe2..ff6a740284 100644 > > > > --- a/net/colo-compare.c > > > > +++ b/net/colo-compare.c > > > > @@ -32,6 +32,9 @@ > > > > #include "migration/migration.h" > > > > #include "util.h" > > > > > > > > +#include "block/aio-wait.h" > > > > +#include "qemu/coroutine.h" > > > > + > > > > #define TYPE_COLO_COMPARE "colo-compare" > > > > #define COLO_COMPARE(obj) \ > > > > OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) @@ - > > 77,6 > > > > +80,20 @@ static int event_unhandled_count; > > > > * |packet | |packet + |packet | |packet + > > > > * +--------+ +--------+ +--------+ +--------+ > > > > */ > > > > + > > > > +typedef struct SendCo { > > > > + Coroutine *co; > > > > + GQueue send_list; > > > > + bool done; > > > > + int ret; > > > > +} SendCo; > > > > + > > > > +typedef struct SendEntry { > > > > + uint32_t size; > > > > + uint32_t vnet_hdr_len; > > > > + uint8_t buf[]; > > > > +} SendEntry; > > > > + > > > > typedef struct CompareState { > > > > Object parent; > > > > > > > > @@ -91,6 +108,7 @@ typedef struct CompareState { > > > > SocketReadState pri_rs; > > > > SocketReadState sec_rs; > > > > SocketReadState notify_rs; > > > > + SendCo sendco; > > > > bool vnet_hdr; > > > > uint32_t compare_timeout; > > > > uint32_t expired_scan_cycle; > > > > @@ -126,8 +144,11 @@ enum { > > > > static int compare_chr_send(CompareState *s, > > > > const uint8_t *buf, > > > > uint32_t size, > > > > - uint32_t vnet_hdr_len, > > > > - bool notify_remote_frame); > > > > + uint32_t vnet_hdr_len); > > > > + > > > > +static int notify_chr_send(CompareState *s, > > > > + const uint8_t *buf, > > > > + uint32_t size); > > > > > > > > static bool packet_matches_str(const char *str, > > > > const 
uint8_t *buf, @@ -145,7 +166,7 > > > > @@ static void notify_remote_frame(CompareState *s) > > > > char msg[] = "DO_CHECKPOINT"; > > > > int ret = 0; > > > > > > > > - ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true); > > > > + ret = notify_chr_send(s, (uint8_t *)msg, strlen(msg)); > > > > if (ret < 0) { > > > > error_report("Notify Xen COLO-frame failed"); > > > > } > > > > @@ -271,8 +292,7 @@ static void > > > > colo_release_primary_pkt(CompareState > > > > *s, Packet *pkt) > > > > ret = compare_chr_send(s, > > > > pkt->data, > > > > pkt->size, > > > > - pkt->vnet_hdr_len, > > > > - false); > > > > + pkt->vnet_hdr_len); > > > > if (ret < 0) { > > > > error_report("colo send primary packet failed"); > > > > } > > > > @@ -699,63 +719,123 @@ static void colo_compare_connection(void > > > > *opaque, void *user_data) > > > > } > > > > } > > > > > > > > -static int compare_chr_send(CompareState *s, > > > > - const uint8_t *buf, > > > > - uint32_t size, > > > > - uint32_t vnet_hdr_len, > > > > - bool notify_remote_frame) > > > > +static void coroutine_fn _compare_chr_send(void *opaque) > > > > { > > > > + CompareState *s = opaque; > > > > + SendCo *sendco = &s->sendco; > > > > int ret = 0; > > > > - uint32_t len = htonl(size); > > > > > > > > - if (!size) { > > > > - return 0; > > > > - } > > > > + while (!g_queue_is_empty(&sendco->send_list)) { > > > > + SendEntry *entry = g_queue_pop_tail(&sendco->send_list); > > > > + uint32_t len = htonl(entry->size); > > > > > > > > - if (notify_remote_frame) { > > > > - ret = qemu_chr_fe_write_all(&s->chr_notify_dev, > > > > - (uint8_t *)&len, > > > > - sizeof(len)); > > > > - } else { > > > > ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, > > sizeof(len)); > > > > - } > > > > > > > > - if (ret != sizeof(len)) { > > > > - goto err; > > > > - } > > > > + if (ret != sizeof(len)) { > > > > + g_free(entry); > > > > + goto err; > > > > + } > > > > > > > > - if (s->vnet_hdr) { > > > > - /* > > > > - * We send 
vnet header len make other module(like filter-redirector) > > > > - * know how to parse net packet correctly. > > > > - */ > > > > - len = htonl(vnet_hdr_len); > > > > + if (s->vnet_hdr) { > > > > + /* > > > > + * We send vnet header len make other module(like filter- > > redirector) > > > > + * know how to parse net packet correctly. > > > > + */ > > > > + len = htonl(entry->vnet_hdr_len); > > > > > > > > - if (!notify_remote_frame) { > > > > ret = qemu_chr_fe_write_all(&s->chr_out, > > > > (uint8_t *)&len, > > > > sizeof(len)); > > > > + > > > > + if (ret != sizeof(len)) { > > > > + g_free(entry); > > > > + goto err; > > > > + } > > > > } > > > > > > > > - if (ret != sizeof(len)) { > > > > + ret = qemu_chr_fe_write_all(&s->chr_out, > > > > + (uint8_t *)entry->buf, > > > > + entry->size); > > > > + > > > > + if (ret != entry->size) { > > > > + g_free(entry); > > > > goto err; > > > > } > > > > + > > > > + g_free(entry); > > > > } > > > > > > > > - if (notify_remote_frame) { > > > > - ret = qemu_chr_fe_write_all(&s->chr_notify_dev, > > > > - (uint8_t *)buf, > > > > - size); > > > > - } else { > > > > - ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size); > > > > + sendco->ret = 0; > > > > + goto out; > > > > + > > > > +err: > > > > + while (!g_queue_is_empty(&sendco->send_list)) { > > > > + SendEntry *entry = g_queue_pop_tail(&sendco->send_list); > > > > + g_free(entry); > > > > } > > > > + sendco->ret = ret < 0 ? 
ret : -EIO; > > > > +out: > > > > + sendco->co = NULL; > > > > + sendco->done = true; > > > > + aio_wait_kick(); > > > > +} > > > > + > > > > +static int compare_chr_send(CompareState *s, > > > > + const uint8_t *buf, > > > > + uint32_t size, > > > > + uint32_t vnet_hdr_len) { > > > > + SendCo *sendco = &s->sendco; > > > > + SendEntry *entry; > > > > + > > > > + if (!size) { > > > > + return 0; > > > > + } > > > > + > > > > + entry = g_malloc(sizeof(SendEntry) + size); > > > > + entry->size = size; > > > > + entry->vnet_hdr_len = vnet_hdr_len; > > > > + memcpy(entry->buf, buf, size); > > > > + g_queue_push_head(&sendco->send_list, entry); > > > > + > > > > + if (sendco->done) { > > > > + sendco->co = qemu_coroutine_create(_compare_chr_send, s); > > > > + sendco->done = false; > > > > + qemu_coroutine_enter(sendco->co); > > > > + if (sendco->done) { > > > > + /* report early errors */ > > > > + return sendco->ret; > > > > + } > > > > + } > > > > + > > > > + /* assume success */ > > > > + return 0; > > > > +} > > > > + > > > > > > Why not make notify_chr_send the same as compare_chr_send? > > > > Hello, > > The notify chardev_dev is not affected by this deadlock issue and is > > independent from the outdev chardev. So it wouldn't make sense for notify > > messages to wait in the queue if the outdev chardev is blocked. Also, the > > code is easier to understand this way. > > > > Yes, I mean that the deadlock issue may also occur on the Xen COLO side; we can resolve the potential problem here. Ok, I will change it in the next version. 
> Thanks > Zhang Chen > > > Regards, > > Lukas Straub > > > > > Thanks > > > Zhang Chen > > > > > > > +static int notify_chr_send(CompareState *s, > > > > + const uint8_t *buf, > > > > + uint32_t size) { > > > > + int ret = 0; > > > > + uint32_t len = htonl(size); > > > > + > > > > + ret = qemu_chr_fe_write_all(&s->chr_notify_dev, > > > > + (uint8_t *)&len, > > > > + sizeof(len)); > > > > + > > > > + if (ret != sizeof(len)) { > > > > + goto err; > > > > + } > > > > + > > > > + ret = qemu_chr_fe_write_all(&s->chr_notify_dev, > > > > + (uint8_t *)buf, > > > > + size); > > > > > > > > if (ret != size) { > > > > goto err; > > > > } > > > > > > > > return 0; > > > > - > > > > err: > > > > return ret < 0 ? ret : -EIO; > > > > } > > > > @@ -1062,8 +1142,7 @@ static void > > > > compare_pri_rs_finalize(SocketReadState *pri_rs) > > > > compare_chr_send(s, > > > > pri_rs->buf, > > > > pri_rs->packet_len, > > > > - pri_rs->vnet_hdr_len, > > > > - false); > > > > + pri_rs->vnet_hdr_len); > > > > } else { > > > > /* compare packet in the specified connection */ > > > > colo_compare_connection(conn, s); @@ -1093,7 +1172,7 @@ > > > > static void compare_notify_rs_finalize(SocketReadState *notify_rs) > > > > if (packet_matches_str("COLO_USERSPACE_PROXY_INIT", > > > > notify_rs->buf, > > > > notify_rs->packet_len)) { > > > > - ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true); > > > > + ret = notify_chr_send(s, (uint8_t *)msg, strlen(msg)); > > > > if (ret < 0) { > > > > error_report("Notify Xen COLO-frame INIT failed"); > > > > } > > > > @@ -1199,6 +1278,9 @@ static void > > > > colo_compare_complete(UserCreatable > > > > *uc, Error **errp) > > > > > > > > QTAILQ_INSERT_TAIL(&net_compares, s, next); > > > > > > > > + s->sendco.done = true; > > > > + g_queue_init(&s->sendco.send_list); > > > > + > > > > g_queue_init(&s->conn_list); > > > > > > > > qemu_mutex_init(&event_mtx); > > > > @@ -1224,8 +1306,7 @@ static void colo_flush_packets(void *opaque, > > > > void > > > 
> *user_data) > > > > compare_chr_send(s, > > > > pkt->data, > > > > pkt->size, > > > > - pkt->vnet_hdr_len, > > > > - false); > > > > + pkt->vnet_hdr_len); > > > > packet_destroy(pkt, NULL); > > > > } > > > > while (!g_queue_is_empty(&conn->secondary_list)) { @@ -1281,6 > > > > +1362,11 @@ static void colo_compare_finalize(Object *obj) > > > > CompareState *s = COLO_COMPARE(obj); > > > > CompareState *tmp = NULL; > > > > > > > > + AioContext *ctx = iothread_get_aio_context(s->iothread); > > > > + aio_context_acquire(ctx); > > > > + AIO_WAIT_WHILE(ctx, !s->sendco.done); > > > > + aio_context_release(ctx); > > > > + > > > > qemu_chr_fe_deinit(&s->chr_pri_in, false); > > > > qemu_chr_fe_deinit(&s->chr_sec_in, false); > > > > qemu_chr_fe_deinit(&s->chr_out, false); @@ -1305,6 +1391,7 @@ > > > > static void colo_compare_finalize(Object *obj) > > > > g_queue_foreach(&s->conn_list, colo_flush_packets, s); > > > > > > > > g_queue_clear(&s->conn_list); > > > > + g_queue_clear(&s->sendco.send_list); > > > > > > > > if (s->connection_track_table) { > > > > g_hash_table_destroy(s->connection_track_table); > > > > -- > > > > 2.20.1 > > > >