All of lore.kernel.org
 help / color / mirror / Atom feed
From: Paolo Bonzini <pbonzini@redhat.com>
To: Umesh Deshpande <udeshpan@redhat.com>
Cc: kvm@vger.kernel.org, quintela@redhat.com, mtosatti@redhat.com
Subject: Re: [RFC PATCH v3 1/4] separate thread for VM migration
Date: Thu, 11 Aug 2011 18:18:54 +0200	[thread overview]
Message-ID: <4E4400EE.1030905@redhat.com> (raw)
In-Reply-To: <6ac256e1f481ea28678bae846a13714302f258db.1313076455.git.udeshpan@redhat.com>

On 08/11/2011 05:32 PM, Umesh Deshpande wrote:
> This patch creates a separate thread for the guest migration on the source side.
> migrate_cancel request from the iothread is handled asynchronously. That is,
> iothread submits migrate_cancel to the migration thread and returns, while the
> migration thread attends this request at the next iteration to terminate its
> execution.

Looks pretty good!  I hope you agree. :)  Just one note inside.

> Signed-off-by: Umesh Deshpande<udeshpan@redhat.com>
> ---
>   buffered_file.c |   85 ++++++++++++++++++++++++++++++++----------------------
>   buffered_file.h |    4 ++
>   migration.c     |   49 ++++++++++++++-----------------
>   migration.h     |    6 ++++
>   4 files changed, 82 insertions(+), 62 deletions(-)
>
> diff --git a/buffered_file.c b/buffered_file.c
> index 41b42c3..19932b6 100644
> --- a/buffered_file.c
> +++ b/buffered_file.c
> @@ -16,6 +16,8 @@
>   #include "qemu-timer.h"
>   #include "qemu-char.h"
>   #include "buffered_file.h"
> +#include "migration.h"
> +#include "qemu-thread.h"
>
>   //#define DEBUG_BUFFERED_FILE
>
> @@ -28,13 +30,14 @@ typedef struct QEMUFileBuffered
>       void *opaque;
>       QEMUFile *file;
>       int has_error;
> +    int closed;
>       int freeze_output;
>       size_t bytes_xfer;
>       size_t xfer_limit;
>       uint8_t *buffer;
>       size_t buffer_size;
>       size_t buffer_capacity;
> -    QEMUTimer *timer;
> +    QemuThread thread;
>   } QEMUFileBuffered;
>
>   #ifdef DEBUG_BUFFERED_FILE
> @@ -155,14 +158,6 @@ static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, in
>           offset = size;
>       }
>
> -    if (pos == 0&&  size == 0) {
> -        DPRINTF("file is ready\n");
> -        if (s->bytes_xfer<= s->xfer_limit) {
> -            DPRINTF("notifying client\n");
> -            s->put_ready(s->opaque);
> -        }
> -    }
> -
>       return offset;
>   }
>
> @@ -175,20 +170,20 @@ static int buffered_close(void *opaque)
>
>       while (!s->has_error&&  s->buffer_size) {
>           buffered_flush(s);
> -        if (s->freeze_output)
> +        if (s->freeze_output) {
>               s->wait_for_unfreeze(s);
> +        }
>       }

This is racy; you might end up calling buffered_put_buffer twice from 
two different threads.

> -    ret = s->close(s->opaque);
> +    s->closed = 1;
>
> -    qemu_del_timer(s->timer);
> -    qemu_free_timer(s->timer);
> +    ret = s->close(s->opaque);
>       qemu_free(s->buffer);
> -    qemu_free(s);

... similarly, here the migration thread might end up using the buffer.
Just set s->closed here and wait for thread completion; the migration
thread can handle the flushes, free the buffer, etc.  Let the migration
thread do as much as possible; it will simplify your life.

