On Thu, 7 May 2020 11:00:26 +0000 "Zhang, Chen" wrote:

> > -----Original Message-----
> > From: Lukas Straub
> > Sent: Monday, May 4, 2020 6:28 PM
> > To: qemu-devel
> > Cc: Zhang, Chen ; Li Zhijian
> > ; Jason Wang ; Marc-
> > André Lureau ; Paolo Bonzini
> >
> > Subject: [PATCH v4 3/6] net/colo-compare.c: Fix deadlock in
> > compare_chr_send
> >
> > The chr_out chardev is connected to a filter-redirector running in the
> > main loop. qemu_chr_fe_write_all might block here in compare_chr_send
> > if the (socket-)buffer is full.
> > If another filter-redirector in the main loop wants to send data to
> > chr_pri_in it might also block if the buffer is full. This leads to a
> > deadlock because both event loops get blocked.
> >
> > Fix this by converting compare_chr_send to a coroutine and putting the
> > packets in a send queue.
> >
> > Signed-off-by: Lukas Straub
> > ---
> >  net/colo-compare.c | 187 ++++++++++++++++++++++++++++++++++-----------
> >  net/colo.c         |   7 ++
> >  net/colo.h         |   1 +
> >  3 files changed, 150 insertions(+), 45 deletions(-)
> >
> > diff --git a/net/colo-compare.c b/net/colo-compare.c
> > index 1de4220fe2..2a4e7f7c4e 100644
> > --- a/net/colo-compare.c
> > +++ b/net/colo-compare.c
> > @@ -32,6 +32,9 @@
> >  #include "migration/migration.h"
> >  #include "util.h"
> >
> > +#include "block/aio-wait.h"
> > +#include "qemu/coroutine.h"
> > +
> >  #define TYPE_COLO_COMPARE "colo-compare"
> >  #define COLO_COMPARE(obj) \
> >      OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
> > @@ -77,6 +80,23 @@ static int event_unhandled_count;
> >   * |packet | |packet +    |packet | |packet +
> >   * +--------+ +--------+    +--------+ +--------+
> >   */
> > +
> > +typedef struct SendCo {
> > +    Coroutine *co;
> > +    struct CompareState *s;
> > +    CharBackend *chr;
> > +    GQueue send_list;
> > +    bool notify_remote_frame;
> > +    bool done;
> > +    int ret;
> > +} SendCo;
> > +
> > +typedef struct SendEntry {
> > +    uint32_t size;
> > +    uint32_t vnet_hdr_len;
> > +    uint8_t *buf;
> > +} SendEntry;
> > +
> >  typedef struct CompareState {
> >      Object parent;
> >
> > @@ -91,6 +111,8 @@ typedef struct CompareState {
> >      SocketReadState pri_rs;
> >      SocketReadState sec_rs;
> >      SocketReadState notify_rs;
> > +    SendCo out_sendco;
> > +    SendCo notify_sendco;
> >      bool vnet_hdr;
> >      uint32_t compare_timeout;
> >      uint32_t expired_scan_cycle;
> > @@ -124,10 +146,11 @@ enum {
> >
> >
> >  static int compare_chr_send(CompareState *s,
> > -                            const uint8_t *buf,
> > +                            uint8_t *buf,
> >                              uint32_t size,
> >                              uint32_t vnet_hdr_len,
> > -                            bool notify_remote_frame);
> > +                            bool notify_remote_frame,
> > +                            bool zero_copy);
> >
> >  static bool packet_matches_str(const char *str,
> >                                 const uint8_t *buf,
> > @@ -145,7 +168,7 @@ static void notify_remote_frame(CompareState *s)
> >      char msg[] = "DO_CHECKPOINT";
> >      int ret = 0;
> >
> > -    ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
> > +    ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true,
> > +                           false);
> >      if (ret < 0) {
> >          error_report("Notify Xen COLO-frame failed");
> >      }
> > @@ -272,12 +295,13 @@ static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
> >                            pkt->data,
> >                            pkt->size,
> >                            pkt->vnet_hdr_len,
> > -                          false);
> > +                          false,
> > +                          true);
> >      if (ret < 0) {
> >          error_report("colo send primary packet failed");
> >      }
> >      trace_colo_compare_main("packet same and release packet");
> > -    packet_destroy(pkt, NULL);
> > +    packet_destroy_partial(pkt, NULL);
> >  }
> >
> >  /*
> > @@ -699,65 +723,115 @@ static void colo_compare_connection(void *opaque, void *user_data)
> >      }
> >  }
> >
> > -static int compare_chr_send(CompareState *s,
> > -                            const uint8_t *buf,
> > -                            uint32_t size,
> > -                            uint32_t vnet_hdr_len,
> > -                            bool notify_remote_frame)
> > +static void coroutine_fn _compare_chr_send(void *opaque)
> >  {
> > +    SendCo *sendco = opaque;
> > +    CompareState *s = sendco->s;
> >      int ret = 0;
> > -    uint32_t len = htonl(size);
> >
> > -    if (!size) {
> > -        return 0;
> > -    }
> > +    while (!g_queue_is_empty(&sendco->send_list)) {
> > +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> > +        uint32_t len = htonl(entry->size);
> >
> > -    if (notify_remote_frame) {
> > -        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> > -                                    (uint8_t *)&len,
> > -                                    sizeof(len));
> > -    } else {
> > -        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
> > -    }
> > +        ret = qemu_chr_fe_write_all(sendco->chr, (uint8_t *)&len,
> > +                                    sizeof(len));
> >
> > -    if (ret != sizeof(len)) {
> > -        goto err;
> > -    }
> > +        if (ret != sizeof(len)) {
> > +            g_free(entry->buf);
> > +            g_slice_free(SendEntry, entry);
> > +            goto err;
> > +        }
> >
> > -    if (s->vnet_hdr) {
> > -        /*
> > -         * We send vnet header len make other module(like filter-redirector)
> > -         * know how to parse net packet correctly.
> > -         */
> > -        len = htonl(vnet_hdr_len);
> > +        if (!sendco->notify_remote_frame && s->vnet_hdr) {
> > +            /*
> > +             * We send vnet header len make other module(like filter-redirector)
> > +             * know how to parse net packet correctly.
> > +             */
> > +            len = htonl(entry->vnet_hdr_len);
> >
> > -        if (!notify_remote_frame) {
> > -            ret = qemu_chr_fe_write_all(&s->chr_out,
> > +            ret = qemu_chr_fe_write_all(sendco->chr,
> >                                          (uint8_t *)&len,
> >                                          sizeof(len));
> > +
> > +            if (ret != sizeof(len)) {
> > +                g_free(entry->buf);
> > +                g_slice_free(SendEntry, entry);
> > +                goto err;
> > +            }
> >          }
> >
> > -        if (ret != sizeof(len)) {
> > +        ret = qemu_chr_fe_write_all(sendco->chr,
> > +                                    (uint8_t *)entry->buf,
> > +                                    entry->size);
> > +
> > +        if (ret != entry->size) {
> > +            g_free(entry->buf);
> > +            g_slice_free(SendEntry, entry);
> >              goto err;
> >          }
> > +
> > +        g_free(entry->buf);
> > +        g_slice_free(SendEntry, entry);
> >      }
> >
> > +    sendco->ret = 0;
> > +    goto out;
> > +
> > +err:
> > +    while (!g_queue_is_empty(&sendco->send_list)) {
> > +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> > +        g_free(entry->buf);
> > +        g_slice_free(SendEntry, entry);
> > +    }
> > +    sendco->ret = ret < 0 ? ret : -EIO;
> > +out:
> > +    sendco->co = NULL;
> > +    sendco->done = true;
> > +    aio_wait_kick();
> > +}
> > +
> > +static int compare_chr_send(CompareState *s,
> > +                            uint8_t *buf,
> > +                            uint32_t size,
> > +                            uint32_t vnet_hdr_len,
> > +                            bool notify_remote_frame,
> > +                            bool zero_copy) {
> > +    SendCo *sendco;
> > +    SendEntry *entry;
> > +
> >      if (notify_remote_frame) {
> > -        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> > -                                    (uint8_t *)buf,
> > -                                    size);
> > +        sendco = &s->notify_sendco;
> >      } else {
> > -        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
> > +        sendco = &s->out_sendco;
> >      }
> >
> > -    if (ret != size) {
> > -        goto err;
> > +    if (!size) {
> > +        return 0;
> >      }
> >
> > -    return 0;
> > +    entry = g_slice_new(SendEntry);
> > +    entry->size = size;
> > +    entry->vnet_hdr_len = vnet_hdr_len;
> > +    if (zero_copy) {
> > +        entry->buf = buf;
> > +    } else {
> > +        entry->buf = g_malloc(size);
> > +        memcpy(entry->buf, buf, size);
> > +    }
> > +    g_queue_push_head(&sendco->send_list, entry);
> > +
> > +    if (sendco->done) {
> > +        sendco->co = qemu_coroutine_create(_compare_chr_send, sendco);
> > +        sendco->done = false;
> > +        qemu_coroutine_enter(sendco->co);
> > +        if (sendco->done) {
> > +            /* report early errors */
> > +            return sendco->ret;
> > +        }
> > +    }
> >
> > -err:
> > -    return ret < 0 ? ret : -EIO;
> > +    /* assume success */
> > +    return 0;
> >  }
> >
> >  static int compare_chr_can_read(void *opaque)
> > @@ -1063,6 +1137,7 @@ static void compare_pri_rs_finalize(SocketReadState *pri_rs)
> >                           pri_rs->buf,
> >                           pri_rs->packet_len,
> >                           pri_rs->vnet_hdr_len,
> > +                         false,
> >                           false);
> >      } else {
> >          /* compare packet in the specified connection */
> > @@ -1093,7 +1168,7 @@ static void compare_notify_rs_finalize(SocketReadState *notify_rs)
> >      if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
> >                             notify_rs->buf,
> >                             notify_rs->packet_len)) {
> > -        ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
> > +        ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0,
> > +                               true, false);
> >          if (ret < 0) {
> >              error_report("Notify Xen COLO-frame INIT failed");
> >          }
> > @@ -1199,6 +1274,18 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
> >
> >      QTAILQ_INSERT_TAIL(&net_compares, s, next);
> >
> > +    s->out_sendco.s = s;
> > +    s->out_sendco.chr = &s->chr_out;
> > +    s->out_sendco.notify_remote_frame = false;
> > +    s->out_sendco.done = true;
> > +    g_queue_init(&s->out_sendco.send_list);
> > +
> > +    s->notify_sendco.s = s;
> > +    s->notify_sendco.chr = &s->chr_notify_dev;
> > +    s->notify_sendco.notify_remote_frame = true;
> > +    s->notify_sendco.done = true;
> > +    g_queue_init(&s->notify_sendco.send_list);
> > +
> No need to init the notify_sendco each time, because the notify dev is just an
> optional parameter. You can use the if (s->notify_dev) here. Only Xen uses the
> chr_notify_dev.

Ok, I will change that and the code below in the next version.
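
Concretely, I would guard both the setup in colo_compare_complete() and the
cleanup in colo_compare_finalize() on s->notify_dev, as you suggest. Roughly
(untested sketch, to be done properly in the next version):

    /* colo_compare_complete(): out_sendco is always needed */
    s->out_sendco.s = s;
    s->out_sendco.chr = &s->chr_out;
    s->out_sendco.notify_remote_frame = false;
    s->out_sendco.done = true;
    g_queue_init(&s->out_sendco.send_list);

    /* notify_sendco is only used for the Xen notify chardev */
    if (s->notify_dev) {
        s->notify_sendco.s = s;
        s->notify_sendco.chr = &s->chr_notify_dev;
        s->notify_sendco.notify_remote_frame = true;
        s->notify_sendco.done = true;
        g_queue_init(&s->notify_sendco.send_list);
    }

and likewise in colo_compare_finalize():

    AIO_WAIT_WHILE(ctx, !s->out_sendco.done);
    /* only wait for and clear the notify queue if a notify_dev was given */
    if (s->notify_dev) {
        AIO_WAIT_WHILE(ctx, !s->notify_sendco.done);
    }
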
> Overall, making the chr_send job a coroutine is a good idea. It looks good
> to me. And your patch inspired me: it looks like we can re-use the
> compare_chr_send code in filter mirror/redirector too.

I already have a patch for that, but I don't think it is a good idea, because
the guest can then send packets faster than colo-compare can process them.
This leads to bufferbloat and the performance drops in my tests:

Client-to-server tcp:
without patch: ~66 Mbit/s
with patch: ~59 Mbit/s

Server-to-client tcp:
without patch: ~702 Kbit/s
with patch: ~328 Kbit/s

Regards,
Lukas Straub

> Tested-by: Zhang Chen
>
> >
> >      g_queue_init(&s->conn_list);
> >
> >      qemu_mutex_init(&event_mtx);
> > @@ -1225,8 +1312,9 @@ static void colo_flush_packets(void *opaque, void *user_data)
> >                            pkt->data,
> >                            pkt->size,
> >                            pkt->vnet_hdr_len,
> > -                          false);
> > -        packet_destroy(pkt, NULL);
> > +                          false,
> > +                          true);
> > +        packet_destroy_partial(pkt, NULL);
> >      }
> >      while (!g_queue_is_empty(&conn->secondary_list)) {
> >          pkt = g_queue_pop_head(&conn->secondary_list);
> > @@ -1301,10 +1389,19 @@ static void colo_compare_finalize(Object *obj)
> >          }
> >      }
> >
> > +    AioContext *ctx = iothread_get_aio_context(s->iothread);
> > +    aio_context_acquire(ctx);
> > +    AIO_WAIT_WHILE(ctx, !s->out_sendco.done);
> > +    AIO_WAIT_WHILE(ctx, !s->notify_sendco.done);
>
> Same as above.
>
> > +    aio_context_release(ctx);
> > +
> >      /* Release all unhandled packets after compare thead exited */
> >      g_queue_foreach(&s->conn_list, colo_flush_packets, s);
> > +    AIO_WAIT_WHILE(NULL, !s->out_sendco.done);
> >
> >      g_queue_clear(&s->conn_list);
> > +    g_queue_clear(&s->out_sendco.send_list);
> > +    g_queue_clear(&s->notify_sendco.send_list);
>
> Same as above.
>
> >
> >      if (s->connection_track_table) {
> >          g_hash_table_destroy(s->connection_track_table);
> > diff --git a/net/colo.c b/net/colo.c
> > index 8196b35837..a6c66d829a 100644
> > --- a/net/colo.c
> > +++ b/net/colo.c
> > @@ -185,6 +185,13 @@ void packet_destroy(void *opaque, void *user_data)
> >      g_slice_free(Packet, pkt);
> >  }
> >
> > +void packet_destroy_partial(void *opaque, void *user_data) {
> > +    Packet *pkt = opaque;
> > +
> > +    g_slice_free(Packet, pkt);
> > +}
> > +
> >  /*
> >   * Clear hashtable, stop this hash growing really huge
> >   */
> > diff --git a/net/colo.h b/net/colo.h
> > index 679314b1ca..573ab91785 100644
> > --- a/net/colo.h
> > +++ b/net/colo.h
> > @@ -102,5 +102,6 @@ bool connection_has_tracked(GHashTable *connection_track_table,
> >  void connection_hashtable_reset(GHashTable *connection_track_table);
> >  Packet *packet_new(const void *data, int size, int vnet_hdr_len);
> >  void packet_destroy(void *opaque, void *user_data);
> > +void packet_destroy_partial(void *opaque, void *user_data);
> >
> >  #endif /* NET_COLO_H */
> > --
> > 2.20.1
>