All of lore.kernel.org
 help / color / mirror / Atom feed
From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
To: Juan Quintela <quintela@redhat.com>
Cc: qemu-devel@nongnu.org, lvivier@redhat.com, peterx@redhat.com
Subject: Re: [Qemu-devel] [PATCH v12 16/21] migration: Synchronize multifd threads with main thread
Date: Thu, 3 May 2018 11:44:38 +0100	[thread overview]
Message-ID: <20180503104437.GC2660@work-vm> (raw)
In-Reply-To: <20180425112723.1111-17-quintela@redhat.com>

* Juan Quintela (quintela@redhat.com) wrote:
> We synchronize all threads each RAM_SAVE_FLAG_EOS.  Bitmap
> synchronizations don't happen inside a  ram section, so we are safe
> about two channels trying to overwrite the same memory.

OK, that's quite neat - so you don't need any extra flags in the stream
to do the sync;  it probably needs a comment in the code somewhere so we
don't forget!


> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c        | 118 +++++++++++++++++++++++++++++++++++++----
>  migration/trace-events |   6 +++
>  2 files changed, 113 insertions(+), 11 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index c4c185cc4c..398cb0af3b 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -405,6 +405,8 @@ static void compress_threads_save_setup(void)
>  #define MULTIFD_MAGIC 0x11223344U
>  #define MULTIFD_VERSION 1
>  
> +#define MULTIFD_FLAG_SYNC (1 << 0)
> +
>  typedef struct {
>      uint32_t magic;
>      uint32_t version;
> @@ -471,6 +473,8 @@ typedef struct {
>      uint32_t num_packets;
>      /* pages sent through this channel */
>      uint32_t num_pages;
> +    /* syncs main thread and channels */
> +    QemuSemaphore sem_sync;
>  }  MultiFDSendParams;
>  
>  typedef struct {
> @@ -507,6 +511,8 @@ typedef struct {
>      uint32_t num_packets;
>      /* pages sent through this channel */
>      uint32_t num_pages;
> +    /* syncs main thread and channels */
> +    QemuSemaphore sem_sync;
>  } MultiFDRecvParams;
>  
>  static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
> @@ -682,6 +688,10 @@ struct {
>      int count;
>      /* array of pages to sent */
>      MultiFDPages_t *pages;
> +    /* syncs main thread and channels */
> +    QemuSemaphore sem_sync;
> +    /* global number of generated multifd packets */
> +    uint32_t seq;

It's interesting that you use the same comment for 'seq' in
MultiFDSendParams - but I guess that means only this one is the global
version and the others aren't really global numbers - they're just
local to that thread?

>  } *multifd_send_state;
>  
>  static void multifd_send_terminate_threads(Error *err)
> @@ -727,6 +737,7 @@ int multifd_save_cleanup(Error **errp)
>          p->c = NULL;
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        qemu_sem_destroy(&p->sem_sync);
>          g_free(p->name);
>          p->name = NULL;
>          multifd_pages_clear(p->pages);
> @@ -735,6 +746,7 @@ int multifd_save_cleanup(Error **errp)
>          g_free(p->packet);
>          p->packet = NULL;
>      }
> +    qemu_sem_destroy(&multifd_send_state->sem_sync);
>      g_free(multifd_send_state->params);
>      multifd_send_state->params = NULL;
>      multifd_pages_clear(multifd_send_state->pages);
> @@ -744,6 +756,33 @@ int multifd_save_cleanup(Error **errp)
>      return ret;
>  }
>  
> +static void multifd_send_sync_main(void)
> +{
> +    int i;
> +
> +    if (!migrate_use_multifd()) {
> +        return;
> +    }
> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> +        MultiFDSendParams *p = &multifd_send_state->params[i];
> +
> +        trace_multifd_send_sync_main_signal(p->id);
> +
> +        qemu_mutex_lock(&p->mutex);
> +        p->flags |= MULTIFD_FLAG_SYNC;
> +        p->pending_job++;
> +        qemu_mutex_unlock(&p->mutex);
> +        qemu_sem_post(&p->sem);
> +    }
> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> +        MultiFDSendParams *p = &multifd_send_state->params[i];
> +
> +        trace_multifd_send_sync_main_wait(p->id);
> +        qemu_sem_wait(&multifd_send_state->sem_sync);
> +    }
> +    trace_multifd_send_sync_main(multifd_send_state->seq);
> +}
> +