>       return ret;
>   }
>
> +
>   static int buffered_rate_limit(void *opaque)
>   {
>       QEMUFileBuffered *s = opaque;
> @@ -228,34 +223,55 @@ static int64_t buffered_get_rate_limit(void *opaque)
>       return s->xfer_limit;
>   }
>
> -static void buffered_rate_tick(void *opaque)
> +static void *migrate_vm(void *opaque)
>   {
>       QEMUFileBuffered *s = opaque;
> +    int64_t current_time, expire_time = qemu_get_clock_ms(rt_clock) + 100;
> +    struct timeval tv = { .tv_sec = 0, .tv_usec = 100000};
>
> -    if (s->has_error) {
> -        buffered_close(s);
> -        return;
> -    }
> +    qemu_mutex_lock_iothread();
>
> -    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
> +    while (!s->closed) {

... In fact, this can be

     while (!s->closed || s->buffer_size)

and that alone will subsume the loop in buffered_close, no?

> +        if (s->freeze_output) {
> +            s->wait_for_unfreeze(s);
> +            s->freeze_output = 0;
> +            continue;
> +        }
>
> -    if (s->freeze_output)
> -        return;
> +        if (s->has_error) {
> +            break;
> +        }
> +
> +        current_time = qemu_get_clock_ms(rt_clock);
> +        if (!s->closed&&  (expire_time>  current_time)) {
> +            tv.tv_usec = 1000 * (expire_time - current_time);
> +            select(0, NULL, NULL, NULL,&tv);
> +            continue;
> +        }
>
> -    s->bytes_xfer = 0;
> +        s->bytes_xfer = 0;
> +        buffered_flush(s);
>
> -    buffered_flush(s);
> +        expire_time = qemu_get_clock_ms(rt_clock) + 100;
> +        s->put_ready(s->opaque);
> +    }
>
> -    /* Add some checks around this */
> -    s->put_ready(s->opaque);
> +    if (s->has_error) {
> +        buffered_close(s);
> +    }
> +    qemu_free(s);
> +
> +    qemu_mutex_unlock_iothread();
> +
> +    return NULL;
>   }
>
>   QEMUFile *qemu_fopen_ops_buffered(void *opaque,
> -                                  size_t bytes_per_sec,
> -                                  BufferedPutFunc *put_buffer,
> -                                  BufferedPutReadyFunc *put_ready,
> -                                  BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
> -                                  BufferedCloseFunc *close)
> +        size_t bytes_per_sec,
> +        BufferedPutFunc *put_buffer,
> +        BufferedPutReadyFunc *put_ready,
> +        BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
> +        BufferedCloseFunc *close)
>   {
>       QEMUFileBuffered *s;
>
> @@ -267,15 +283,14 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque,
>       s->put_ready = put_ready;
>       s->wait_for_unfreeze = wait_for_unfreeze;
>       s->close = close;
> +    s->closed = 0;
>
>       s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
>                                buffered_close, buffered_rate_limit,
>                                buffered_set_rate_limit,
> -			     buffered_get_rate_limit);
> -
> -    s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
> +                             buffered_get_rate_limit);
>
> -    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
> +    qemu_thread_create(&s->thread, migrate_vm, s);
>
>       return s->file;
>   }
> diff --git a/buffered_file.h b/buffered_file.h
> index 98d358b..477bf7c 100644
> --- a/buffered_file.h
> +++ b/buffered_file.h
> @@ -17,9 +17,13 @@
>   #include "hw/hw.h"
>
>   typedef ssize_t (BufferedPutFunc)(void *opaque, const void *data, size_t size);
> +typedef void (BufferedBeginFunc)(void *opaque);

Unused typedef.

>   typedef void (BufferedPutReadyFunc)(void *opaque);
>   typedef void (BufferedWaitForUnfreezeFunc)(void *opaque);
>   typedef int (BufferedCloseFunc)(void *opaque);
> +typedef void (BufferedWaitForCancelFunc)(void *opaque);
> +
> +void wait_for_cancel(void *opaque);

BufferedWaitForCancelFunc should go in patch 2; wait_for_cancel is unused.

