From mboxrd@z Thu Jan 1 00:00:00 1970 From: "Dr. David Alan Gilbert" Subject: Re: [PATCH 2/8] migration: stop allocating and freeing memory frequently Date: Thu, 15 Mar 2018 11:03:51 +0000 Message-ID: <20180315110350.GB3062@work-vm> References: <20180313075739.11194-1-xiaoguangrong@tencent.com> <20180313075739.11194-3-xiaoguangrong@tencent.com> Mime-Version: 1.0 Content-Type: text/plain; charset=us-ascii Cc: kvm@vger.kernel.org, mst@redhat.com, mtosatti@redhat.com, Xiao Guangrong , qemu-devel@nongnu.org, pbonzini@redhat.com To: guangrong.xiao@gmail.com, quintela@redhat.com Return-path: Content-Disposition: inline In-Reply-To: <20180313075739.11194-3-xiaoguangrong@tencent.com> List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: qemu-devel-bounces+gceq-qemu-devel2=m.gmane.org@nongnu.org Sender: "Qemu-devel" List-Id: kvm.vger.kernel.org * guangrong.xiao@gmail.com (guangrong.xiao@gmail.com) wrote: > From: Xiao Guangrong > > Current code uses compress2()/uncompress() to compress/decompress > memory, these two function manager memory allocation and release > internally, that causes huge memory is allocated and freed very > frequently > > More worse, frequently returning memory to kernel will flush TLBs > and trigger invalidation callbacks on mmu-notification which > interacts with KVM MMU, that dramatically reduce the performance > of VM > > So, we maintain the memory by ourselves and reuse it for each > compression and decompression I think > Signed-off-by: Xiao Guangrong > --- > migration/qemu-file.c | 34 ++++++++++-- > migration/qemu-file.h | 6 ++- > migration/ram.c | 142 +++++++++++++++++++++++++++++++++++++------------- > 3 files changed, 140 insertions(+), 42 deletions(-) > > diff --git a/migration/qemu-file.c b/migration/qemu-file.c > index 2ab2bf362d..1ff33a1ffb 100644 > --- a/migration/qemu-file.c > +++ b/migration/qemu-file.c > @@ -658,6 +658,30 @@ uint64_t qemu_get_be64(QEMUFile *f) > return v; > } > > +/* return the size after compression, or negative value on error */ > +static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len, > + const uint8_t *source, size_t source_len) > +{ > + int err; > + > + err = deflateReset(stream); > + if (err != Z_OK) { > + return -1; > + } > + > + stream->avail_in = source_len; > + stream->next_in = (uint8_t *)source; > + stream->avail_out = dest_len; > + stream->next_out = dest; > + > + err = deflate(stream, Z_FINISH); > + if (err != Z_STREAM_END) { > + return -1; > + } > + > + return stream->next_out - dest; > +} > + > /* Compress size bytes of data start at p with specific compression > * level and store the compressed data to the buffer of f. > * > @@ -668,8 +692,8 @@ uint64_t qemu_get_be64(QEMUFile *f) > * data, return -1. > */ > > -ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size, > - int level) > +ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream, > + const uint8_t *p, size_t size) > { > ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t); > > @@ -683,8 +707,10 @@ ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size, > return -1; > } > } > - if (compress2(f->buf + f->buf_index + sizeof(int32_t), (uLongf *)&blen, > - (Bytef *)p, size, level) != Z_OK) { > + > + blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t), > + blen, p, size); > + if (blen < 0) { > error_report("Compress Failed!"); > return 0; > } > diff --git a/migration/qemu-file.h b/migration/qemu-file.h > index aae4e5ed36..d123b21ca8 100644 > --- a/migration/qemu-file.h > +++ b/migration/qemu-file.h > @@ -25,6 +25,8 @@ > #ifndef MIGRATION_QEMU_FILE_H > #define MIGRATION_QEMU_FILE_H > > +#include > + > /* Read a chunk of data from a file at the given position. The pos argument > * can be ignored if the file is only be used for streaming. The number of > * bytes actually read should be returned. > @@ -132,8 +134,8 @@ bool qemu_file_is_writable(QEMUFile *f); > > size_t qemu_peek_buffer(QEMUFile *f, uint8_t **buf, size_t size, size_t offset); > size_t qemu_get_buffer_in_place(QEMUFile *f, uint8_t **buf, size_t size); > -ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size, > - int level); > +ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream, > + const uint8_t *p, size_t size); > int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src); > > /* > diff --git a/migration/ram.c b/migration/ram.c > index 615693f180..fff3f31e90 100644 > --- a/migration/ram.c > +++ b/migration/ram.c > @@ -264,6 +264,7 @@ struct CompressParam { > QemuCond cond; > RAMBlock *block; > ram_addr_t offset; > + z_stream stream; > }; > typedef struct CompressParam CompressParam; > > @@ -275,6 +276,7 @@ struct DecompressParam { > void *des; > uint8_t *compbuf; > int len; > + z_stream stream; > }; > typedef struct DecompressParam DecompressParam; > > @@ -294,7 +296,7 @@ static QemuThread *decompress_threads; > static QemuMutex decomp_done_lock; > static QemuCond decomp_done_cond; > > -static int do_compress_ram_page(QEMUFile *f, RAMBlock *block, > +static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, > ram_addr_t offset); > > static void *do_data_compress(void *opaque) > @@ -311,7 +313,7 @@ static void *do_data_compress(void *opaque) > param->block = NULL; > qemu_mutex_unlock(¶m->mutex); > > - do_compress_ram_page(param->file, block, offset); > + do_compress_ram_page(param->file, ¶m->stream, block, offset); > > qemu_mutex_lock(&comp_done_lock); > param->done = true; > @@ -352,10 +354,17 @@ static void compress_threads_save_cleanup(void) > terminate_compression_threads(); > thread_count = migrate_compress_threads(); > for (i = 0; i < thread_count; i++) { > + /* something in compress_threads_save_setup() is wrong. */ > + if (!comp_param[i].stream.opaque) { > + break; > + } > + > qemu_thread_join(compress_threads + i); > qemu_fclose(comp_param[i].file); > qemu_mutex_destroy(&comp_param[i].mutex); > qemu_cond_destroy(&comp_param[i].cond); > + deflateEnd(&comp_param[i].stream); > + comp_param[i].stream.opaque = NULL; > } > qemu_mutex_destroy(&comp_done_lock); > qemu_cond_destroy(&comp_done_cond); > @@ -365,12 +374,12 @@ static void compress_threads_save_cleanup(void) > comp_param = NULL; > } > > -static void compress_threads_save_setup(void) > +static int compress_threads_save_setup(void) > { > int i, thread_count; > > if (!migrate_use_compression()) { > - return; > + return 0; > } > thread_count = migrate_compress_threads(); > compress_threads = g_new0(QemuThread, thread_count); > @@ -378,6 +387,12 @@ static void compress_threads_save_setup(void) > qemu_cond_init(&comp_done_cond); > qemu_mutex_init(&comp_done_lock); > for (i = 0; i < thread_count; i++) { > + if (deflateInit(&comp_param[i].stream, > + migrate_compress_level()) != Z_OK) { > + goto exit; > + } > + comp_param[i].stream.opaque = &comp_param[i]; > + > /* comp_param[i].file is just used as a dummy buffer to save data, > * set its ops to empty. > */ > @@ -390,6 +405,11 @@ static void compress_threads_save_setup(void) > do_data_compress, comp_param + i, > QEMU_THREAD_JOINABLE); > } > + return 0; > + > +exit: > + compress_threads_save_cleanup(); > + return -1; > } > > /* Multiple fd's */ > @@ -1026,7 +1046,7 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage) > return pages; > } > > -static int do_compress_ram_page(QEMUFile *f, RAMBlock *block, > +static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, > ram_addr_t offset) > { > RAMState *rs = ram_state; > @@ -1035,8 +1055,7 @@ static int do_compress_ram_page(QEMUFile *f, RAMBlock *block, > > bytes_sent = save_page_header(rs, f, block, offset | > RAM_SAVE_FLAG_COMPRESS_PAGE); > - blen = qemu_put_compression_data(f, p, TARGET_PAGE_SIZE, > - migrate_compress_level()); > + blen = qemu_put_compression_data(f, stream, p, TARGET_PAGE_SIZE); > if (blen < 0) { > bytes_sent = 0; > qemu_file_set_error(migrate_get_current()->to_dst_file, blen); > @@ -2209,9 +2228,14 @@ static int ram_save_setup(QEMUFile *f, void *opaque) > RAMState **rsp = opaque; > RAMBlock *block; > > + if (compress_threads_save_setup()) { > + return -1; > + } > + > /* migration has already setup the bitmap, reuse it. */ > if (!migration_in_colo_state()) { > if (ram_init_all(rsp) != 0) { > + compress_threads_save_cleanup(); > return -1; > } > } > @@ -2231,7 +2255,6 @@ static int ram_save_setup(QEMUFile *f, void *opaque) > } > > rcu_read_unlock(); > - compress_threads_save_setup(); > > ram_control_before_iterate(f, RAM_CONTROL_SETUP); > ram_control_after_iterate(f, RAM_CONTROL_SETUP); > @@ -2495,6 +2518,30 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size) > } > } > > +/* return the size after decompression, or negative value on error */ > +static int qemu_uncompress(z_stream *stream, uint8_t *dest, size_t dest_len, > + uint8_t *source, size_t source_len) > +{ > + int err; > + > + err = inflateReset(stream); > + if (err != Z_OK) { > + return -1; > + } > + > + stream->avail_in = source_len; > + stream->next_in = source; > + stream->avail_out = dest_len; > + stream->next_out = dest; > + > + err = inflate(stream, Z_NO_FLUSH); > + if (err != Z_STREAM_END) { > + return -1; > + } > + > + return stream->total_out; > +} > + > static void *do_data_decompress(void *opaque) > { > DecompressParam *param = opaque; > @@ -2511,13 +2558,13 @@ static void *do_data_decompress(void *opaque) > qemu_mutex_unlock(¶m->mutex); > > pagesize = TARGET_PAGE_SIZE; > - /* uncompress() will return failed in some case, especially > + /* qemu_uncompress() will return failed in some case, especially > * when the page is dirted when doing the compression, it's > * not a problem because the dirty page will be retransferred > * and uncompress() won't break the data in other pages. > */ > - uncompress((Bytef *)des, &pagesize, > - (const Bytef *)param->compbuf, len); > + qemu_uncompress(¶m->stream, des, pagesize, > + param->compbuf, len); > > qemu_mutex_lock(&decomp_done_lock); > param->done = true; > @@ -2552,30 +2599,6 @@ static void wait_for_decompress_done(void) > qemu_mutex_unlock(&decomp_done_lock); > } > > -static void compress_threads_load_setup(void) > -{ > - int i, thread_count; > - > - if (!migrate_use_compression()) { > - return; > - } > - thread_count = migrate_decompress_threads(); > - decompress_threads = g_new0(QemuThread, thread_count); > - decomp_param = g_new0(DecompressParam, thread_count); > - qemu_mutex_init(&decomp_done_lock); > - qemu_cond_init(&decomp_done_cond); > - for (i = 0; i < thread_count; i++) { > - qemu_mutex_init(&decomp_param[i].mutex); > - qemu_cond_init(&decomp_param[i].cond); > - decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); > - decomp_param[i].done = true; > - decomp_param[i].quit = false; > - qemu_thread_create(decompress_threads + i, "decompress", > - do_data_decompress, decomp_param + i, > - QEMU_THREAD_JOINABLE); > - } > -} > - > static void compress_threads_load_cleanup(void) > { > int i, thread_count; > @@ -2585,16 +2608,26 @@ static void compress_threads_load_cleanup(void) > } > thread_count = migrate_decompress_threads(); > for (i = 0; i < thread_count; i++) { > + if (!decomp_param[i].stream.opaque) { > + break; > + } > + > qemu_mutex_lock(&decomp_param[i].mutex); > decomp_param[i].quit = true; > qemu_cond_signal(&decomp_param[i].cond); > qemu_mutex_unlock(&decomp_param[i].mutex); > } > for (i = 0; i < thread_count; i++) { > + if (!decomp_param[i].stream.opaque) { > + break; > + } > + > qemu_thread_join(decompress_threads + i); > qemu_mutex_destroy(&decomp_param[i].mutex); > qemu_cond_destroy(&decomp_param[i].cond); > g_free(decomp_param[i].compbuf); > + inflateEnd(&decomp_param[i].stream); > + decomp_param[i].stream.opaque = NULL; > } > g_free(decompress_threads); > g_free(decomp_param); > @@ -2602,6 +2635,40 @@ static void compress_threads_load_cleanup(void) > decomp_param = NULL; > } > > +static int compress_threads_load_setup(void) > +{ > + int i, thread_count; > + > + if (!migrate_use_compression()) { > + return 0; > + } > + > + thread_count = migrate_decompress_threads(); > + decompress_threads = g_new0(QemuThread, thread_count); > + decomp_param = g_new0(DecompressParam, thread_count); > + qemu_mutex_init(&decomp_done_lock); > + qemu_cond_init(&decomp_done_cond); > + for (i = 0; i < thread_count; i++) { > + if (inflateInit(&decomp_param[i].stream) != Z_OK) { > + goto exit; > + } > + decomp_param[i].stream.opaque = &decomp_param[i]; > + > + qemu_mutex_init(&decomp_param[i].mutex); > + qemu_cond_init(&decomp_param[i].cond); > + decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); > + decomp_param[i].done = true; > + decomp_param[i].quit = false; > + qemu_thread_create(decompress_threads + i, "decompress", > + do_data_decompress, decomp_param + i, > + QEMU_THREAD_JOINABLE); > + } > + return 0; > +exit: > + compress_threads_load_cleanup(); I don't think this is safe; if inflateInit(..) fails in not-the-last thread, compress_threads_load_cleanup() will try and destroy all the mutex's and condition variables, even though they've not yet all been _init'd. However, other than that I think the patch is OK; a chat with Dan Berrange has convinced me this probably doesn't affect the stream format, so that's OK. One thing I would like is a comment as to how the 'opaque' field is being used; I don't think I quite understand what you're doing there. Dave > + return -1; > +} > + > static void decompress_data_with_multi_threads(QEMUFile *f, > void *host, int len) > { > @@ -2641,8 +2708,11 @@ static void decompress_data_with_multi_threads(QEMUFile *f, > */ > static int ram_load_setup(QEMUFile *f, void *opaque) > { > + if (compress_threads_load_setup()) { > + return -1; > + } > + > xbzrle_load_setup(); > - compress_threads_load_setup(); > ramblock_recv_map_init(); > return 0; > } > -- > 2.14.3 > > -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK From mboxrd@z Thu Jan 1 00:00:00 1970 Received: from eggs.gnu.org ([2001:4830:134:3::10]:37850) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1ewQg8-0005k7-Us for qemu-devel@nongnu.org; Thu, 15 Mar 2018 07:04:14 -0400 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1ewQg6-0005zn-Us for qemu-devel@nongnu.org; Thu, 15 Mar 2018 07:04:12 -0400 Received: from mx3-rdu2.redhat.com ([66.187.233.73]:49338 helo=mx1.redhat.com) by eggs.gnu.org with esmtps (TLS1.0:DHE_RSA_AES_256_CBC_SHA1:32) (Exim 4.71) (envelope-from ) id 1ewQg6-0005zN-Nq for qemu-devel@nongnu.org; Thu, 15 Mar 2018 07:04:10 -0400 Date: Thu, 15 Mar 2018 11:03:51 +0000 From: "Dr. David Alan Gilbert" Message-ID: <20180315110350.GB3062@work-vm> References: <20180313075739.11194-1-xiaoguangrong@tencent.com> <20180313075739.11194-3-xiaoguangrong@tencent.com> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <20180313075739.11194-3-xiaoguangrong@tencent.com> Subject: Re: [Qemu-devel] [PATCH 2/8] migration: stop allocating and freeing memory frequently List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: guangrong.xiao@gmail.com, quintela@redhat.com Cc: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com, Xiao Guangrong , qemu-devel@nongnu.org, kvm@vger.kernel.org * guangrong.xiao@gmail.com (guangrong.xiao@gmail.com) wrote: > From: Xiao Guangrong > > Current code uses compress2()/uncompress() to compress/decompress > memory, these two function manager memory allocation and release > internally, that causes huge memory is allocated and freed very > frequently > > More worse, frequently returning memory to kernel will flush TLBs > and trigger invalidation callbacks on mmu-notification which > interacts with KVM MMU, that dramatically reduce the performance > of VM > > So, we maintain the memory by ourselves and reuse it for each > compression and decompression I think > Signed-off-by: Xiao Guangrong > --- > migration/qemu-file.c | 34 ++++++++++-- > migration/qemu-file.h | 6 ++- > migration/ram.c | 142 +++++++++++++++++++++++++++++++++++++------------- > 3 files changed, 140 insertions(+), 42 deletions(-) > > diff --git a/migration/qemu-file.c b/migration/qemu-file.c > index 2ab2bf362d..1ff33a1ffb 100644 > --- a/migration/qemu-file.c > +++ b/migration/qemu-file.c > @@ -658,6 +658,30 @@ uint64_t qemu_get_be64(QEMUFile *f) > return v; > } > > +/* return the size after compression, or negative value on error */ > +static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len, > + const uint8_t *source, size_t source_len) > +{ > + int err; > + > + err = deflateReset(stream); > + if (err != Z_OK) { > + return -1; > + } > + > + stream->avail_in = source_len; > + stream->next_in = (uint8_t *)source; > + stream->avail_out = dest_len; > + stream->next_out = dest; > + > + err = deflate(stream, Z_FINISH); > + if (err != Z_STREAM_END) { > + return -1; > + } > + > + return stream->next_out - dest; > +} > + > /* Compress size bytes of data start at p with specific compression > * level and store the compressed data to the buffer of f. > * > @@ -668,8 +692,8 @@ uint64_t qemu_get_be64(QEMUFile *f) > * data, return -1. > */ > > -ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size, > - int level) > +ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream, > + const uint8_t *p, size_t size) > { > ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t); > > @@ -683,8 +707,10 @@ ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size, > return -1; > } > } > - if (compress2(f->buf + f->buf_index + sizeof(int32_t), (uLongf *)&blen, > - (Bytef *)p, size, level) != Z_OK) { > + > + blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t), > + blen, p, size); > + if (blen < 0) { > error_report("Compress Failed!"); > return 0; > } > diff --git a/migration/qemu-file.h b/migration/qemu-file.h > index aae4e5ed36..d123b21ca8 100644 > --- a/migration/qemu-file.h > +++ b/migration/qemu-file.h > @@ -25,6 +25,8 @@ > #ifndef MIGRATION_QEMU_FILE_H > #define MIGRATION_QEMU_FILE_H > > +#include > + > /* Read a chunk of data from a file at the given position. The pos argument > * can be ignored if the file is only be used for streaming. The number of > * bytes actually read should be returned. > @@ -132,8 +134,8 @@ bool qemu_file_is_writable(QEMUFile *f); > > size_t qemu_peek_buffer(QEMUFile *f, uint8_t **buf, size_t size, size_t offset); > size_t qemu_get_buffer_in_place(QEMUFile *f, uint8_t **buf, size_t size); > -ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size, > - int level); > +ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream, > + const uint8_t *p, size_t size); > int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src); > > /* > diff --git a/migration/ram.c b/migration/ram.c > index 615693f180..fff3f31e90 100644 > --- a/migration/ram.c > +++ b/migration/ram.c > @@ -264,6 +264,7 @@ struct CompressParam { > QemuCond cond; > RAMBlock *block; > ram_addr_t offset; > + z_stream stream; > }; > typedef struct CompressParam CompressParam; > > @@ -275,6 +276,7 @@ struct DecompressParam { > void *des; > uint8_t *compbuf; > int len; > + z_stream stream; > }; > typedef struct DecompressParam DecompressParam; > > @@ -294,7 +296,7 @@ static QemuThread *decompress_threads; > static QemuMutex decomp_done_lock; > static QemuCond decomp_done_cond; > > -static int do_compress_ram_page(QEMUFile *f, RAMBlock *block, > +static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, > ram_addr_t offset); > > static void *do_data_compress(void *opaque) > @@ -311,7 +313,7 @@ static void *do_data_compress(void *opaque) > param->block = NULL; > qemu_mutex_unlock(¶m->mutex); > > - do_compress_ram_page(param->file, block, offset); > + do_compress_ram_page(param->file, ¶m->stream, block, offset); > > qemu_mutex_lock(&comp_done_lock); > param->done = true; > @@ -352,10 +354,17 @@ static void compress_threads_save_cleanup(void) > terminate_compression_threads(); > thread_count = migrate_compress_threads(); > for (i = 0; i < thread_count; i++) { > + /* something in compress_threads_save_setup() is wrong. */ > + if (!comp_param[i].stream.opaque) { > + break; > + } > + > qemu_thread_join(compress_threads + i); > qemu_fclose(comp_param[i].file); > qemu_mutex_destroy(&comp_param[i].mutex); > qemu_cond_destroy(&comp_param[i].cond); > + deflateEnd(&comp_param[i].stream); > + comp_param[i].stream.opaque = NULL; > } > qemu_mutex_destroy(&comp_done_lock); > qemu_cond_destroy(&comp_done_cond); > @@ -365,12 +374,12 @@ static void compress_threads_save_cleanup(void) > comp_param = NULL; > } > > -static void compress_threads_save_setup(void) > +static int compress_threads_save_setup(void) > { > int i, thread_count; > > if (!migrate_use_compression()) { > - return; > + return 0; > } > thread_count = migrate_compress_threads(); > compress_threads = g_new0(QemuThread, thread_count); > @@ -378,6 +387,12 @@ static void compress_threads_save_setup(void) > qemu_cond_init(&comp_done_cond); > qemu_mutex_init(&comp_done_lock); > for (i = 0; i < thread_count; i++) { > + if (deflateInit(&comp_param[i].stream, > + migrate_compress_level()) != Z_OK) { > + goto exit; > + } > + comp_param[i].stream.opaque = &comp_param[i]; > + > /* comp_param[i].file is just used as a dummy buffer to save data, > * set its ops to empty. > */ > @@ -390,6 +405,11 @@ static void compress_threads_save_setup(void) > do_data_compress, comp_param + i, > QEMU_THREAD_JOINABLE); > } > + return 0; > + > +exit: > + compress_threads_save_cleanup(); > + return -1; > } > > /* Multiple fd's */ > @@ -1026,7 +1046,7 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage) > return pages; > } > > -static int do_compress_ram_page(QEMUFile *f, RAMBlock *block, > +static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, > ram_addr_t offset) > { > RAMState *rs = ram_state; > @@ -1035,8 +1055,7 @@ static int do_compress_ram_page(QEMUFile *f, RAMBlock *block, > > bytes_sent = save_page_header(rs, f, block, offset | > RAM_SAVE_FLAG_COMPRESS_PAGE); > - blen = qemu_put_compression_data(f, p, TARGET_PAGE_SIZE, > - migrate_compress_level()); > + blen = qemu_put_compression_data(f, stream, p, TARGET_PAGE_SIZE); > if (blen < 0) { > bytes_sent = 0; > qemu_file_set_error(migrate_get_current()->to_dst_file, blen); > @@ -2209,9 +2228,14 @@ static int ram_save_setup(QEMUFile *f, void *opaque) > RAMState **rsp = opaque; > RAMBlock *block; > > + if (compress_threads_save_setup()) { > + return -1; > + } > + > /* migration has already setup the bitmap, reuse it. */ > if (!migration_in_colo_state()) { > if (ram_init_all(rsp) != 0) { > + compress_threads_save_cleanup(); > return -1; > } > } > @@ -2231,7 +2255,6 @@ static int ram_save_setup(QEMUFile *f, void *opaque) > } > > rcu_read_unlock(); > - compress_threads_save_setup(); > > ram_control_before_iterate(f, RAM_CONTROL_SETUP); > ram_control_after_iterate(f, RAM_CONTROL_SETUP); > @@ -2495,6 +2518,30 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size) > } > } > > +/* return the size after decompression, or negative value on error */ > +static int qemu_uncompress(z_stream *stream, uint8_t *dest, size_t dest_len, > + uint8_t *source, size_t source_len) > +{ > + int err; > + > + err = inflateReset(stream); > + if (err != Z_OK) { > + return -1; > + } > + > + stream->avail_in = source_len; > + stream->next_in = source; > + stream->avail_out = dest_len; > + stream->next_out = dest; > + > + err = inflate(stream, Z_NO_FLUSH); > + if (err != Z_STREAM_END) { > + return -1; > + } > + > + return stream->total_out; > +} > + > static void *do_data_decompress(void *opaque) > { > DecompressParam *param = opaque; > @@ -2511,13 +2558,13 @@ static void *do_data_decompress(void *opaque) > qemu_mutex_unlock(¶m->mutex); > > pagesize = TARGET_PAGE_SIZE; > - /* uncompress() will return failed in some case, especially > + /* qemu_uncompress() will return failed in some case, especially > * when the page is dirted when doing the compression, it's > * not a problem because the dirty page will be retransferred > * and uncompress() won't break the data in other pages. > */ > - uncompress((Bytef *)des, &pagesize, > - (const Bytef *)param->compbuf, len); > + qemu_uncompress(¶m->stream, des, pagesize, > + param->compbuf, len); > > qemu_mutex_lock(&decomp_done_lock); > param->done = true; > @@ -2552,30 +2599,6 @@ static void wait_for_decompress_done(void) > qemu_mutex_unlock(&decomp_done_lock); > } > > -static void compress_threads_load_setup(void) > -{ > - int i, thread_count; > - > - if (!migrate_use_compression()) { > - return; > - } > - thread_count = migrate_decompress_threads(); > - decompress_threads = g_new0(QemuThread, thread_count); > - decomp_param = g_new0(DecompressParam, thread_count); > - qemu_mutex_init(&decomp_done_lock); > - qemu_cond_init(&decomp_done_cond); > - for (i = 0; i < thread_count; i++) { > - qemu_mutex_init(&decomp_param[i].mutex); > - qemu_cond_init(&decomp_param[i].cond); > - decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); > - decomp_param[i].done = true; > - decomp_param[i].quit = false; > - qemu_thread_create(decompress_threads + i, "decompress", > - do_data_decompress, decomp_param + i, > - QEMU_THREAD_JOINABLE); > - } > -} > - > static void compress_threads_load_cleanup(void) > { > int i, thread_count; > @@ -2585,16 +2608,26 @@ static void compress_threads_load_cleanup(void) > } > thread_count = migrate_decompress_threads(); > for (i = 0; i < thread_count; i++) { > + if (!decomp_param[i].stream.opaque) { > + break; > + } > + > qemu_mutex_lock(&decomp_param[i].mutex); > decomp_param[i].quit = true; > qemu_cond_signal(&decomp_param[i].cond); > qemu_mutex_unlock(&decomp_param[i].mutex); > } > for (i = 0; i < thread_count; i++) { > + if (!decomp_param[i].stream.opaque) { > + break; > + } > + > qemu_thread_join(decompress_threads + i); > qemu_mutex_destroy(&decomp_param[i].mutex); > qemu_cond_destroy(&decomp_param[i].cond); > g_free(decomp_param[i].compbuf); > + inflateEnd(&decomp_param[i].stream); > + decomp_param[i].stream.opaque = NULL; > } > g_free(decompress_threads); > g_free(decomp_param); > @@ -2602,6 +2635,40 @@ static void compress_threads_load_cleanup(void) > decomp_param = NULL; > } > > +static int compress_threads_load_setup(void) > +{ > + int i, thread_count; > + > + if (!migrate_use_compression()) { > + return 0; > + } > + > + thread_count = migrate_decompress_threads(); > + decompress_threads = g_new0(QemuThread, thread_count); > + decomp_param = g_new0(DecompressParam, thread_count); > + qemu_mutex_init(&decomp_done_lock); > + qemu_cond_init(&decomp_done_cond); > + for (i = 0; i < thread_count; i++) { > + if (inflateInit(&decomp_param[i].stream) != Z_OK) { > + goto exit; > + } > + decomp_param[i].stream.opaque = &decomp_param[i]; > + > + qemu_mutex_init(&decomp_param[i].mutex); > + qemu_cond_init(&decomp_param[i].cond); > + decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); > + decomp_param[i].done = true; > + decomp_param[i].quit = false; > + qemu_thread_create(decompress_threads + i, "decompress", > + do_data_decompress, decomp_param + i, > + QEMU_THREAD_JOINABLE); > + } > + return 0; > +exit: > + compress_threads_load_cleanup(); I don't think this is safe; if inflateInit(..) fails in not-the-last thread, compress_threads_load_cleanup() will try and destroy all the mutex's and condition variables, even though they've not yet all been _init'd. However, other than that I think the patch is OK; a chat with Dan Berrange has convinced me this probably doesn't affect the stream format, so that's OK. One thing I would like is a comment as to how the 'opaque' field is being used; I don't think I quite understand what you're doing there. Dave > + return -1; > +} > + > static void decompress_data_with_multi_threads(QEMUFile *f, > void *host, int len) > { > @@ -2641,8 +2708,11 @@ static void decompress_data_with_multi_threads(QEMUFile *f, > */ > static int ram_load_setup(QEMUFile *f, void *opaque) > { > + if (compress_threads_load_setup()) { > + return -1; > + } > + > xbzrle_load_setup(); > - compress_threads_load_setup(); > ramblock_recv_map_init(); > return 0; > } > -- > 2.14.3 > > -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK