* [Qemu-devel] [PATCH v16 01/14] migration: Create multipage support
From: Juan Quintela @ 2018-06-26 18:19 UTC
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We only create/destroy the page list here.  We will use it later.
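
For illustration only, a minimal sketch of how the two helpers below
are meant to pair up once later patches start using the list
(page_count stands for migrate_multifd_page_count()):

    MultiFDPages_t *pages = multifd_pages_init(page_count);
    /* later: fill pages->offset[i] / pages->iov[i], bump pages->used */
    multifd_pages_clear(pages);   /* frees iov, offset and pages itself */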

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/ram.c | 57 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 57 insertions(+)

diff --git a/migration/ram.c b/migration/ram.c
index cd5f55117d..ed4401ee46 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -517,6 +517,20 @@ typedef struct {
     uint8_t id;
 } __attribute__((packed)) MultiFDInit_t;
 
+typedef struct {
+    /* number of used pages */
+    uint32_t used;
+    /* number of allocated pages */
+    uint32_t allocated;
+    /* global number of generated multifd packets */
+    uint64_t packet_num;
+    /* offset of each page */
+    ram_addr_t *offset;
+    /* pointer to each page */
+    struct iovec *iov;
+    RAMBlock *block;
+} MultiFDPages_t;
+
 typedef struct {
     /* this fields are not changed once the thread is created */
     /* channel number */
@@ -535,6 +549,8 @@ typedef struct {
     bool running;
     /* should this thread finish */
     bool quit;
+    /* array of pages to send */
+    MultiFDPages_t *pages;
 }  MultiFDSendParams;
 
 typedef struct {
@@ -555,6 +571,8 @@ typedef struct {
     bool running;
     /* should this thread finish */
     bool quit;
+    /* array of pages to receive */
+    MultiFDPages_t *pages;
 } MultiFDRecvParams;
 
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
@@ -619,10 +637,36 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
     return msg.id;
 }
 
+static MultiFDPages_t *multifd_pages_init(size_t size)
+{
+    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
+
+    pages->allocated = size;
+    pages->iov = g_new0(struct iovec, size);
+    pages->offset = g_new0(ram_addr_t, size);
+
+    return pages;
+}
+
+static void multifd_pages_clear(MultiFDPages_t *pages)
+{
+    pages->used = 0;
+    pages->allocated = 0;
+    pages->packet_num = 0;
+    pages->block = NULL;
+    g_free(pages->iov);
+    pages->iov = NULL;
+    g_free(pages->offset);
+    pages->offset = NULL;
+    g_free(pages);
+}
+
 struct {
     MultiFDSendParams *params;
     /* number of created threads */
     int count;
+    /* array of pages to send */
+    MultiFDPages_t *pages;
 } *multifd_send_state;
 
 static void multifd_send_terminate_threads(Error *err)
@@ -672,9 +716,13 @@ int multifd_save_cleanup(Error **errp)
         qemu_sem_destroy(&p->sem);
         g_free(p->name);
         p->name = NULL;
+        multifd_pages_clear(p->pages);
+        p->pages = NULL;
     }
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
+    multifd_pages_clear(multifd_send_state->pages);
+    multifd_send_state->pages = NULL;
     g_free(multifd_send_state);
     multifd_send_state = NULL;
     return ret;
@@ -735,6 +783,7 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
 int multifd_save_setup(void)
 {
     int thread_count;
+    uint32_t page_count = migrate_multifd_page_count();
     uint8_t i;
 
     if (!migrate_use_multifd()) {
@@ -744,6 +793,8 @@ int multifd_save_setup(void)
     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
     atomic_set(&multifd_send_state->count, 0);
+    multifd_send_state->pages = multifd_pages_init(page_count);
+
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
@@ -751,6 +802,7 @@ int multifd_save_setup(void)
         qemu_sem_init(&p->sem, 0);
         p->quit = false;
         p->id = i;
+        p->pages = multifd_pages_init(page_count);
         p->name = g_strdup_printf("multifdsend_%d", i);
         socket_send_channel_create(multifd_new_send_channel_async, p);
     }
@@ -808,6 +860,8 @@ int multifd_load_cleanup(Error **errp)
         qemu_sem_destroy(&p->sem);
         g_free(p->name);
         p->name = NULL;
+        multifd_pages_clear(p->pages);
+        p->pages = NULL;
     }
     g_free(multifd_recv_state->params);
     multifd_recv_state->params = NULL;
@@ -841,6 +895,7 @@ static void *multifd_recv_thread(void *opaque)
 int multifd_load_setup(void)
 {
     int thread_count;
+    uint32_t page_count = migrate_multifd_page_count();
     uint8_t i;
 
     if (!migrate_use_multifd()) {
@@ -850,6 +905,7 @@ int multifd_load_setup(void)
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
     atomic_set(&multifd_recv_state->count, 0);
+
     for (i = 0; i < thread_count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
@@ -857,6 +913,7 @@ int multifd_load_setup(void)
         qemu_sem_init(&p->sem, 0);
         p->quit = false;
         p->id = i;
+        p->pages = multifd_pages_init(page_count);
         p->name = g_strdup_printf("multifdrecv_%d", i);
     }
     return 0;
-- 
2.17.1

* [Qemu-devel] [PATCH v16 02/14] migration: Create multifd packet
From: Juan Quintela @ 2018-06-26 18:19 UTC
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We still don't put anything in the packet.
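
As a sketch of what the new wire header amounts to on the sender side
(condensed from the code below; every multi-byte field is converted to
big endian before it hits the wire):

    /* header plus one 64-bit page offset per page */
    uint32_t page_count = migrate_multifd_page_count();
    uint32_t packet_len = sizeof(MultiFDPacket_t)
                        + sizeof(ram_addr_t) * page_count;
    MultiFDPacket_t *packet = g_malloc0(packet_len);
    packet->magic = cpu_to_be32(MULTIFD_MAGIC);
    packet->version = cpu_to_be32(MULTIFD_VERSION);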

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

--
fix magic (dave)
check offset/ramblock  (dave)
s/seq/packet_num/ and make it 64bit
---
 migration/ram.c | 145 +++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 144 insertions(+), 1 deletion(-)

diff --git a/migration/ram.c b/migration/ram.c
index ed4401ee46..fd144fdf9a 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -517,6 +517,17 @@ typedef struct {
     uint8_t id;
 } __attribute__((packed)) MultiFDInit_t;
 
+typedef struct {
+    uint32_t magic;
+    uint32_t version;
+    uint32_t flags;
+    uint32_t size;
+    uint32_t used;
+    uint64_t packet_num;
+    char ramblock[256];
+    uint64_t offset[];
+} __attribute__((packed)) MultiFDPacket_t;
+
 typedef struct {
     /* number of used pages */
     uint32_t used;
@@ -551,6 +562,14 @@ typedef struct {
     bool quit;
     /* array of pages to send */
     MultiFDPages_t *pages;
+    /* packet allocated len */
+    uint32_t packet_len;
+    /* pointer to the packet */
+    MultiFDPacket_t *packet;
+    /* multifd flags for each packet */
+    uint32_t flags;
+    /* global number of generated multifd packets */
+    uint64_t packet_num;
 }  MultiFDSendParams;
 
 typedef struct {
@@ -573,6 +592,14 @@ typedef struct {
     bool quit;
     /* array of pages to receive */
     MultiFDPages_t *pages;
+    /* packet allocated len */
+    uint32_t packet_len;
+    /* pointer to the packet */
+    MultiFDPacket_t *packet;
+    /* multifd flags for each packet */
+    uint32_t flags;
+    /* global number of generated multifd packets */
+    uint64_t packet_num;
 } MultiFDRecvParams;
 
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
@@ -661,6 +688,99 @@ static void multifd_pages_clear(MultiFDPages_t *pages)
     g_free(pages);
 }
 
+static void multifd_send_fill_packet(MultiFDSendParams *p)
+{
+    MultiFDPacket_t *packet = p->packet;
+    int i;
+
+    packet->magic = cpu_to_be32(MULTIFD_MAGIC);
+    packet->version = cpu_to_be32(MULTIFD_VERSION);
+    packet->flags = cpu_to_be32(p->flags);
+    packet->size = cpu_to_be32(migrate_multifd_page_count());
+    packet->used = cpu_to_be32(p->pages->used);
+    packet->packet_num = cpu_to_be64(p->packet_num);
+
+    if (p->pages->block) {
+        strncpy(packet->ramblock, p->pages->block->idstr, 256);
+    }
+
+    for (i = 0; i < p->pages->used; i++) {
+        packet->offset[i] = cpu_to_be64(p->pages->offset[i]);
+    }
+}
+
+static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
+{
+    MultiFDPacket_t *packet = p->packet;
+    RAMBlock *block;
+    int i;
+
+    /* ToDo: We can't use it until we have received a message */
+    return 0;
+
+    be32_to_cpus(&packet->magic);
+    if (packet->magic != MULTIFD_MAGIC) {
+        error_setg(errp, "multifd: received packet "
+                   "magic %x and expected magic %x",
+                   packet->magic, MULTIFD_MAGIC);
+        return -1;
+    }
+
+    be32_to_cpus(&packet->version);
+    if (packet->version != MULTIFD_VERSION) {
+        error_setg(errp, "multifd: received packet "
+                   "version %d and expected version %d",
+                   packet->version, MULTIFD_VERSION);
+        return -1;
+    }
+
+    p->flags = be32_to_cpu(packet->flags);
+
+    be32_to_cpus(&packet->size);
+    if (packet->size > migrate_multifd_page_count()) {
+        error_setg(errp, "multifd: received packet "
+                   "with size %d and expected maximum size %d",
+                   packet->size, migrate_multifd_page_count());
+        return -1;
+    }
+
+    p->pages->used = be32_to_cpu(packet->used);
+    if (p->pages->used > packet->size) {
+        error_setg(errp, "multifd: received packet "
+                   "with size %d and expected maximum size %d",
+                   p->pages->used, packet->size) ;
+        return -1;
+    }
+
+    p->packet_num = be64_to_cpu(packet->packet_num);
+
+    if (p->pages->used) {
+        /* make sure that ramblock is 0 terminated */
+        packet->ramblock[255] = 0;
+        block = qemu_ram_block_by_name(packet->ramblock);
+        if (!block) {
+            error_setg(errp, "multifd: unknown ram block %s",
+                       packet->ramblock);
+            return -1;
+        }
+    }
+
+    for (i = 0; i < p->pages->used; i++) {
+        ram_addr_t offset = be64_to_cpu(packet->offset[i]);
+
+        if (offset > (block->used_length - TARGET_PAGE_SIZE)) {
+            error_setg(errp, "multifd: offset too long " RAM_ADDR_FMT
+                       " (max " RAM_ADDR_FMT ")",
+                       offset, block->max_length);
+            return -1;
+        }
+        p->pages->iov[i].iov_base = block->host + offset;
+        p->pages->iov[i].iov_len = TARGET_PAGE_SIZE;
+    }
+
+    return 0;
+}
+
 struct {
     MultiFDSendParams *params;
     /* number of created threads */
@@ -718,6 +838,9 @@ int multifd_save_cleanup(Error **errp)
         p->name = NULL;
         multifd_pages_clear(p->pages);
         p->pages = NULL;
+        p->packet_len = 0;
+        g_free(p->packet);
+        p->packet = NULL;
     }
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
@@ -739,6 +862,7 @@ static void *multifd_send_thread(void *opaque)
 
     while (true) {
         qemu_mutex_lock(&p->mutex);
+        multifd_send_fill_packet(p);
         if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
@@ -803,6 +927,9 @@ int multifd_save_setup(void)
         p->quit = false;
         p->id = i;
         p->pages = multifd_pages_init(page_count);
+        p->packet_len = sizeof(MultiFDPacket_t)
+                      + sizeof(ram_addr_t) * page_count;
+        p->packet = g_malloc0(p->packet_len);
         p->name = g_strdup_printf("multifdsend_%d", i);
         socket_send_channel_create(multifd_new_send_channel_async, p);
     }
@@ -862,6 +989,9 @@ int multifd_load_cleanup(Error **errp)
         p->name = NULL;
         multifd_pages_clear(p->pages);
         p->pages = NULL;
+        p->packet_len = 0;
+        g_free(p->packet);
+        p->packet = NULL;
     }
     g_free(multifd_recv_state->params);
     multifd_recv_state->params = NULL;
@@ -874,10 +1004,20 @@ int multifd_load_cleanup(Error **errp)
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
+    Error *local_err = NULL;
+    int ret;
 
     while (true) {
         qemu_mutex_lock(&p->mutex);
-        if (p->quit) {
+        if (false)  {
+            /* ToDo: Packet reception goes here */
+
+            ret = multifd_recv_unfill_packet(p, &local_err);
+            qemu_mutex_unlock(&p->mutex);
+            if (ret) {
+                break;
+            }
+        } else if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
         }
@@ -914,6 +1054,9 @@ int multifd_load_setup(void)
         p->quit = false;
         p->id = i;
         p->pages = multifd_pages_init(page_count);
+        p->packet_len = sizeof(MultiFDPacket_t)
+                      + sizeof(ram_addr_t) * page_count;
+        p->packet = g_malloc0(p->packet_len);
         p->name = g_strdup_printf("multifdrecv_%d", i);
     }
     return 0;
-- 
2.17.1

* [Qemu-devel] [PATCH v16 03/14] migration: Calculate mbps only during transfer time
From: Juan Quintela @ 2018-06-26 18:19 UTC
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We used to include the setup time in this calculation, but that can be
quite large with RDMA or multifd.
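
A worked example with made-up numbers: with total_time = 12000ms,
setup_time = 2000ms and 1.25e9 bytes transferred,

    transfer_time = 12000 - 2000;            /* 10000 ms */
    mbps = (1.25e9 * 8.0) / 10000 / 1000;    /* 1000 Mbps */

where the old formula would have reported only 833 Mbps.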

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/migration.c | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index e1eaa97df4..d3e6da9bfe 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -2708,6 +2708,7 @@ static void migration_calculate_complete(MigrationState *s)
 {
     uint64_t bytes = qemu_ftell(s->to_dst_file);
     int64_t end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+    int64_t transfer_time;
 
     s->total_time = end_time - s->start_time;
     if (!s->downtime) {
@@ -2718,8 +2719,9 @@ static void migration_calculate_complete(MigrationState *s)
         s->downtime = end_time - s->downtime_start;
     }
 
-    if (s->total_time) {
-        s->mbps = ((double) bytes * 8.0) / s->total_time / 1000;
+    transfer_time = s->total_time - s->setup_time;
+    if (transfer_time) {
+        s->mbps = ((double) bytes * 8.0) / transfer_time / 1000;
     }
 }
 
-- 
2.17.1

* [Qemu-devel] [PATCH v16 04/14] migration: Abstract the number of bytes sent
From: Juan Quintela @ 2018-06-26 18:19 UTC
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Right now we use the "position" inside the QEMUFile, but things like
RDMA already have to do weird things to keep that counter right, and
multifd will have similar problems.
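
The point of the accessor is to have a single place to extend; a
sketch of where this series is heading (patch 10/14 folds in the
multifd counter):

    static uint64_t migration_total_bytes(MigrationState *s)
    {
        /* later in the series: + ram_counters.multifd_bytes */
        return qemu_ftell(s->to_dst_file);
    }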

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/migration.c | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index d3e6da9bfe..264f3ce84e 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -2704,9 +2704,15 @@ static MigThrError migration_detect_error(MigrationState *s)
     }
 }
 
+/* How many bytes have we transferred since the beginning of the migration */
+static uint64_t migration_total_bytes(MigrationState *s)
+{
+    return qemu_ftell(s->to_dst_file);
+}
+
 static void migration_calculate_complete(MigrationState *s)
 {
-    uint64_t bytes = qemu_ftell(s->to_dst_file);
+    uint64_t bytes = migration_total_bytes(s);
     int64_t end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
     int64_t transfer_time;
 
@@ -2729,13 +2735,15 @@ static void migration_update_counters(MigrationState *s,
                                       int64_t current_time)
 {
     uint64_t transferred, time_spent;
+    uint64_t current_bytes; /* bytes transferred since the beginning */
     double bandwidth;
 
     if (current_time < s->iteration_start_time + BUFFER_DELAY) {
         return;
     }
 
-    transferred = qemu_ftell(s->to_dst_file) - s->iteration_initial_bytes;
+    current_bytes = migration_total_bytes(s);
+    transferred = current_bytes - s->iteration_initial_bytes;
     time_spent = current_time - s->iteration_start_time;
     bandwidth = (double)transferred / time_spent;
     s->threshold_size = bandwidth * s->parameters.downtime_limit;
@@ -2754,7 +2762,7 @@ static void migration_update_counters(MigrationState *s,
     qemu_file_reset_rate_limit(s->to_dst_file);
 
     s->iteration_start_time = current_time;
-    s->iteration_initial_bytes = qemu_ftell(s->to_dst_file);
+    s->iteration_initial_bytes = current_bytes;
 
     trace_migrate_transferred(transferred, time_spent,
                               bandwidth, s->threshold_size);
-- 
2.17.1

* [Qemu-devel] [PATCH v16 05/14] migration: Add multifd traces for start/end thread
From: Juan Quintela @ 2018-06-26 18:19 UTC
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We want to know how many pages/packets each channel has sent.  Add
counters for those.
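
With the formats added to trace-events below, the new events render
roughly like this (channel id and counts are made up):

    multifd_send_thread_start 0
    multifd_send_thread_end channel 0 packets 321 pages 40832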

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

--
sort trace-events (dave)
---
 migration/ram.c        | 22 ++++++++++++++++++++++
 migration/trace-events |  4 ++++
 2 files changed, 26 insertions(+)

diff --git a/migration/ram.c b/migration/ram.c
index fd144fdf9a..5c040e3ae5 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -570,6 +570,11 @@ typedef struct {
     uint32_t flags;
     /* global number of generated multifd packets */
     uint64_t packet_num;
+    /* thread local variables */
+    /* packets sent through this channel */
+    uint64_t num_packets;
+    /* pages sent through this channel */
+    uint64_t num_pages;
 }  MultiFDSendParams;
 
 typedef struct {
@@ -600,6 +605,11 @@ typedef struct {
     uint32_t flags;
     /* global number of generated multifd packets */
     uint64_t packet_num;
+    /* thread local variables */
+    /* packets received through this channel */
+    uint64_t num_packets;
+    /* pages received through this channel */
+    uint64_t num_pages;
 } MultiFDRecvParams;
 
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
@@ -856,9 +866,13 @@ static void *multifd_send_thread(void *opaque)
     MultiFDSendParams *p = opaque;
     Error *local_err = NULL;
 
+    trace_multifd_send_thread_start(p->id);
+
     if (multifd_send_initial_packet(p, &local_err) < 0) {
         goto out;
     }
+    /* initial packet */
+    p->num_packets = 1;
 
     while (true) {
         qemu_mutex_lock(&p->mutex);
@@ -880,6 +894,8 @@ out:
     p->running = false;
     qemu_mutex_unlock(&p->mutex);
 
+    trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
+
     return NULL;
 }
 
@@ -1007,6 +1023,8 @@ static void *multifd_recv_thread(void *opaque)
     Error *local_err = NULL;
     int ret;
 
+    trace_multifd_recv_thread_start(p->id);
+
     while (true) {
         qemu_mutex_lock(&p->mutex);
         if (false)  {
@@ -1029,6 +1047,8 @@ static void *multifd_recv_thread(void *opaque)
     p->running = false;
     qemu_mutex_unlock(&p->mutex);
 
+    trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
+
     return NULL;
 }
 
@@ -1094,6 +1114,8 @@ void multifd_recv_new_channel(QIOChannel *ioc)
     }
     p->c = ioc;
     object_ref(OBJECT(ioc));
+    /* initial packet */
+    p->num_packets = 1;
 
     p->running = true;
     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
diff --git a/migration/trace-events b/migration/trace-events
index 3f67758893..a80aaa8b3f 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -76,6 +76,10 @@ get_queued_page_not_dirty(const char *block_name, uint64_t tmp_offset, unsigned
 migration_bitmap_sync_start(void) ""
 migration_bitmap_sync_end(uint64_t dirty_pages) "dirty_pages %" PRIu64
 migration_throttle(void) ""
+multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %" PRIu64
+multifd_recv_thread_start(uint8_t id) "%d"
+multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %"  PRIu64
+multifd_send_thread_start(uint8_t id) "%d"
 ram_discard_range(const char *rbname, uint64_t start, size_t len) "%s: start: %" PRIx64 " %zx"
 ram_load_loop(const char *rbname, uint64_t addr, int flags, void *host) "%s: addr: 0x%" PRIx64 " flags: 0x%x host: %p"
 ram_load_postcopy_loop(uint64_t addr, int flags) "@%" PRIx64 " %x"
-- 
2.17.1

* [Qemu-devel] [PATCH v16 06/14] migration: Multifd channels always wait on the sem
From: Juan Quintela @ 2018-06-26 18:19 UTC
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Whether it is for a quit, a sync or a packet, we always wake the
channel threads first.
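
Schematically, both thread loops now have this shape (a condensed view
of the code below):

    while (true) {
        qemu_sem_wait(&p->sem);     /* block until we are woken */
        qemu_mutex_lock(&p->mutex);
        /* inspect p->quit (and, later, pending work) under the lock */
        ...
    }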

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/ram.c | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 5c040e3ae5..45d6c43bfe 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -875,6 +875,7 @@ static void *multifd_send_thread(void *opaque)
     p->num_packets = 1;
 
     while (true) {
+        qemu_sem_wait(&p->sem);
         qemu_mutex_lock(&p->mutex);
         multifd_send_fill_packet(p);
         if (p->quit) {
@@ -882,7 +883,9 @@ static void *multifd_send_thread(void *opaque)
             break;
         }
         qemu_mutex_unlock(&p->mutex);
-        qemu_sem_wait(&p->sem);
+        /* this is impossible */
+        error_setg(&local_err, "multifd_send_thread: Unknown command");
+        break;
     }
 
 out:
@@ -1026,6 +1029,7 @@ static void *multifd_recv_thread(void *opaque)
     trace_multifd_recv_thread_start(p->id);
 
     while (true) {
+        qemu_sem_wait(&p->sem);
         qemu_mutex_lock(&p->mutex);
         if (false)  {
             /* ToDo: Packet reception goes here */
@@ -1040,9 +1044,14 @@ static void *multifd_recv_thread(void *opaque)
             break;
         }
         qemu_mutex_unlock(&p->mutex);
-        qemu_sem_wait(&p->sem);
+        /* this is impossible */
+        error_setg(&local_err, "multifd_recv_thread: Unknown command");
+        break;
     }
 
+    if (local_err) {
+        multifd_recv_terminate_threads(local_err);
+    }
     qemu_mutex_lock(&p->mutex);
     p->running = false;
     qemu_mutex_unlock(&p->mutex);
-- 
2.17.1

* [Qemu-devel] [PATCH v16 07/14] migration: Add block where to send/receive packets
From: Juan Quintela @ 2018-06-26 18:19 UTC
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Once the block is there, add tracepoints.
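
Work is handed to a channel through the new pending_job field; the
producer side (added later in the series) looks schematically like:

    qemu_mutex_lock(&p->mutex);
    p->pending_job++;               /* claim the channel */
    qemu_mutex_unlock(&p->mutex);
    qemu_sem_post(&p->sem);         /* wake the channel thread */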

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/ram.c        | 49 +++++++++++++++++++++++++++++++++++++-----
 migration/trace-events |  2 ++
 2 files changed, 46 insertions(+), 5 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 45d6c43bfe..76410f9de8 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -560,6 +560,8 @@ typedef struct {
     bool running;
     /* should this thread finish */
     bool quit;
+    /* thread has work to do */
+    int pending_job;
     /* array of pages to send */
     MultiFDPages_t *pages;
     /* packet allocated len */
@@ -595,6 +597,8 @@ typedef struct {
     bool running;
     /* should this thread finish */
     bool quit;
+    /* thread has work to do */
+    bool pending_job;
     /* array of pages to receive */
     MultiFDPages_t *pages;
     /* packet allocated len */
@@ -877,8 +881,28 @@ static void *multifd_send_thread(void *opaque)
     while (true) {
         qemu_sem_wait(&p->sem);
         qemu_mutex_lock(&p->mutex);
-        multifd_send_fill_packet(p);
-        if (p->quit) {
+
+        if (p->pending_job) {
+            uint32_t used = p->pages->used;
+            uint64_t packet_num = p->packet_num;
+            uint32_t flags = p->flags;
+
+            multifd_send_fill_packet(p);
+            p->flags = 0;
+            p->num_packets++;
+            p->num_pages += used;
+            p->pages->used = 0;
+            qemu_mutex_unlock(&p->mutex);
+
+            trace_multifd_send(p->id, packet_num, used, flags);
+
+            /* ToDo: send packet here */
+
+            qemu_mutex_lock(&p->mutex);
+            p->pending_job--;
+            qemu_mutex_unlock(&p->mutex);
+            continue;
+        } else if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
         }
@@ -944,6 +968,7 @@ int multifd_save_setup(void)
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
         p->quit = false;
+        p->pending_job = 0;
         p->id = i;
         p->pages = multifd_pages_init(page_count);
         p->packet_len = sizeof(MultiFDPacket_t)
@@ -1031,14 +1056,27 @@ static void *multifd_recv_thread(void *opaque)
     while (true) {
         qemu_sem_wait(&p->sem);
         qemu_mutex_lock(&p->mutex);
-        if (false)  {
-            /* ToDo: Packet reception goes here */
+        if (p->pending_job) {
+            uint32_t used;
+            uint32_t flags;
+            qemu_mutex_unlock(&p->mutex);
 
+            /* ToDo: recv packet here */
+
+            qemu_mutex_lock(&p->mutex);
             ret = multifd_recv_unfill_packet(p, &local_err);
-            qemu_mutex_unlock(&p->mutex);
             if (ret) {
+                qemu_mutex_unlock(&p->mutex);
                 break;
             }
+
+            used = p->pages->used;
+            flags = p->flags;
+            trace_multifd_recv(p->id, p->packet_num, used, flags);
+            p->pending_job = false;
+            p->num_packets++;
+            p->num_pages += used;
+            qemu_mutex_unlock(&p->mutex);
         } else if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
@@ -1081,6 +1119,7 @@ int multifd_load_setup(void)
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
         p->quit = false;
+        p->pending_job = false;
         p->id = i;
         p->pages = multifd_pages_init(page_count);
         p->packet_len = sizeof(MultiFDPacket_t)
diff --git a/migration/trace-events b/migration/trace-events
index a80aaa8b3f..4aad26feed 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -76,8 +76,10 @@ get_queued_page_not_dirty(const char *block_name, uint64_t tmp_offset, unsigned
 migration_bitmap_sync_start(void) ""
 migration_bitmap_sync_end(uint64_t dirty_pages) "dirty_pages %" PRIu64
 migration_throttle(void) ""
+multifd_recv(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags) "channel %d packet number %" PRIu64 " pages %d flags 0x%x"
 multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %" PRIu64
 multifd_recv_thread_start(uint8_t id) "%d"
+multifd_send(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags) "channel %d packet_num %" PRIu64 " pages %d flags 0x%x"
 multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %"  PRIu64
 multifd_send_thread_start(uint8_t id) "%d"
 ram_discard_range(const char *rbname, uint64_t start, size_t len) "%s: start: %" PRIx64 " %zx"
-- 
2.17.1

* [Qemu-devel] [PATCH v16 08/14] migration: Synchronize multifd threads with main thread
From: Juan Quintela @ 2018-06-26 18:19 UTC
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We synchronize all threads on each RAM_SAVE_FLAG_EOS.  Bitmap
synchronizations don't happen inside a RAM section, so we are safe
against two channels trying to overwrite the same memory.
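
On the send side the per-EOS handshake is, schematically:

    /* main thread, for each channel */
    qemu_mutex_lock(&p->mutex);
    p->flags |= MULTIFD_FLAG_SYNC;
    p->pending_job++;
    qemu_mutex_unlock(&p->mutex);
    qemu_sem_post(&p->sem);
    /* ... then, once per channel, wait for the SYNC to go out */
    qemu_sem_wait(&multifd_send_state->sem_sync);

Each channel posts sem_sync after pushing out a packet that carries
MULTIFD_FLAG_SYNC, so the main thread knows every channel has drained.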

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

--
seq needs to be atomic now, will also be accessed from main thread.
Fix the if (true || ...) leftover
We are back to non-atomics
---
 migration/ram.c        | 147 ++++++++++++++++++++++++++++++++---------
 migration/trace-events |   6 ++
 2 files changed, 122 insertions(+), 31 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 76410f9de8..77c66a4391 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -510,6 +510,8 @@ exit:
 #define MULTIFD_MAGIC 0x11223344U
 #define MULTIFD_VERSION 1
 
+#define MULTIFD_FLAG_SYNC (1 << 0)
+
 typedef struct {
     uint32_t magic;
     uint32_t version;
@@ -577,6 +579,8 @@ typedef struct {
     uint64_t num_packets;
     /* pages sent through this channel */
     uint64_t num_pages;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
 }  MultiFDSendParams;
 
 typedef struct {
@@ -614,6 +618,8 @@ typedef struct {
     uint64_t num_packets;
     /* pages received through this channel */
     uint64_t num_pages;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
 } MultiFDRecvParams;
 
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
@@ -801,6 +807,10 @@ struct {
     int count;
     /* array of pages to send */
     MultiFDPages_t *pages;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
+    /* global number of generated multifd packets */
+    uint64_t packet_num;
 } *multifd_send_state;
 
 static void multifd_send_terminate_threads(Error *err)
@@ -848,6 +858,7 @@ int multifd_save_cleanup(Error **errp)
         p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        qemu_sem_destroy(&p->sem_sync);
         g_free(p->name);
         p->name = NULL;
         multifd_pages_clear(p->pages);
@@ -856,6 +867,7 @@ int multifd_save_cleanup(Error **errp)
         g_free(p->packet);
         p->packet = NULL;
     }
+    qemu_sem_destroy(&multifd_send_state->sem_sync);
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
     multifd_pages_clear(multifd_send_state->pages);
@@ -865,6 +877,33 @@ int multifd_save_cleanup(Error **errp)
     return ret;
 }
 
+static void multifd_send_sync_main(void)
+{
+    int i;
+
+    if (!migrate_use_multifd()) {
+        return;
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        trace_multifd_send_sync_main_signal(p->id);
+
+        qemu_mutex_lock(&p->mutex);
+        p->flags |= MULTIFD_FLAG_SYNC;
+        p->pending_job++;
+        qemu_mutex_unlock(&p->mutex);
+        qemu_sem_post(&p->sem);
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        trace_multifd_send_sync_main_wait(p->id);
+        qemu_sem_wait(&multifd_send_state->sem_sync);
+    }
+    trace_multifd_send_sync_main(multifd_send_state->packet_num);
+}
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
@@ -901,15 +940,17 @@ static void *multifd_send_thread(void *opaque)
             qemu_mutex_lock(&p->mutex);
             p->pending_job--;
             qemu_mutex_unlock(&p->mutex);
-            continue;
+
+            if (flags & MULTIFD_FLAG_SYNC) {
+                qemu_sem_post(&multifd_send_state->sem_sync);
+            }
         } else if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
+        } else {
+            qemu_mutex_unlock(&p->mutex);
+            /* sometimes there are spurious wakeups */
         }
-        qemu_mutex_unlock(&p->mutex);
-        /* this is impossible */
-        error_setg(&local_err, "multifd_send_thread: Unknown command");
-        break;
     }
 
 out:
@@ -961,12 +1002,14 @@ int multifd_save_setup(void)
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
     atomic_set(&multifd_send_state->count, 0);
     multifd_send_state->pages = multifd_pages_init(page_count);
+    qemu_sem_init(&multifd_send_state->sem_sync, 0);
 
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
+        qemu_sem_init(&p->sem_sync, 0);
         p->quit = false;
         p->pending_job = 0;
         p->id = i;
@@ -984,6 +1027,10 @@ struct {
     MultiFDRecvParams *params;
     /* number of created threads */
     int count;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
+    /* global number of generated multifd packets */
+    uint64_t packet_num;
 } *multifd_recv_state;
 
 static void multifd_recv_terminate_threads(Error *err)
@@ -1029,6 +1076,7 @@ int multifd_load_cleanup(Error **errp)
         p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        qemu_sem_destroy(&p->sem_sync);
         g_free(p->name);
         p->name = NULL;
         multifd_pages_clear(p->pages);
@@ -1037,6 +1085,7 @@ int multifd_load_cleanup(Error **errp)
         g_free(p->packet);
         p->packet = NULL;
     }
+    qemu_sem_destroy(&multifd_recv_state->sem_sync);
     g_free(multifd_recv_state->params);
     multifd_recv_state->params = NULL;
     g_free(multifd_recv_state);
@@ -1045,6 +1094,42 @@ int multifd_load_cleanup(Error **errp)
     return ret;
 }
 
+static void multifd_recv_sync_main(void)
+{
+    int i;
+
+    if (!migrate_use_multifd()) {
+        return;
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        trace_multifd_recv_sync_main_signal(p->id);
+        qemu_mutex_lock(&p->mutex);
+        p->pending_job = true;
+        qemu_mutex_unlock(&p->mutex);
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        trace_multifd_recv_sync_main_wait(p->id);
+        qemu_sem_wait(&multifd_recv_state->sem_sync);
+        qemu_mutex_lock(&p->mutex);
+        if (multifd_recv_state->packet_num < p->packet_num) {
+            multifd_recv_state->packet_num = p->packet_num;
+        }
+        qemu_mutex_unlock(&p->mutex);
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        trace_multifd_recv_sync_main_signal(p->id);
+
+        qemu_sem_post(&p->sem_sync);
+    }
+    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
+}
+
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
@@ -1054,37 +1139,30 @@ static void *multifd_recv_thread(void *opaque)
     trace_multifd_recv_thread_start(p->id);
 
     while (true) {
-        qemu_sem_wait(&p->sem);
+        uint32_t used;
+        uint32_t flags;
+
+        /* ToDo: recv packet here */
+
         qemu_mutex_lock(&p->mutex);
-        if (p->pending_job) {
-            uint32_t used;
-            uint32_t flags;
-            qemu_mutex_unlock(&p->mutex);
-
-            /* ToDo: recv packet here */
-
-            qemu_mutex_lock(&p->mutex);
-            ret = multifd_recv_unfill_packet(p, &local_err);
-            if (ret) {
-                qemu_mutex_unlock(&p->mutex);
-                break;
-            }
-
-            used = p->pages->used;
-            flags = p->flags;
-            trace_multifd_recv(p->id, p->packet_num, used, flags);
-            p->pending_job = false;
-            p->num_packets++;
-            p->num_pages += used;
-            qemu_mutex_unlock(&p->mutex);
-        } else if (p->quit) {
+        ret = multifd_recv_unfill_packet(p, &local_err);
+        if (ret) {
             qemu_mutex_unlock(&p->mutex);
             break;
         }
+
+        used = p->pages->used;
+        flags = p->flags;
+        trace_multifd_recv(p->id, p->packet_num, used, flags);
+        p->pending_job = false;
+        p->num_packets++;
+        p->num_pages += used;
         qemu_mutex_unlock(&p->mutex);
-        /* this is impossible */
-        error_setg(&local_err, "multifd_recv_thread: Unknown command");
-        break;
+
+        if (flags & MULTIFD_FLAG_SYNC) {
+            qemu_sem_post(&multifd_recv_state->sem_sync);
+            qemu_sem_wait(&p->sem_sync);
+        }
     }
 
     if (local_err) {
@@ -1112,12 +1190,14 @@ int multifd_load_setup(void)
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
     atomic_set(&multifd_recv_state->count, 0);
+    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
 
     for (i = 0; i < thread_count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
+        qemu_sem_init(&p->sem_sync, 0);
         p->quit = false;
         p->pending_job = false;
         p->id = i;
@@ -2875,6 +2955,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
 
+    multifd_send_sync_main();
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
 
     return 0;
@@ -2955,6 +3036,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
      */
     ram_control_after_iterate(f, RAM_CONTROL_ROUND);
 
+    multifd_send_sync_main();
 out:
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
     ram_counters.transferred += 8;
@@ -3008,6 +3090,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
 
     rcu_read_unlock();
 
+    multifd_send_sync_main();
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
 
     return 0;
@@ -3497,6 +3580,7 @@ static int ram_load_postcopy(QEMUFile *f)
             break;
         case RAM_SAVE_FLAG_EOS:
             /* normal exit */
+            multifd_recv_sync_main();
             break;
         default:
             error_report("Unknown combination of migration flags: %#x"
@@ -3685,6 +3769,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             break;
         case RAM_SAVE_FLAG_EOS:
             /* normal exit */
+            multifd_recv_sync_main();
             break;
         default:
             if (flags & RAM_SAVE_FLAG_HOOK) {
diff --git a/migration/trace-events b/migration/trace-events
index 4aad26feed..8b9edfbfef 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -77,9 +77,15 @@ migration_bitmap_sync_start(void) ""
 migration_bitmap_sync_end(uint64_t dirty_pages) "dirty_pages %" PRIu64
 migration_throttle(void) ""
 multifd_recv(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags) "channel %d packet number %" PRIu64 " pages %d flags 0x%x"
+multifd_recv_sync_main(long packet_num) "packet num %ld"
+multifd_recv_sync_main_signal(uint8_t id) "channel %d"
+multifd_recv_sync_main_wait(uint8_t id) "channel %d"
 multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %" PRIu64
 multifd_recv_thread_start(uint8_t id) "%d"
 multifd_send(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags) "channel %d packet_num %" PRIu64 " pages %d flags 0x%x"
+multifd_send_sync_main(long packet_num) "packet num %ld"
+multifd_send_sync_main_signal(uint8_t id) "channel %d"
+multifd_send_sync_main_wait(uint8_t id) "channel %d"
 multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %"  PRIu64
 multifd_send_thread_start(uint8_t id) "%d"
 ram_discard_range(const char *rbname, uint64_t start, size_t len) "%s: start: %" PRIx64 " %zx"
-- 
2.17.1

* [Qemu-devel] [PATCH v16 09/14] migration: Create multifd_bytes ram_counter
From: Juan Quintela @ 2018-06-26 18:19 UTC
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

This counts how many bytes are sent through multifd.
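
After this, a query-migrate reply carries the new field next to the
existing RAM stats; a hypothetical fragment (values invented):

    "ram": { "transferred": 123456789, "page-size": 4096,
             "multifd-bytes": 98765432, ... }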

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 hmp.c                 | 2 ++
 migration/migration.c | 1 +
 qapi/migration.json   | 5 ++++-
 3 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/hmp.c b/hmp.c
index f601099f90..0da0b0ac33 100644
--- a/hmp.c
+++ b/hmp.c
@@ -234,6 +234,8 @@ void hmp_info_migrate(Monitor *mon, const QDict *qdict)
                        info->ram->dirty_sync_count);
         monitor_printf(mon, "page size: %" PRIu64 " kbytes\n",
                        info->ram->page_size >> 10);
+        monitor_printf(mon, "multifd bytes: %" PRIu64 " kbytes\n",
+                       info->ram->multifd_bytes >> 10);
 
         if (info->ram->dirty_pages_rate) {
             monitor_printf(mon, "dirty pages rate: %" PRIu64 " pages\n",
diff --git a/migration/migration.c b/migration/migration.c
index 264f3ce84e..2680ba2d47 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -708,6 +708,7 @@ static void populate_ram_info(MigrationInfo *info, MigrationState *s)
     info->ram->dirty_sync_count = ram_counters.dirty_sync_count;
     info->ram->postcopy_requests = ram_counters.postcopy_requests;
     info->ram->page_size = qemu_target_page_size();
+    info->ram->multifd_bytes = ram_counters.multifd_bytes;
 
     if (migrate_use_xbzrle()) {
         info->has_xbzrle_cache = true;
diff --git a/qapi/migration.json b/qapi/migration.json
index 1b4c1db670..186e8a7303 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -39,6 +39,8 @@
 # @page-size: The number of bytes per page for the various page-based
 #        statistics (since 2.10)
 #
+# @multifd-bytes: The number of bytes sent through multifd (since 3.0)
+#
 # Since: 0.14.0
 ##
 { 'struct': 'MigrationStats',
@@ -46,7 +48,8 @@
            'duplicate': 'int', 'skipped': 'int', 'normal': 'int',
            'normal-bytes': 'int', 'dirty-pages-rate' : 'int',
            'mbps' : 'number', 'dirty-sync-count' : 'int',
-           'postcopy-requests' : 'int', 'page-size' : 'int' } }
+           'postcopy-requests' : 'int', 'page-size' : 'int',
+           'multifd-bytes' : 'uint64' } }
 
 ##
 # @XBZRLECacheStats:
-- 
2.17.1

* [Qemu-devel] [PATCH v16 10/14] migration: Create ram_save_multifd_page
From: Juan Quintela @ 2018-06-26 18:19 UTC
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

The function still doesn't use multifd, but we have simplified
ram_save_page: the xbzrle and RDMA stuff is gone.  We have added a new
counter.
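
From the caller's point of view the batching works like this (a
sketch; see the diff below for the real thing):

    /* for each dirty page, from ram_save_multifd_page(): */
    multifd_queue_page(block, offset);  /* sends a batch when it fills
                                           or the RAMBlock changes */
    /* a partial batch is flushed by multifd_send_sync_main() at EOS */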

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

--
Add last_page parameter
Add comments for done and address
Remove multifd field, it is the same than normal pages
Merge next patch, now we send multiple pages at a time
Remove counter for multifd pages, it is identical to normal pages
Use iovec's instead of creating the equivalent.
Clear memory used by pages (dave)
Use g_new0(danp)
define MULTIFD_CONTINUE
now pages member is a pointer
Fix off-by-one in number of pages in one packet
Remove RAM_SAVE_FLAG_MULTIFD_PAGE
s/multifd_pages_t/MultiFDPages_t/
add comment explaining what it means
---
 migration/migration.c |   2 +-
 migration/ram.c       | 107 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 108 insertions(+), 1 deletion(-)

diff --git a/migration/migration.c b/migration/migration.c
index 2680ba2d47..d075c27886 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -2708,7 +2708,7 @@ static MigThrError migration_detect_error(MigrationState *s)
 /* How many bytes have we transferred since the beginning of the migration */
 static uint64_t migration_total_bytes(MigrationState *s)
 {
-    return qemu_ftell(s->to_dst_file);
+    return qemu_ftell(s->to_dst_file) + ram_counters.multifd_bytes;
 }
 
 static void migration_calculate_complete(MigrationState *s)
diff --git a/migration/ram.c b/migration/ram.c
index 77c66a4391..050e2f2000 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -55,6 +55,7 @@
 #include "sysemu/sysemu.h"
 #include "qemu/uuid.h"
 #include "savevm.h"
+#include "qemu/iov.h"
 
 /***********************************************************/
 /* ram save/restore */
@@ -811,8 +812,87 @@ struct {
     QemuSemaphore sem_sync;
     /* global number of generated multifd packets */
     uint64_t packet_num;
+    /* send channels ready */
+    QemuSemaphore channels_ready;
 } *multifd_send_state;
 
+/*
+ * How we use multifd_send_state->pages and channel->pages?
+ *
+ * We create a pages struct for each channel, and a main one.  Each
+ * time that we need to send a batch of pages we interchange the ones
+ * between multifd_send_state and the channel that is sending it.
+ * There are two reasons for that:
+ *    - to not have to do so many mallocs during migration
+ *    - to make it easier to know what to free at the end of migration
+ *
+ * This way we always know who is the owner of each "pages" struct,
+ * and we don't need any locking.  It belongs to the migration thread
+ * or to the channel thread.  Switching is safe because the migration
+ * thread is using the channel mutex when changing it, and the channel
+ * has to have finished with its own, otherwise pending_job can't be
+ * false.
+ */
+
+static void multifd_send_pages(void)
+{
+    int i;
+    static int next_channel;
+    MultiFDSendParams *p = NULL; /* make happy gcc */
+    MultiFDPages_t *pages = multifd_send_state->pages;
+    uint64_t transferred;
+
+    qemu_sem_wait(&multifd_send_state->channels_ready);
+    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
+        p = &multifd_send_state->params[i];
+
+        qemu_mutex_lock(&p->mutex);
+        if (!p->pending_job) {
+            p->pending_job++;
+            next_channel = (i + 1) % migrate_multifd_channels();
+            break;
+        }
+        qemu_mutex_unlock(&p->mutex);
+    }
+    p->pages->used = 0;
+
+    p->packet_num = multifd_send_state->packet_num++;
+    p->pages->block = NULL;
+    multifd_send_state->pages = p->pages;
+    p->pages = pages;
+    transferred = pages->used * TARGET_PAGE_SIZE + p->packet_len;
+    ram_counters.multifd_bytes += transferred;
+    ram_counters.transferred += transferred;
+    qemu_mutex_unlock(&p->mutex);
+    qemu_sem_post(&p->sem);
+}
+
+static void multifd_queue_page(RAMBlock *block, ram_addr_t offset)
+{
+    MultiFDPages_t *pages = multifd_send_state->pages;
+
+    if (!pages->block) {
+        pages->block = block;
+    }
+
+    if (pages->block == block) {
+        pages->offset[pages->used] = offset;
+        pages->iov[pages->used].iov_base = block->host + offset;
+        pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
+        pages->used++;
+
+        if (pages->used < pages->allocated) {
+            return;
+        }
+    }
+
+    multifd_send_pages();
+
+    if (pages->block != block) {
+        multifd_queue_page(block, offset);
+    }
+}
+
 static void multifd_send_terminate_threads(Error *err)
 {
     int i;
@@ -867,6 +947,7 @@ int multifd_save_cleanup(Error **errp)
         g_free(p->packet);
         p->packet = NULL;
     }
+    qemu_sem_destroy(&multifd_send_state->channels_ready);
     qemu_sem_destroy(&multifd_send_state->sem_sync);
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
@@ -884,12 +965,17 @@ static void multifd_send_sync_main(void)
     if (!migrate_use_multifd()) {
         return;
     }
+    if (multifd_send_state->pages->used) {
+        multifd_send_pages();
+    }
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
         trace_multifd_send_sync_main_signal(p->id);
 
         qemu_mutex_lock(&p->mutex);
+
+        p->packet_num = multifd_send_state->packet_num++;
         p->flags |= MULTIFD_FLAG_SYNC;
         p->pending_job++;
         qemu_mutex_unlock(&p->mutex);
@@ -944,6 +1030,7 @@ static void *multifd_send_thread(void *opaque)
             if (flags & MULTIFD_FLAG_SYNC) {
                 qemu_sem_post(&multifd_send_state->sem_sync);
             }
+            qemu_sem_post(&multifd_send_state->channels_ready);
         } else if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
@@ -1003,6 +1090,7 @@ int multifd_save_setup(void)
     atomic_set(&multifd_send_state->count, 0);
     multifd_send_state->pages = multifd_pages_init(page_count);
     qemu_sem_init(&multifd_send_state->sem_sync, 0);
+    qemu_sem_init(&multifd_send_state->channels_ready, 0);
 
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
@@ -1724,6 +1812,23 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
     return pages;
 }
 
+static int ram_save_multifd_page(RAMState *rs, RAMBlock *block,
+                                 ram_addr_t offset)
+{
+    uint8_t *p;
+
+    p = block->host + offset;
+
+    ram_counters.transferred += save_page_header(rs, rs->f, block,
+                                                 offset | RAM_SAVE_FLAG_PAGE);
+    multifd_queue_page(block, offset);
+    qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
+    ram_counters.transferred += TARGET_PAGE_SIZE;
+    ram_counters.normal++;
+
+    return 1;
+}
+
 static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
                                 ram_addr_t offset, uint8_t *source_buf)
 {
@@ -2129,6 +2234,8 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
      */
     if (block == rs->last_sent_block && save_page_use_compression(rs)) {
         return compress_page_with_multi_thread(rs, block, offset);
+    } else if (migrate_use_multifd()) {
+        return ram_save_multifd_page(rs, block, offset);
     }
 
     return ram_save_page(rs, pss, last_stage);
-- 
2.17.1

* [Qemu-devel] [PATCH v16 11/14] migration: Start sending messages
From: Juan Quintela @ 2018-06-26 18:19 UTC
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx
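
The send side now writes the packet header first and then the guest
pages as a single iovec; schematically (a condensed view of the diff
below):

    qio_channel_write_all(p->c, (void *)p->packet, p->packet_len,
                          &local_err);
    qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);

The receiver mirrors this with qio_channel_read_all_eof(), which lets
it tell a clean EOF (return 0) apart from an error (return -1).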

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/ram.c | 29 ++++++++++++++++++++++++-----
 1 file changed, 24 insertions(+), 5 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 050e2f2000..b9c8f65059 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -736,9 +736,6 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
     RAMBlock *block;
     int i;
 
-    /* ToDo: We can't use it until we have received a message */
-    return 0;
-
     be32_to_cpus(&packet->magic);
     if (packet->magic != MULTIFD_MAGIC) {
         error_setg(errp, "multifd: received packet "
@@ -994,6 +991,7 @@ static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
     Error *local_err = NULL;
+    int ret;
 
     trace_multifd_send_thread_start(p->id);
 
@@ -1021,7 +1019,16 @@ static void *multifd_send_thread(void *opaque)
 
             trace_multifd_send(p->id, packet_num, used, flags);
 
-            /* ToDo: send packet here */
+            ret = qio_channel_write_all(p->c, (void *)p->packet,
+                                        p->packet_len, &local_err);
+            if (ret != 0) {
+                break;
+            }
+
+            ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);
+            if (ret != 0) {
+                break;
+            }
 
             qemu_mutex_lock(&p->mutex);
             p->pending_job--;
@@ -1230,7 +1237,14 @@ static void *multifd_recv_thread(void *opaque)
         uint32_t used;
         uint32_t flags;
 
-        /* ToDo: recv packet here */
+        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
+                                       p->packet_len, &local_err);
+        if (ret == 0) {   /* EOF */
+            break;
+        }
+        if (ret == -1) {   /* Error */
+            break;
+        }
 
         qemu_mutex_lock(&p->mutex);
         ret = multifd_recv_unfill_packet(p, &local_err);
@@ -1247,6 +1261,11 @@ static void *multifd_recv_thread(void *opaque)
         p->num_pages += used;
         qemu_mutex_unlock(&p->mutex);
 
+        ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
+        if (ret != 0) {
+            break;
+        }
+
         if (flags & MULTIFD_FLAG_SYNC) {
             qemu_sem_post(&multifd_recv_state->sem_sync);
             qemu_sem_wait(&p->sem_sync);
-- 
2.17.1

* [Qemu-devel] [PATCH v16 12/14] migration: Wait for blocking IO
  2018-06-26 18:19 [Qemu-devel] [PATCH v16 01/14] migration: Create multipage support Juan Quintela
                   ` (9 preceding siblings ...)
  2018-06-26 18:19 ` [Qemu-devel] [PATCH v16 11/14] migration: Start sending messages Juan Quintela
@ 2018-06-26 18:19 ` Juan Quintela
  2018-06-26 18:19 ` [Qemu-devel] [PATCH v16 13/14] migration: Remove not needed semaphore and quit Juan Quintela
  2018-06-26 18:19 ` [Qemu-devel] [PATCH v16 14/14] migration: Stop sending whole pages through main channel Juan Quintela
  12 siblings, 0 replies; 18+ messages in thread
From: Juan Quintela @ 2018-06-26 18:19 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We have three conditions here:
- the channel fails -> error
- we have to quit: we close the channel and the read fails
- a normal read that succeeds -> we are in business

So forget the complications of waiting on a semaphore.
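
For reference, a minimal sketch of the resulting receive-loop shape
(illustrative only; the names follow this series and the surrounding
loop context is omitted):

    int ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                       p->packet_len, &local_err);
    if (ret == 0) {         /* EOF: channel closed, normal quit */
        break;
    }
    if (ret == -1) {        /* channel error */
        break;
    }
    /* ret == 1: a full packet arrived, go on and unpack it */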

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/ram.c | 13 -------------
 1 file changed, 13 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index b9c8f65059..5d38d699f3 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -602,8 +602,6 @@ typedef struct {
     bool running;
     /* should this thread finish */
     bool quit;
-    /* thread has work to do */
-    bool pending_job;
     /* array of pages to receive */
     MultiFDPages_t *pages;
     /* packet allocated len */
@@ -1199,14 +1197,6 @@ static void multifd_recv_sync_main(void)
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
-        trace_multifd_recv_sync_main_signal(p->id);
-        qemu_mutex_lock(&p->mutex);
-        p->pending_job = true;
-        qemu_mutex_unlock(&p->mutex);
-    }
-    for (i = 0; i < migrate_multifd_channels(); i++) {
-        MultiFDRecvParams *p = &multifd_recv_state->params[i];
-
         trace_multifd_recv_sync_main_wait(p->id);
         qemu_sem_wait(&multifd_recv_state->sem_sync);
         qemu_mutex_lock(&p->mutex);
@@ -1219,7 +1209,6 @@ static void multifd_recv_sync_main(void)
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         trace_multifd_recv_sync_main_signal(p->id);
-
         qemu_sem_post(&p->sem_sync);
     }
     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
@@ -1256,7 +1245,6 @@ static void *multifd_recv_thread(void *opaque)
         used = p->pages->used;
         flags = p->flags;
         trace_multifd_recv(p->id, p->packet_num, used, flags);
-        p->pending_job = false;
         p->num_packets++;
         p->num_pages += used;
         qemu_mutex_unlock(&p->mutex);
@@ -1306,7 +1294,6 @@ int multifd_load_setup(void)
         qemu_sem_init(&p->sem, 0);
         qemu_sem_init(&p->sem_sync, 0);
         p->quit = false;
-        p->pending_job = false;
         p->id = i;
         p->pages = multifd_pages_init(page_count);
         p->packet_len = sizeof(MultiFDPacket_t)
-- 
2.17.1

* [Qemu-devel] [PATCH v16 13/14] migration: Remove not needed semaphore and quit
  2018-06-26 18:19 [Qemu-devel] [PATCH v16 01/14] migration: Create multipage support Juan Quintela
                   ` (10 preceding siblings ...)
  2018-06-26 18:19 ` [Qemu-devel] [PATCH v16 12/14] migration: Wait for blocking IO Juan Quintela
@ 2018-06-26 18:19 ` Juan Quintela
  2018-06-27 10:46   ` Dr. David Alan Gilbert
  2018-06-26 18:19 ` [Qemu-devel] [PATCH v16 14/14] migration: Stop sending whole pages through main channel Juan Quintela
  12 siblings, 1 reply; 18+ messages in thread
From: Juan Quintela @ 2018-06-26 18:19 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We now quit with shutdown() on the QIO channel.
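
A hedged sketch of the idea (names from this series, not a drop-in
hunk): shutting the channel down wakes any thread blocked reading from
it, so the per-channel semaphore and quit flag become unnecessary:

    /* wake a receiver blocked in qio_channel_read_all_eof(); after
     * shutdown the read returns EOF/error and the thread exits */
    qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);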

Signed-off-by: Juan Quintela <quintela@redhat.com>
--
Add comment
Use shutdown() instead of unref()
---
 migration/ram.c | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 5d38d699f3..61f7313093 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -594,14 +594,10 @@ typedef struct {
     QemuThread thread;
     /* communication channel */
     QIOChannel *c;
-    /* sem where to wait for more work */
-    QemuSemaphore sem;
     /* this mutex protects the following parameters */
     QemuMutex mutex;
     /* is this channel thread running */
     bool running;
-    /* should this thread finish */
-    bool quit;
     /* array of pages to receive */
     MultiFDPages_t *pages;
     /* packet allocated len */
@@ -1144,8 +1140,11 @@ static void multifd_recv_terminate_threads(Error *err)
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         qemu_mutex_lock(&p->mutex);
-        p->quit = true;
-        qemu_sem_post(&p->sem);
+        /* We could arrive here for two reasons:
+           - normal quit, i.e. everything went fine, just finished
+           - error quit: We close the channels so the channel threads
+             finish the qio_channel_read_all_eof() */
+        qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
         qemu_mutex_unlock(&p->mutex);
     }
 }
@@ -1168,7 +1167,6 @@ int multifd_load_cleanup(Error **errp)
         object_unref(OBJECT(p->c));
         p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
-        qemu_sem_destroy(&p->sem);
         qemu_sem_destroy(&p->sem_sync);
         g_free(p->name);
         p->name = NULL;
@@ -1291,9 +1289,7 @@ int multifd_load_setup(void)
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         qemu_mutex_init(&p->mutex);
-        qemu_sem_init(&p->sem, 0);
         qemu_sem_init(&p->sem_sync, 0);
-        p->quit = false;
         p->id = i;
         p->pages = multifd_pages_init(page_count);
         p->packet_len = sizeof(MultiFDPacket_t)
-- 
2.17.1

* [Qemu-devel] [PATCH v16 14/14] migration: Stop sending whole pages through main channel
  2018-06-26 18:19 [Qemu-devel] [PATCH v16 01/14] migration: Create multipage support Juan Quintela
                   ` (11 preceding siblings ...)
  2018-06-26 18:19 ` [Qemu-devel] [PATCH v16 13/14] migration: Remove not needed semaphore and quit Juan Quintela
@ 2018-06-26 18:19 ` Juan Quintela
  12 siblings, 0 replies; 18+ messages in thread
From: Juan Quintela @ 2018-06-26 18:19 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We have to flush() the QEMUFile because we now send very little data
through that channel.
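
A short sketch of the pattern applied at each EOS point below (context
abbreviated):

    qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
    /* so little data now flows through the main channel that the EOS
     * marker can linger in the QEMUFile buffer; flush so the
     * destination sees it promptly */
    qemu_fflush(f);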

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/ram.c | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 61f7313093..7d23b472cb 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1817,15 +1817,7 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
 static int ram_save_multifd_page(RAMState *rs, RAMBlock *block,
                                  ram_addr_t offset)
 {
-    uint8_t *p;
-
-    p = block->host + offset;
-
-    ram_counters.transferred += save_page_header(rs, rs->f, block,
-                                                 offset | RAM_SAVE_FLAG_PAGE);
     multifd_queue_page(block, offset);
-    qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
-    ram_counters.transferred += TARGET_PAGE_SIZE;
     ram_counters.normal++;
 
     return 1;
@@ -3066,6 +3058,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
 
     multifd_send_sync_main();
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
+    qemu_fflush(f);
 
     return 0;
 }
@@ -3148,6 +3141,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
     multifd_send_sync_main();
 out:
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
+    qemu_fflush(f);
     ram_counters.transferred += 8;
 
     ret = qemu_file_get_error(f);
@@ -3201,6 +3195,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
 
     multifd_send_sync_main();
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
+    qemu_fflush(f);
 
     return 0;
 }
-- 
2.17.1

* Re: [Qemu-devel] [PATCH v16 03/14] migration: Calculate mbps only during transfer time
  2018-06-26 18:19 ` [Qemu-devel] [PATCH v16 03/14] migration: Calculate mbps only during transfer time Juan Quintela
@ 2018-06-27  9:28   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 18+ messages in thread
From: Dr. David Alan Gilbert @ 2018-06-27  9:28 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> We used to include the setup time in this calculation, but it can be
> quite big with RDMA or multifd.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

> ---
>  migration/migration.c | 6 ++++--
>  1 file changed, 4 insertions(+), 2 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index e1eaa97df4..d3e6da9bfe 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -2708,6 +2708,7 @@ static void migration_calculate_complete(MigrationState *s)
>  {
>      uint64_t bytes = qemu_ftell(s->to_dst_file);
>      int64_t end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
> +    int64_t transfer_time;
>  
>      s->total_time = end_time - s->start_time;
>      if (!s->downtime) {
> @@ -2718,8 +2719,9 @@ static void migration_calculate_complete(MigrationState *s)
>          s->downtime = end_time - s->downtime_start;
>      }
>  
> -    if (s->total_time) {
> -        s->mbps = ((double) bytes * 8.0) / s->total_time / 1000;
> +    transfer_time = s->total_time - s->setup_time;
> +    if (transfer_time) {
> +        s->mbps = ((double) bytes * 8.0) / transfer_time / 1000;
>      }
>  }
>  
> -- 
> 2.17.1
> 
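
A worked example of the change, with illustrative numbers: moving
1 GiB with total_time = 12000 ms, of which setup_time = 2000 ms:

    old: mbps = 1073741824 * 8.0 / 12000 / 1000 ~= 716
    new: mbps = 1073741824 * 8.0 / (12000 - 2000) / 1000 ~= 859

so excluding the setup time no longer understates the achieved
throughput.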
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

* Re: [Qemu-devel] [PATCH v16 04/14] migration: Abstract the number of bytes sent
  2018-06-26 18:19 ` [Qemu-devel] [PATCH v16 04/14] migration: Abstract the number of bytes sent Juan Quintela
@ 2018-06-27  9:46   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 18+ messages in thread
From: Dr. David Alan Gilbert @ 2018-06-27  9:46 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> Right now we use the "position" inside the QEMUFile, but things like
> RDMA already have to do weird things to keep that counter right, and
> multifd will have similar problems.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

> ---
>  migration/migration.c | 14 +++++++++++---
>  1 file changed, 11 insertions(+), 3 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index d3e6da9bfe..264f3ce84e 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -2704,9 +2704,15 @@ static MigThrError migration_detect_error(MigrationState *s)
>      }
>  }
>  
> +/* How many bytes have we transferred since the beginning of the migration */
> +static uint64_t migration_total_bytes(MigrationState *s)
> +{
> +    return qemu_ftell(s->to_dst_file);
> +}
> +
>  static void migration_calculate_complete(MigrationState *s)
>  {
> -    uint64_t bytes = qemu_ftell(s->to_dst_file);
> +    uint64_t bytes = migration_total_bytes(s);
>      int64_t end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
>      int64_t transfer_time;
>  
> @@ -2729,13 +2735,15 @@ static void migration_update_counters(MigrationState *s,
>                                        int64_t current_time)
>  {
>      uint64_t transferred, time_spent;
> +    uint64_t current_bytes; /* bytes transferred since the beginning */
>      double bandwidth;
>  
>      if (current_time < s->iteration_start_time + BUFFER_DELAY) {
>          return;
>      }
>  
> -    transferred = qemu_ftell(s->to_dst_file) - s->iteration_initial_bytes;
> +    current_bytes = migration_total_bytes(s);
> +    transferred = current_bytes - s->iteration_initial_bytes;
>      time_spent = current_time - s->iteration_start_time;
>      bandwidth = (double)transferred / time_spent;
>      s->threshold_size = bandwidth * s->parameters.downtime_limit;
> @@ -2754,7 +2762,7 @@ static void migration_update_counters(MigrationState *s,
>      qemu_file_reset_rate_limit(s->to_dst_file);
>  
>      s->iteration_start_time = current_time;
> -    s->iteration_initial_bytes = qemu_ftell(s->to_dst_file);
> +    s->iteration_initial_bytes = current_bytes;
>  
>      trace_migrate_transferred(transferred, time_spent,
>                                bandwidth, s->threshold_size);
> -- 
> 2.17.1
> 
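
One way to read the abstraction (a hedged sketch of where this is
heading, not part of this patch): once payload moves over other
channels, this helper is the single place to add their byte counts,
e.g. with the multifd counter introduced later in the series:

    static uint64_t migration_total_bytes(MigrationState *s)
    {
        /* hypothetical extension: main-channel position plus bytes
         * sent over the multifd channels */
        return qemu_ftell(s->to_dst_file) + ram_counters.multifd_bytes;
    }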
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

* Re: [Qemu-devel] [PATCH v16 09/14] migration: Create multifd_bytes ram_counter
  2018-06-26 18:19 ` [Qemu-devel] [PATCH v16 09/14] migration: Create multifd_bytes ram_counter Juan Quintela
@ 2018-06-27 10:01   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 18+ messages in thread
From: Dr. David Alan Gilbert @ 2018-06-27 10:01 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> This counts how many bytes are sent through multifd.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

> ---
>  hmp.c                 | 2 ++
>  migration/migration.c | 1 +
>  qapi/migration.json   | 5 ++++-
>  3 files changed, 7 insertions(+), 1 deletion(-)
> 
> diff --git a/hmp.c b/hmp.c
> index f601099f90..0da0b0ac33 100644
> --- a/hmp.c
> +++ b/hmp.c
> @@ -234,6 +234,8 @@ void hmp_info_migrate(Monitor *mon, const QDict *qdict)
>                         info->ram->dirty_sync_count);
>          monitor_printf(mon, "page size: %" PRIu64 " kbytes\n",
>                         info->ram->page_size >> 10);
> +        monitor_printf(mon, "multifd bytes: %" PRIu64 " kbytes\n",
> +                       info->ram->multifd_bytes >> 10);
>  
>          if (info->ram->dirty_pages_rate) {
>              monitor_printf(mon, "dirty pages rate: %" PRIu64 " pages\n",
> diff --git a/migration/migration.c b/migration/migration.c
> index 264f3ce84e..2680ba2d47 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -708,6 +708,7 @@ static void populate_ram_info(MigrationInfo *info, MigrationState *s)
>      info->ram->dirty_sync_count = ram_counters.dirty_sync_count;
>      info->ram->postcopy_requests = ram_counters.postcopy_requests;
>      info->ram->page_size = qemu_target_page_size();
> +    info->ram->multifd_bytes = ram_counters.multifd_bytes;
>  
>      if (migrate_use_xbzrle()) {
>          info->has_xbzrle_cache = true;
> diff --git a/qapi/migration.json b/qapi/migration.json
> index 1b4c1db670..186e8a7303 100644
> --- a/qapi/migration.json
> +++ b/qapi/migration.json
> @@ -39,6 +39,8 @@
>  # @page-size: The number of bytes per page for the various page-based
>  #        statistics (since 2.10)
>  #
> +# @multifd-bytes: The number of bytes sent through multifd (since 3.0)
> +#
>  # Since: 0.14.0
>  ##
>  { 'struct': 'MigrationStats',
> @@ -46,7 +48,8 @@
>             'duplicate': 'int', 'skipped': 'int', 'normal': 'int',
>             'normal-bytes': 'int', 'dirty-pages-rate' : 'int',
>             'mbps' : 'number', 'dirty-sync-count' : 'int',
> -           'postcopy-requests' : 'int', 'page-size' : 'int' } }
> +           'postcopy-requests' : 'int', 'page-size' : 'int',
> +           'multifd-bytes' : 'uint64' } }
>  
>  ##
>  # @XBZRLECacheStats:
> -- 
> 2.17.1
> 
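
The counter is only introduced here; a hedged sketch of how a send
thread might credit it (illustrative, not part of this patch):

    /* once a packet and its pages hit a multifd channel, account them
     * here instead of in the main-channel position */
    ram_counters.multifd_bytes += p->packet_len
                                  + p->pages->used * qemu_target_page_size();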
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

* Re: [Qemu-devel] [PATCH v16 13/14] migration: Remove not needed semaphore and quit
  2018-06-26 18:19 ` [Qemu-devel] [PATCH v16 13/14] migration: Remove not needed semaphore and quit Juan Quintela
@ 2018-06-27 10:46   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 18+ messages in thread
From: Dr. David Alan Gilbert @ 2018-06-27 10:46 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> We now quit with shutdown() on the QIO channel.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

> --
> Add comment
> Use shutdown() instead of unref()
> ---
>  migration/ram.c | 14 +++++---------
>  1 file changed, 5 insertions(+), 9 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 5d38d699f3..61f7313093 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -594,14 +594,10 @@ typedef struct {
>      QemuThread thread;
>      /* communication channel */
>      QIOChannel *c;
> -    /* sem where to wait for more work */
> -    QemuSemaphore sem;
>      /* this mutex protects the following parameters */
>      QemuMutex mutex;
>      /* is this channel thread running */
>      bool running;
> -    /* should this thread finish */
> -    bool quit;
>      /* array of pages to receive */
>      MultiFDPages_t *pages;
>      /* packet allocated len */
> @@ -1144,8 +1140,11 @@ static void multifd_recv_terminate_threads(Error *err)
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
>          qemu_mutex_lock(&p->mutex);
> -        p->quit = true;
> -        qemu_sem_post(&p->sem);
> +        /* We could arrive here for two reasons:
> +           - normal quit, i.e. everything went fine, just finished
> +           - error quit: We close the channels so the channel threads
> +             finish the qio_channel_read_all_eof() */
> +        qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
>          qemu_mutex_unlock(&p->mutex);
>      }
>  }
> @@ -1168,7 +1167,6 @@ int multifd_load_cleanup(Error **errp)
>          object_unref(OBJECT(p->c));
>          p->c = NULL;
>          qemu_mutex_destroy(&p->mutex);
> -        qemu_sem_destroy(&p->sem);
>          qemu_sem_destroy(&p->sem_sync);
>          g_free(p->name);
>          p->name = NULL;
> @@ -1291,9 +1289,7 @@ int multifd_load_setup(void)
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
>          qemu_mutex_init(&p->mutex);
> -        qemu_sem_init(&p->sem, 0);
>          qemu_sem_init(&p->sem_sync, 0);
> -        p->quit = false;
>          p->id = i;
>          p->pages = multifd_pages_init(page_count);
>          p->packet_len = sizeof(MultiFDPacket_t)
> -- 
> 2.17.1
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