OK, so this just makes each of the sending threads ack, so that seems
OK.
But what happens with an error? multifd_send_sync_main exits its
loop with a 'break' if the writes fail, and that could mean they never
come and post the flag-sync sem.

>  static void *multifd_send_thread(void *opaque)
>  {
>      MultiFDSendParams *p = opaque;
> @@ -778,17 +817,20 @@ static void *multifd_send_thread(void *opaque)
>              /* ToDo: send packet here */
>  
>              qemu_mutex_lock(&p->mutex);
> +            p->flags = 0;
>              p->pending_job--;
>              qemu_mutex_unlock(&p->mutex);
> -            continue;
> +
> +            if (flags & MULTIFD_FLAG_SYNC) {
> +                qemu_sem_post(&multifd_send_state->sem_sync);
> +            }
>          } else if (p->quit) {
>              qemu_mutex_unlock(&p->mutex);
>              break;
> +        } else {
> +            qemu_mutex_unlock(&p->mutex);
> +            /* sometimes there are spurious wakeups */
>          }
> -        qemu_mutex_unlock(&p->mutex);
> -        /* this is impossible */
> -        error_setg(&local_err, "multifd_send_thread: Unknown command");
> -        break;
>      }
>  
>  out:
> @@ -840,12 +882,14 @@ int multifd_save_setup(void)
>      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
>      atomic_set(&multifd_send_state->count, 0);
>      multifd_pages_init(&multifd_send_state->pages, page_count);
> +    qemu_sem_init(&multifd_send_state->sem_sync, 0);
>  
>      for (i = 0; i < thread_count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
>          qemu_mutex_init(&p->mutex);
>          qemu_sem_init(&p->sem, 0);
> +        qemu_sem_init(&p->sem_sync, 0);
>          p->quit = false;
>          p->pending_job = 0;
>          p->id = i;
> @@ -863,6 +907,10 @@ struct {
>      MultiFDRecvParams *params;
>      /* number of created threads */
>      int count;
> +    /* syncs main thread and channels */
> +    QemuSemaphore sem_sync;
> +    /* global number of generated multifd packets */
> +    uint32_t seq;
>  } *multifd_recv_state;
>  
>  static void multifd_recv_terminate_threads(Error *err)
> @@ -908,6 +956,7 @@ int multifd_load_cleanup(Error **errp)
>          p->c = NULL;
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        qemu_sem_destroy(&p->sem_sync);
>          g_free(p->name);
>          p->name = NULL;
>          multifd_pages_clear(p->pages);
> @@ -916,6 +965,7 @@ int multifd_load_cleanup(Error **errp)
>          g_free(p->packet);
>          p->packet = NULL;
>      }
> +    qemu_sem_destroy(&multifd_recv_state->sem_sync);
>      g_free(multifd_recv_state->params);
>      multifd_recv_state->params = NULL;
>      g_free(multifd_recv_state);
> @@ -924,6 +974,42 @@ int multifd_load_cleanup(Error **errp)
>      return ret;
>  }
>  
> +static void multifd_recv_sync_main(void)
> +{
> +    int i;
> +
> +    if (!migrate_use_multifd()) {
> +        return;
> +    }
> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> +        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +
> +        trace_multifd_recv_sync_main_signal(p->id);
> +        qemu_mutex_lock(&p->mutex);
> +        p->pending_job = true;
> +        qemu_mutex_unlock(&p->mutex);
> +    }
> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> +        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +
> +        trace_multifd_recv_sync_main_wait(p->id);
> +        qemu_sem_wait(&multifd_recv_state->sem_sync);
> +        qemu_mutex_lock(&p->mutex);
> +        if (multifd_recv_state->seq < p->seq) {
> +            multifd_recv_state->seq = p->seq;
> +        }

Can you explain what this is for?
Something like the latest received block?

