All of lore.kernel.org
 help / color / mirror / Atom feed
From: Juan Quintela <quintela@redhat.com>
To: Liang Li <liang.z.li@intel.com>
Cc: qemu-devel@nongnu.org, lcapitulino@redhat.com,
	yang.z.zhang@intel.com, amit.shah@redhat.com,
	dgilbert@redhat.com
Subject: Re: [Qemu-devel] [v6 08/14] migration: Add the core code of multi-thread compression
Date: Wed, 25 Mar 2015 12:47:57 +0100	[thread overview]
Message-ID: <87k2y51eoi.fsf@neno.neno> (raw)
In-Reply-To: <1427099549-10633-9-git-send-email-liang.z.li@intel.com> (Liang Li's message of "Mon, 23 Mar 2015 16:32:23 +0800")

Liang Li <liang.z.li@intel.com> wrote:
> Implement the core logic of the multiple thread compression. At this
> point, multiple thread compression can't co-work with xbzrle yet.
>
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>

> ---
>  arch_init.c | 184 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
>  1 file changed, 177 insertions(+), 7 deletions(-)
>
> diff --git a/arch_init.c b/arch_init.c
> index 48cae22..9f63c0f 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -355,12 +355,33 @@ static DecompressParam *decomp_param;
>  static QemuThread *decompress_threads;
>  static uint8_t *compressed_data_buf;
>  
> +static int do_compress_ram_page(CompressParam *param);
> +
>  static void *do_data_compress(void *opaque)
>  {
> -    while (!quit_comp_thread) {
> +    CompressParam *param = opaque;
>  
> -    /* To be done */
What is the difference with changing this loop to:


> +    while (!quit_comp_thread) {

Here we don't have quit_comp_thread protected by anything.

> +        qemu_mutex_lock(&param->mutex);
> +        /* Re-check the quit_comp_thread in case of
> +         * terminate_compression_threads is called just before
> +         * qemu_mutex_lock(&param->mutex) and after
> +         * while(!quit_comp_thread), re-check it here can make
> +         * sure the compression thread terminate as expected.
> +         */
> +        while (!param->start && !quit_comp_thread) {

Here and in the next use it is protected by param->mutex, but param is per
compression thread, so it is not really protected.

> +            qemu_cond_wait(&param->cond, &param->mutex);
> +        }
> +        if (!quit_comp_thread) {
> +            do_compress_ram_page(param);
> +        }
> +        param->start = false;

param->start is protected by param->mutex everywhere

> +        qemu_mutex_unlock(&param->mutex);
>  
> +        qemu_mutex_lock(comp_done_lock);
> +        param->done = true;

param->done is protected by comp_done_lock

> +        qemu_cond_signal(comp_done_cond);
> +        qemu_mutex_unlock(comp_done_lock);
>      }
>  
>      return NULL;
> @@ -368,9 +389,15 @@ static void *do_data_compress(void *opaque)
>  
>  static inline void terminate_compression_threads(void)
>  {
> -    quit_comp_thread = true;
> +    int idx, thread_count;
>  
> -    /* To be done */
> +    thread_count = migrate_compress_threads();
> +    quit_comp_thread = true;

quit_comp_thread is not protected again.

> +    for (idx = 0; idx < thread_count; idx++) {
> +        qemu_mutex_lock(&comp_param[idx].mutex);
> +        qemu_cond_signal(&comp_param[idx].cond);
> +        qemu_mutex_unlock(&comp_param[idx].mutex);
> +    }
>  }
>  
>  void migrate_compress_threads_join(void)
> @@ -420,6 +447,7 @@ void migrate_compress_threads_create(void)
>           * it's ops to empty.
>           */
>          comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
> +        comp_param[i].done = true;
>          qemu_mutex_init(&comp_param[i].mutex);
>          qemu_cond_init(&comp_param[i].cond);
>          qemu_thread_create(compress_threads + i, "compress",
> @@ -829,6 +857,97 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
>      return pages;
>  }
>  
> +static int do_compress_ram_page(CompressParam *param)
> +{
> +    int bytes_sent, blen;
> +    uint8_t *p;
> +    RAMBlock *block = param->block;
> +    ram_addr_t offset = param->offset;
> +
> +    p = memory_region_get_ram_ptr(block->mr) + (offset & TARGET_PAGE_MASK);
> +
> +    bytes_sent = save_page_header(param->file, block, offset |
> +                                  RAM_SAVE_FLAG_COMPRESS_PAGE);
> +    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
> +                                     migrate_compress_level());
> +    bytes_sent += blen;
> +    atomic_inc(&acct_info.norm_pages);
> +
> +    return bytes_sent;
> +}
> +
> +static inline void start_compression(CompressParam *param)
> +{
> +    param->done = false;

Not protected (well, its caller has protected it with comp_done_lock).

> +    qemu_mutex_lock(&param->mutex);
> +    param->start = true;
> +    qemu_cond_signal(&param->cond);
> +    qemu_mutex_unlock(&param->mutex);
> +}
> +
> +
> +static uint64_t bytes_transferred;
> +
> +static void flush_compressed_data(QEMUFile *f)
> +{
> +    int idx, len, thread_count;
> +
> +    if (!migrate_use_compression()) {
> +        return;
> +    }
> +    thread_count = migrate_compress_threads();
> +    for (idx = 0; idx < thread_count; idx++) {
> +        if (!comp_param[idx].done) {

done is not protected here.

> +            qemu_mutex_lock(comp_done_lock);
> +            while (!comp_param[idx].done && !quit_comp_thread) {


Now, it is under comp_done_lock.  But none of its other uses is
protected by it.

And here done is protected by comp_done_cond


> +                qemu_cond_wait(comp_done_cond, comp_done_lock);
> +            }
> +            qemu_mutex_unlock(comp_done_lock);
> +        }
> +        if (!quit_comp_thread) {

Here, it is unprotected again.

> +            len = qemu_put_qemu_file(f, comp_param[idx].file);
> +            bytes_transferred += len;
> +        }
> +    }
> +}
> +
> +static inline void set_compress_params(CompressParam *param, RAMBlock *block,
> +                                       ram_addr_t offset)
> +{
> +    param->block = block;
> +    param->offset = offset;
> +}
> +
> +static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
> +                                           ram_addr_t offset,
> +                                           uint64_t *bytes_transferred)
> +{
> +    int idx, thread_count, bytes_xmit = -1, pages = -1;
> +
> +    thread_count = migrate_compress_threads();
> +    qemu_mutex_lock(comp_done_lock);
> +    while (true) {
> +        for (idx = 0; idx < thread_count; idx++) {
> +            if (comp_param[idx].done) {
> +                bytes_xmit = qemu_put_qemu_file(f, comp_param[idx].file);
> +                set_compress_params(&comp_param[idx], block, offset);
> +                start_compression(&comp_param[idx]);
> +                pages = 1;
> +                *bytes_transferred += bytes_xmit;
> +                break;
> +            }
> +        }
> +        if (pages > 0) {
> +            break;
> +        } else {
> +            qemu_cond_wait(comp_done_cond, comp_done_lock);
> +        }
> +    }
> +    qemu_mutex_unlock(comp_done_lock);
> +
> +    return pages;
> +}
> +
>  /**
>   * ram_save_compressed_page: compress the given page and send it to the stream
>   *
> @@ -845,8 +964,59 @@ static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
>                                      uint64_t *bytes_transferred)
>  {
>      int pages = -1;
> +    uint64_t bytes_xmit;
> +    MemoryRegion *mr = block->mr;
> +    uint8_t *p;
> +    int ret;
> +
> +    p = memory_region_get_ram_ptr(mr) + offset;
>  
> -    /* To be done*/
> +    bytes_xmit = 0;
> +    ret = ram_control_save_page(f, block->offset,
> +                                offset, TARGET_PAGE_SIZE, &bytes_xmit);
> +    if (bytes_xmit) {
> +        *bytes_transferred += bytes_xmit;
> +        pages = 1;
> +    }
> +    if (block == last_sent_block) {
> +        offset |= RAM_SAVE_FLAG_CONTINUE;
> +    }
> +    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> +        if (ret != RAM_SAVE_CONTROL_DELAYED) {
> +            if (bytes_xmit > 0) {
> +                acct_info.norm_pages++;
> +            } else if (bytes_xmit == 0) {
> +                acct_info.dup_pages++;
> +            }
> +        }
> +    } else {
> +        /* When starting the process of a new block, the first page of
> +         * the block should be sent out before other pages in the same
> +         * block, and all the pages in last block should have been sent
> +         * out, keeping this order is important, because the 'cont' flag
> +         * is used to avoid resending the block name.
> +         */
> +        if (block != last_sent_block) {
> +            flush_compressed_data(f);
> +            pages = save_zero_page(f, block, offset, p, bytes_transferred);
> +            if (pages == -1) {
> +                set_compress_params(&comp_param[0], block, offset);
> +                /* Use the qemu thread to compress the data to make sure the
> +                 * first page is sent out before other pages
> +                 */
> +                bytes_xmit = do_compress_ram_page(&comp_param[0]);
> +                qemu_put_qemu_file(f, comp_param[0].file);
> +                *bytes_transferred += bytes_xmit;
> +                pages = 1;
> +            }
> +        } else {
> +            pages = save_zero_page(f, block, offset, p, bytes_transferred);
> +            if (pages == -1) {
> +                pages = compress_page_with_multi_thread(f, block, offset,
> +                                                        bytes_transferred);
> +            }
> +        }
> +    }
>  
>      return pages;
>  }
> @@ -914,8 +1084,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage,
>      return pages;
>  }
>  
> -static uint64_t bytes_transferred;
> -
>  void acct_update_position(QEMUFile *f, size_t size, bool zero)
>  {
>      uint64_t pages = size / TARGET_PAGE_SIZE;
> @@ -1129,6 +1297,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>          }
>          i++;
>      }
> +    flush_compressed_data(f);
>      rcu_read_unlock();
>  
>      /*
> @@ -1170,6 +1339,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>          }
>      }
>  
> +    flush_compressed_data(f);
>      ram_control_after_iterate(f, RAM_CONTROL_FINISH);
>      migration_end();

  reply	other threads:[~2015-03-25 11:48 UTC|newest]

Thread overview: 24+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2015-03-23  8:32 [Qemu-devel] [PATCH v6 0/14] migration: Add a new feature to do live migration Liang Li
2015-03-23  8:32 ` [Qemu-devel] [v6 01/14] docs: Add a doc about multiple thread compression Liang Li
2015-03-23  8:32 ` [Qemu-devel] [v6 02/14] migration: Add the framework of multi-thread compression Liang Li
2015-03-23  8:32 ` [Qemu-devel] [v6 03/14] migration: Add the framework of multi-thread decompression Liang Li
2015-03-23  8:32 ` [Qemu-devel] [v6 04/14] qemu-file: Add compression functions to QEMUFile Liang Li
2015-03-23  8:32 ` [Qemu-devel] [v6 05/14] arch_init: Alloc and free data struct for compression Liang Li
2015-03-23  8:32 ` [Qemu-devel] [v6 06/14] arch_init: Add and free data struct for decompression Liang Li
2015-03-23  8:32 ` [Qemu-devel] [v6 07/14] migration: Split save_zero_page from ram_save_page Liang Li
2015-03-23  8:32 ` [Qemu-devel] [v6 08/14] migration: Add the core code of multi-thread compression Liang Li
2015-03-25 11:47   ` Juan Quintela [this message]
2015-03-26  2:37     ` Li, Liang Z
2015-03-26 10:27       ` Juan Quintela
2015-03-27  2:59         ` Li, Liang Z
2015-03-27 10:47   ` Juan Quintela
2015-03-28  6:11     ` Li, Liang Z
2015-03-23  8:32 ` [Qemu-devel] [v6 09/14] migration: Make compression co-work with xbzrle Liang Li
2015-03-23  8:32 ` [Qemu-devel] [v6 10/14] migration: Add the core code for decompression Liang Li
2015-03-25 11:56   ` Juan Quintela
2015-03-26  2:46     ` Li, Liang Z
2015-03-23  8:32 ` [Qemu-devel] [v6 11/14] migration: Add interface to control compression Liang Li
2015-03-23  8:32 ` [Qemu-devel] [v6 12/14] migration: Use an array instead of 3 parameters Liang Li
2015-03-23  8:32 ` [Qemu-devel] [v6 13/14] migration: Add qmp commands to set and query parameters Liang Li
2015-03-23  8:32 ` [Qemu-devel] [v6 14/14] migration: Add hmp interface " Liang Li
2015-04-02  1:16 ` [Qemu-devel] [PATCH v6 0/14] migration: Add a new feature to do live migration Li, Liang Z

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=87k2y51eoi.fsf@neno.neno \
    --to=quintela@redhat.com \
    --cc=amit.shah@redhat.com \
    --cc=dgilbert@redhat.com \
    --cc=lcapitulino@redhat.com \
    --cc=liang.z.li@intel.com \
    --cc=qemu-devel@nongnu.org \
    --cc=yang.z.zhang@intel.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.