All of lore.kernel.org
 help / color / mirror / Atom feed
From: Juan Quintela <quintela@redhat.com>
To: Liang Li <liang.z.li@intel.com>
Cc: armbru@redhat.com, qemu-devel@nongnu.org,
	Yang Zhang <yang.z.zhang@intel.com>,
	amit.shah@redhat.com, lcapitulino@redhat.com,
	dgilbert@redhat.com
Subject: Re: [Qemu-devel] [v5 08/12] migration: Add the core code of multi-thread compression
Date: Wed, 11 Feb 2015 12:44:56 +0100	[thread overview]
Message-ID: <87bnl0brk7.fsf@neno.neno> (raw)
In-Reply-To: <1423623986-590-9-git-send-email-liang.z.li@intel.com> (Liang Li's message of "Wed, 11 Feb 2015 11:06:22 +0800")

Liang Li <liang.z.li@intel.com> wrote:
> Implement the core logic of the multiple thread compression. At this
> point, multiple thread compression can't co-work with xbzrle yet.
>
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>


> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -363,18 +363,44 @@ static QemuMutex *comp_done_lock;
>  static QemuCond *comp_done_cond;
>  /* The empty QEMUFileOps will be used by file in CompressParam */
>  static const QEMUFileOps empty_ops = { };
> +
> +/* one_byte_count is used to count the bytes that is added to
> + * bytes_transferred but not actually transferred, at the proper
> + * time, we should sub one_byte_count from bytes_transferred to
> + * make bytes_transferred accurate.
> + */
> +static int one_byte_count;

With the changes proposed previously to ram_save_compressed_page() this
shouldn't be needed.  It can return 0 now.

> +static int do_compress_ram_page(CompressParam *param);
> +
>  static void *do_data_compress(void *opaque)
>  {
> -    while (!quit_comp_thread) {
> -
> -    /* To be done */
> +    CompressParam *param = opaque;
>  
> +    while (!quit_comp_thread) {
> +        qemu_mutex_lock(&param->mutex);
> +        /* Re-check the quit_comp_thread in case of
> +         * terminate_compression_threads is called just before
> +         * qemu_mutex_lock(&param->mutex) and after
> +         * while(!quit_comp_thread), re-check it here can make
> +         * sure the compression thread terminate as expected.
> +         */
> +        while (!param->busy && !quit_comp_thread) {
> +            qemu_cond_wait(&param->cond, &param->mutex);
> +        }
> +        qemu_mutex_unlock(&param->mutex);
> +        if (!quit_comp_thread) {
> +            do_compress_ram_page(param);
> +        }
> +        qemu_mutex_lock(comp_done_lock);
> +        param->busy = false;
> +        qemu_cond_signal(comp_done_cond);
> +        qemu_mutex_unlock(comp_done_lock);
>      }
>  
>      return NULL;
> @@ -382,9 +408,15 @@ static void *do_data_compress(void *opaque)
>  
>  static inline void terminate_compression_threads(void)
>  {
> -    quit_comp_thread = true;
> +    int idx, thread_count;
>  
> -    /* To be done */
> +    thread_count = migrate_compress_threads();
> +    quit_comp_thread = true;
> +    for (idx = 0; idx < thread_count; idx++) {
> +        qemu_mutex_lock(&comp_param[idx].mutex);
> +        qemu_cond_signal(&comp_param[idx].cond);
> +        qemu_mutex_unlock(&comp_param[idx].mutex);
> +    }
>  }
>  
>  void migrate_compress_threads_join(MigrationState *s)
> @@ -770,12 +802,157 @@ static int ram_save_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
>      return bytes_sent;
>  }
>  
> +static int do_compress_ram_page(CompressParam *param)
> +{
> +    int bytes_sent, cont;
> +    int blen;
> +    uint8_t *p;
> +    RAMBlock *block = param->block;
> +    ram_addr_t offset = param->offset;
> +
> +    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> +    p = memory_region_get_ram_ptr(block->mr) + offset;
> +
> +    bytes_sent = save_block_hdr(param->file, block, offset, cont,
> +                                RAM_SAVE_FLAG_COMPRESS_PAGE);
> +    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
> +                                     migrate_compress_level());
> +    bytes_sent += blen;
> +    atomic_inc(&acct_info.norm_pages);
> +
> +    return bytes_sent;
> +}
> +
> +static inline void start_compression(CompressParam *param)
> +{
> +    qemu_mutex_lock(&param->mutex);
> +    param->busy = true;
> +    qemu_cond_signal(&param->cond);
> +    qemu_mutex_unlock(&param->mutex);
> +}
> +
> +
> +static uint64_t bytes_transferred;
> +
> +static void flush_compressed_data(QEMUFile *f)
> +{
> +    int idx, len, thread_count;
> +
> +    if (!migrate_use_compression()) {
> +        return;
> +    }
> +    thread_count = migrate_compress_threads();
> +    for (idx = 0; idx < thread_count; idx++) {
> +        if (comp_param[idx].busy) {
> +            qemu_mutex_lock(comp_done_lock);
> +            while (comp_param[idx].busy && !quit_comp_thread) {
> +                qemu_cond_wait(comp_done_cond, comp_done_lock);
> +            }
> +            qemu_mutex_unlock(comp_done_lock);
> +        }

If we arrive here because quit_comp_thread == true, shouldn't we skip
the qemu_put_qemu_file()?

> +        len = qemu_put_qemu_file(f, comp_param[idx].file);
> +        bytes_transferred += len;
> +    }

[remove one_byte stuff here]

> +}
> +
> +static inline void set_compress_params(CompressParam *param, RAMBlock *block,
> +                                       ram_addr_t offset)
> +{
> +    param->block = block;
> +    param->offset = offset;
> +}
> +
> +static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
> +                                           ram_addr_t offset)
> +{
> +    int idx, thread_count, bytes_sent = 0;
> +
> +    thread_count = migrate_compress_threads();
> +    qemu_mutex_lock(comp_done_lock);
> +    while (true) {
> +        for (idx = 0; idx < thread_count; idx++) {
> +            if (!comp_param[idx].busy) {
> +                bytes_sent = qemu_put_qemu_file(f, comp_param[idx].file);
> +                set_compress_params(&comp_param[idx], block, offset);
> +                start_compression(&comp_param[idx]);

[remove stuff here]

> +                break;
> +            }
> +        }
> +        if (bytes_sent > 0) {

Change this to:
          if (bytes_sent >= 0) {

> +            break;
> +        } else {
> +            qemu_cond_wait(comp_done_cond, comp_done_lock);
> +        }
> +    }
> +    qemu_mutex_unlock(comp_done_lock);
> +
> +    return bytes_sent;
> +}
> +
>  static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
>                                      ram_addr_t offset, bool last_stage)
>  {
>      int bytes_sent = -1;
> +    MemoryRegion *mr = block->mr;
> +    uint8_t *p;
> +    int ret;
> +    int cont;
>  
> -    /* To be done*/
> +    p = memory_region_get_ram_ptr(mr) + offset;
> +    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> +    ret = ram_control_save_page(f, block->offset,
> +                                offset, TARGET_PAGE_SIZE, &bytes_sent);
> +    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> +        if (ret != RAM_SAVE_CONTROL_DELAYED) {
> +            if (bytes_sent > 0) {
> +                acct_info.norm_pages++;
> +            } else if (bytes_sent == 0) {
> +                acct_info.dup_pages++;
> +            }
> +        }
> +    } else {
> +        /* When starting the process of a new block, the first page of
> +         * the block should be sent out before other pages in the same
> +         * block, and all the pages in last block should have been sent
> +         * out, keeping this order is important, because the 'cont' flag
> +         * is used to avoid resending the block name.
> +         */
> +        if (block != last_sent_block) {
> +            flush_compressed_data(f);
> +            bytes_sent = save_zero_page(f, block, offset, p, cont);
> +            if (bytes_sent == -1) {
> +                set_compress_params(&comp_param[0], block, offset);
> +                /* Use the qemu thread to compress the data to make sure the
> +                 * first page is sent out before other pages
> +                 */
> +                bytes_sent = do_compress_ram_page(&comp_param[0]);
> +                if (bytes_sent > 0) {

This test is not needed; it should be

assert(bytes_sent > 0)

instead -- how could it be zero or negative here?  So we always have to
call qemu_put_qemu_file(), no?

> +                    qemu_put_qemu_file(f, comp_param[0].file);
> +                }
> +            }
> +        } else {
> +            bytes_sent = save_zero_page(f, block, offset, p, cont);
> +            if (bytes_sent == -1) {
> +                bytes_sent = compress_page_with_multi_thread(f, block, offset);
> +            }
> +        }
> +    }
>  
>      return bytes_sent;
>  }
> @@ -834,8 +1011,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
>      return bytes_sent;
>  }
>  
> -static uint64_t bytes_transferred;
> -
>  void acct_update_position(QEMUFile *f, size_t size, bool zero)
>  {
>      uint64_t pages = size / TARGET_PAGE_SIZE;
> @@ -1043,6 +1218,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>          i++;
>      }
>  
> +    flush_compressed_data(f);
>      qemu_mutex_unlock_ramlist();
>  
>      /*
> @@ -1089,6 +1265,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>          bytes_transferred += bytes_sent;
>      }
>  
> +    flush_compressed_data(f);
>      ram_control_after_iterate(f, RAM_CONTROL_FINISH);
>      migration_end();


I think this would make the code work, but the locking is still not
right.  You are using here:

quit_comp_thread:  global, and not completely clear what protects it
comp_done_lock: global
comp_done_cond: global

param[i].busy: I would suggest renaming it to pending_work
param[i].mutex:
param[i].cond:
       thread is waiting for work


Issues:

param->busy is protected in do_data_compress() and start_compression()
with param->mutex, but in flush_compressed_data() and
compress_page_with_multi_thread() it is protected by
comp_done_lock (and flush_compressed_data() even reads it once without
holding any lock at all).

At this point, I would suggest just dropping param[i].mutex and using
comp_done_lock everywhere.  We can make the locking more fine-grained
later if needed, but first let's get it correct.

Code basically does (forget termination and locking)

each compression_thread()

  while(1) {
     while(!work_to_do)
        wait_for_work
     do_work
  }

And the main thread does:


while(1) {
     foreach compression_thread {
          if thread free {
             put it to work
             break;
          }
          wait_for_thread_to_finish
     }
}

Notice how we are walking all threads each time that we need to do anything.

Perhaps the code would be simpler if we put the work that needs to be
done in a global variable and changed it to:

compression_thread

  while(1) {
     while(!work_to_do)
        wait_for_work
     pick work from global variable
     wakeup main thread
     do_work
  }

main thread:

put work on global variable
while(nobody_pick_thework) {
     signal all threads
     wait for a compression thread to take the work
}

Why?  Because then we only have one global mutex and two condition
variables, with clear semantics:
- lock protects two conditions and global variable with work
- one condition where threads wait for work
- one condition where main thread wait for a worker to be ready

As we would need to take the lock every single time to put the work in
the global variable, to wait, or to pick up the work, we can drop all the:

if (!foo) {
    mutex_lock
    if(!foo) /* this time with lock */
        ....
}