> +        qemu_mutex_unlock(&p->mutex);
> +    }
> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> +        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +
> +        trace_multifd_recv_sync_main_signal(p->id);
> +
> +        qemu_sem_post(&p->sem_sync);
> +    }
> +    trace_multifd_recv_sync_main(multifd_recv_state->seq);
> +}
> +
>  static void *multifd_recv_thread(void *opaque)
>  {
>      MultiFDRecvParams *p = opaque;
> @@ -933,9 +1019,8 @@ static void *multifd_recv_thread(void *opaque)
>      trace_multifd_recv_thread_start(p->id);
>  
>      while (true) {
> -        qemu_sem_wait(&p->sem);
>          qemu_mutex_lock(&p->mutex);
> -        if (p->pending_job) {
> +        if (true || p->pending_job) {

A TODO I guess???

>              uint32_t used;
>              uint32_t flags;
>              qemu_mutex_unlock(&p->mutex);
> @@ -956,14 +1041,18 @@ static void *multifd_recv_thread(void *opaque)
>              p->num_packets++;
>              p->num_pages += used;
>              qemu_mutex_unlock(&p->mutex);
> +
> +            if (flags & MULTIFD_FLAG_SYNC) {
> +                qemu_sem_post(&multifd_recv_state->sem_sync);
> +                qemu_sem_wait(&p->sem_sync);
> +            }

Can you explain the receive side logic - I think this is waiting for all
receive threads to 'ack' - but how do we know that they've finished
receiving all data that was sent?

Dave

>          } else if (p->quit) {
>              qemu_mutex_unlock(&p->mutex);
>              break;
> +        } else {
> +            qemu_mutex_unlock(&p->mutex);
> +            /* sometimes there are spurious wakeups */
>          }
> -        qemu_mutex_unlock(&p->mutex);
> -        /* this is impossible */
> -        error_setg(&local_err, "multifd_recv_thread: Unknown command");
> -        break;
>      }
>  
>      if (local_err) {
> @@ -991,12 +1080,14 @@ int multifd_load_setup(void)
>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
>      atomic_set(&multifd_recv_state->count, 0);
> +    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
>  
>      for (i = 0; i < thread_count; i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
>          qemu_mutex_init(&p->mutex);
>          qemu_sem_init(&p->sem, 0);
> +        qemu_sem_init(&p->sem_sync, 0);
>          p->quit = false;
>          p->pending_job = false;
>          p->id = i;
> @@ -2695,6 +2786,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
>      ram_control_before_iterate(f, RAM_CONTROL_SETUP);
>      ram_control_after_iterate(f, RAM_CONTROL_SETUP);
>  
> +    multifd_send_sync_main();
>      qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
>  
>      return 0;
> @@ -2770,6 +2862,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>       */
>      ram_control_after_iterate(f, RAM_CONTROL_ROUND);
>  
> +    multifd_send_sync_main();
>  out:
>      qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
>      ram_counters.transferred += 8;
> @@ -2823,6 +2916,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>  
>      rcu_read_unlock();
>  
> +    multifd_send_sync_main();
>      qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
>  
>      return 0;
> @@ -3253,6 +3347,7 @@ static int ram_load_postcopy(QEMUFile *f)
>              break;
>          case RAM_SAVE_FLAG_EOS:
>              /* normal exit */
> +            multifd_recv_sync_main();
>              break;
>          default:
>              error_report("Unknown combination of migration flags: %#x"
> @@ -3438,6 +3533,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>              break;
>          case RAM_SAVE_FLAG_EOS:
>              /* normal exit */
> +            multifd_recv_sync_main();
>              break;
>          default:
>              if (flags & RAM_SAVE_FLAG_HOOK) {
> diff --git a/migration/trace-events b/migration/trace-events
> index 9eee048287..b0ab8e2d03 100644
> --- a/migration/trace-events
> +++ b/migration/trace-events
> @@ -83,6 +83,12 @@ multifd_recv_thread_start(uint8_t id) "%d"
>  multifd_recv_thread_end(uint8_t id, uint32_t packets, uint32_t pages) "channel %d packets %d pages %d"
>  multifd_send(uint8_t id, uint32_t seq, uint32_t used, uint32_t flags) "channel %d seq number %d pages %d flags 0x%x"
>  multifd_recv(uint8_t id, uint32_t seq, uint32_t used, uint32_t flags) "channel %d seq number %d pages %d flags 0x%x"
> +multifd_send_sync_main(uint32_t seq) "seq %d"
> +multifd_send_sync_main_signal(uint8_t id) "channel %d"
> +multifd_send_sync_main_wait(uint8_t id) "channel %d"
> +multifd_recv_sync_main(uint32_t seq) "seq %d"
> +multifd_recv_sync_main_signal(uint8_t id) "channel %d"
> +multifd_recv_sync_main_wait(uint8_t id) "channel %d"
>  
>  # migration/migration.c
>  await_return_path_close_on_source_close(void) ""
> -- 
> 2.17.0
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

  reply	other threads:[~2018-05-03 10:44 UTC|newest]

Thread overview: 60+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 01/21] migration: Set error state in case of error Juan Quintela
2018-05-02 15:53   ` Dr. David Alan Gilbert
2018-05-09  8:15     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 02/21] migration: Introduce multifd_recv_new_channel() Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 03/21] migration: terminate_* can be called for other threads Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 04/21] migration: Be sure all recv channels are created Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 05/21] migration: Export functions to create send channels Juan Quintela
2018-04-26  7:28   ` Peter Xu
2018-05-09  8:05     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 06/21] migration: Create multifd channels Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 07/21] migration: Delay start of migration main routines Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 08/21] migration: Transmit initial package through the multifd channels Juan Quintela
2018-05-02 17:19   ` Dr. David Alan Gilbert
2018-05-09  8:34     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 09/21] migration: Define MultifdRecvParams sooner Juan Quintela
2018-05-02 17:32   ` Dr. David Alan Gilbert
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 10/21] migration: Create multipage support Juan Quintela
2018-04-26  7:15   ` Peter Xu
2018-05-09 10:52     ` Juan Quintela
2018-05-02 17:52   ` Dr. David Alan Gilbert
2018-05-09 10:53     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 11/21] migration: Create multifd packet Juan Quintela
2018-05-02 18:04   ` Dr. David Alan Gilbert
2018-05-09 11:09     ` Juan Quintela
2018-05-09 11:12       ` Dr. David Alan Gilbert
2018-05-09 19:46         ` Juan Quintela
2018-05-11 16:36           ` Dr. David Alan Gilbert
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 12/21] migration: Add multifd traces for start/end thread Juan Quintela
2018-05-02 18:35   ` Dr. David Alan Gilbert
2018-05-09 11:11     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 13/21] migration: Calculate transferred ram correctly Juan Quintela
2018-05-02 18:59   ` Dr. David Alan Gilbert
2018-05-09 11:14     ` Juan Quintela
2018-05-09 19:46     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 14/21] migration: Multifd channels always wait on the sem Juan Quintela
2018-05-03  9:36   ` Dr. David Alan Gilbert
2018-05-23 10:59     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 15/21] migration: Add block where to send/receive packets Juan Quintela
2018-05-03 10:03   ` Dr. David Alan Gilbert
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 16/21] migration: Synchronize multifd threads with main thread Juan Quintela
2018-05-03 10:44   ` Dr. David Alan Gilbert [this message]
2018-05-09 19:45     ` Juan Quintela
2018-05-11 16:32       ` Dr. David Alan Gilbert
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 17/21] migration: Create ram_multifd_page Juan Quintela
2018-04-26  7:43   ` Peter Xu
2018-04-26  8:18   ` Peter Xu
2018-05-03 11:30     ` Dr. David Alan Gilbert
2018-05-23 11:13     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 18/21] migration: Start sending messages Juan Quintela
2018-05-03 14:55   ` Dr. David Alan Gilbert
2018-05-23 10:51     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 19/21] migration: Wait for blocking IO Juan Quintela
2018-05-03 15:04   ` Dr. David Alan Gilbert
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 20/21] migration: Remove not needed semaphore and quit Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 21/21] migration: Stop sending whole pages through main channel Juan Quintela
2018-05-03 15:24   ` Dr. David Alan Gilbert
2018-04-25 11:44 ` [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
2018-05-03 15:32   ` Dr. David Alan Gilbert
2018-04-26  8:28 ` Peter Xu

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20180503104437.GC2660@work-vm \
    --to=dgilbert@redhat.com \
    --cc=lvivier@redhat.com \
    --cc=peterx@redhat.com \
    --cc=qemu-devel@nongnu.org \
    --cc=quintela@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes;
see the mirroring instructions for how to clone and mirror
all data and code used by this external index.