>   QEMUFile *qemu_fopen_ops_buffered(void *opaque, size_t xfer_limit,
>                                     BufferedPutFunc *put_buffer,
> diff --git a/migration.c b/migration.c
> index af3a1f2..d8a0abb 100644
> --- a/migration.c
> +++ b/migration.c
> @@ -284,8 +284,6 @@ int migrate_fd_cleanup(FdMigrationState *s)
>   {
>       int ret = 0;
>
> -    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
> -
>       if (s->file) {
>           DPRINTF("closing file\n");
>           if (qemu_fclose(s->file) != 0) {
> @@ -307,14 +305,6 @@ int migrate_fd_cleanup(FdMigrationState *s)
>       return ret;
>   }
>
> -void migrate_fd_put_notify(void *opaque)
> -{
> -    FdMigrationState *s = opaque;
> -
> -    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
> -    qemu_file_put_notify(s->file);
> -}
> -

qemu_file_put_notify is also unused now.

>   ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
>   {
>       FdMigrationState *s = opaque;
> @@ -327,9 +317,7 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
>       if (ret == -1)
>           ret = -(s->get_error(s));
>
> -    if (ret == -EAGAIN) {
> -        qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s);
> -    } else if (ret<  0) {
> +    if (ret<  0&&  ret != -EAGAIN) {
>           if (s->mon) {
>               monitor_resume(s->mon);
>           }
> @@ -342,36 +330,40 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
>
>   void migrate_fd_connect(FdMigrationState *s)
>   {
> -    int ret;
> -
> +    s->begin = 1;
>       s->file = qemu_fopen_ops_buffered(s,
>                                         s->bandwidth_limit,
>                                         migrate_fd_put_buffer,
>                                         migrate_fd_put_ready,
>                                         migrate_fd_wait_for_unfreeze,
>                                         migrate_fd_close);
> -
> -    DPRINTF("beginning savevm\n");
> -    ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
> -                                  s->mig_state.shared);
> -    if (ret<  0) {
> -        DPRINTF("failed, %d\n", ret);
> -        migrate_fd_error(s);
> -        return;
> -    }
> -
> -    migrate_fd_put_ready(s);
>   }
>
>   void migrate_fd_put_ready(void *opaque)
>   {
>       FdMigrationState *s = opaque;
> +    int ret;
>
>       if (s->state != MIG_STATE_ACTIVE) {
>           DPRINTF("put_ready returning because of non-active state\n");
> +        if (s->state == MIG_STATE_CANCELLED) {
> +            migrate_fd_terminate(s);
> +        }
>           return;
>       }
>
> +    if (s->begin) {
> +        DPRINTF("beginning savevm\n");
> +        ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
> +                s->mig_state.shared);
> +        if (ret<  0) {
> +            DPRINTF("failed, %d\n", ret);
> +            migrate_fd_error(s);
> +            return;
> +        }
> +        s->begin = 0;
> +    }
> +
>       DPRINTF("iterate\n");
>       if (qemu_savevm_state_iterate(s->mon, s->file) == 1) {
>           int state;
> @@ -415,6 +407,10 @@ void migrate_fd_cancel(MigrationState *mig_state)
>       DPRINTF("cancelling migration\n");
>
>       s->state = MIG_STATE_CANCELLED;
> +}
> +
> +void migrate_fd_terminate(FdMigrationState *s)
> +{
>       notifier_list_notify(&migration_state_notifiers);
>       qemu_savevm_state_cancel(s->mon, s->file);
>
> @@ -458,7 +454,6 @@ int migrate_fd_close(void *opaque)
>   {
>       FdMigrationState *s = opaque;
>
> -    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
>       return s->close(s);
>   }
>
> diff --git a/migration.h b/migration.h
> index 050c56c..887f84c 100644
> --- a/migration.h
> +++ b/migration.h
> @@ -45,9 +45,11 @@ struct FdMigrationState
>       int fd;
>       Monitor *mon;
>       int state;
> +    int begin;
>       int (*get_error)(struct FdMigrationState*);
>       int (*close)(struct FdMigrationState*);
>       int (*write)(struct FdMigrationState*, const void *, size_t);
> +    void (*callback)(void *);
>       void *opaque;
>   };
>
> @@ -118,12 +120,16 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size);
>
>   void migrate_fd_connect(FdMigrationState *s);
>
> +void migrate_fd_begin(void *opaque);
> +
>   void migrate_fd_put_ready(void *opaque);
>
>   int migrate_fd_get_status(MigrationState *mig_state);
>
>   void migrate_fd_cancel(MigrationState *mig_state);
>
> +void migrate_fd_terminate(FdMigrationState *s);
> +
>   void migrate_fd_release(MigrationState *mig_state);
>
>   void migrate_fd_wait_for_unfreeze(void *opaque);


  reply	other threads:[~2011-08-11 16:18 UTC|newest]

Thread overview: 19+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2011-08-11 15:32 [RFC PATCH v3 0/4] Separate thread for VM migration Umesh Deshpande
2011-08-11 15:32 ` [RFC PATCH v3 1/4] separate " Umesh Deshpande
2011-08-11 16:18   ` Paolo Bonzini [this message]
2011-08-11 17:36     ` Umesh Deshpande
2011-08-12  6:40       ` Paolo Bonzini
2011-08-11 15:32 ` [RFC PATCH v3 2/4] Making iothread block for migrate_cancel Umesh Deshpande
2011-08-11 15:32 ` [RFC PATCH v3 3/4] lock to protect memslots Umesh Deshpande
2011-08-11 16:20   ` Paolo Bonzini
2011-08-12  6:45     ` Paolo Bonzini
2011-08-15  6:45       ` Umesh Deshpande
2011-08-15 14:10         ` Paolo Bonzini
2011-08-15  7:26       ` Marcelo Tosatti
2011-08-15 14:14         ` Paolo Bonzini
2011-08-15 20:27           ` Umesh Deshpande
2011-08-16  6:15             ` Paolo Bonzini
2011-08-16  7:56               ` Paolo Bonzini
2011-08-11 15:32 ` [RFC PATCH v3 4/4] Separate migration bitmap Umesh Deshpande
2011-08-11 16:23 ` [RFC PATCH v3 0/4] Separate thread for VM migration Paolo Bonzini
2011-08-11 18:25 ` Anthony Liguori

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=4E4400EE.1030905@redhat.com \
    --to=pbonzini@redhat.com \
    --cc=kvm@vger.kernel.org \
    --cc=mtosatti@redhat.com \
    --cc=quintela@redhat.com \
    --cc=udeshpan@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.