Sorry for the very long mail.  If it makes you feel better, this is the
second time I wrote it, because in the first version my locking
proposal didn't work correctly.

What do you think?

Later, Juan.

  reply	other threads:[~2015-02-11 11:45 UTC|newest]

Thread overview: 43+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2015-02-11  3:06 [Qemu-devel] [PATCH v5 0/12] migration: Add a new feature to do live migration Liang Li
2015-02-11  3:06 ` [Qemu-devel] [v5 01/12] docs: Add a doc about multiple thread compression Liang Li
2015-02-11  8:46   ` Juan Quintela
2015-02-11  3:06 ` [Qemu-devel] [v5 02/12] migration: Add the framework of multi-thread compression Liang Li
2015-02-11  8:52   ` Juan Quintela
2015-02-11  8:55   ` Juan Quintela
2015-02-11 11:10   ` Juan Quintela
2015-02-12  7:24     ` Li, Liang Z
2015-02-12 12:31       ` Juan Quintela
2015-02-11  3:06 ` [Qemu-devel] [v5 03/12] migration: Add the framework of multi-thread decompression Liang Li
2015-02-11  8:52   ` Juan Quintela
2015-02-11  3:06 ` [Qemu-devel] [v5 04/12] qemu-file: Add compression functions to QEMUFile Liang Li
2015-02-12 12:06   ` Juan Quintela
2015-02-11  3:06 ` [Qemu-devel] [v5 05/12] arch_init: Alloc and free data struct for compression Liang Li
2015-02-11  9:03   ` Juan Quintela
2015-02-12  5:32     ` Li, Liang Z
2015-02-12 12:05       ` Juan Quintela
2015-02-11  3:06 ` [Qemu-devel] [v5 06/12] arch_init: Add and free data struct for decompression Liang Li
2015-02-11  9:04   ` Juan Quintela
2015-02-11  3:06 ` [Qemu-devel] [v5 07/12] migration: Split the function ram_save_page Liang Li
2015-02-11  9:08   ` Juan Quintela
2015-02-11 11:02   ` Juan Quintela
2015-02-11  3:06 ` [Qemu-devel] [v5 08/12] migration: Add the core code of multi-thread compression Liang Li
2015-02-11 11:44   ` Juan Quintela [this message]
2015-02-12  7:43     ` Li, Liang Z
2015-03-02  2:46     ` Li, Liang Z
2015-02-11  3:06 ` [Qemu-devel] [v5 09/12] migration: Make compression co-work with xbzrle Liang Li
2015-02-11 11:46   ` Juan Quintela
2015-02-12  2:24     ` Li, Liang Z
2015-02-12 12:22       ` Juan Quintela
2015-02-12 15:10         ` Li, Liang Z
2015-02-11  3:06 ` [Qemu-devel] [v5 10/12] migration: Add the core code for decompression Liang Li
2015-02-11 11:48   ` Juan Quintela
2015-02-11  3:06 ` [Qemu-devel] [v5 11/12] migration: Add interface to control compression Liang Li
2015-02-11  3:06 ` [Qemu-devel] [v5 12/12] migration: Add commands to set and query parameter Liang Li
2015-02-11 11:53   ` Juan Quintela
2015-03-11 16:57   ` Dr. David Alan Gilbert
2015-03-12 10:30   ` Markus Armbruster
2015-03-19  2:30     ` Li, Liang Z
2015-03-19  7:49       ` Markus Armbruster
2015-03-19 14:21         ` Li, Liang Z
2015-03-20  5:16           ` Li, Liang Z
2015-03-19 14:33       ` Eric Blake

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=87bnl0brk7.fsf@neno.neno \
    --to=quintela@redhat.com \
    --cc=amit.shah@redhat.com \
    --cc=armbru@redhat.com \
    --cc=dgilbert@redhat.com \
    --cc=lcapitulino@redhat.com \
    --cc=liang.z.li@intel.com \
    --cc=qemu-devel@nongnu.org \
    --cc=yang.z.zhang@intel.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.