Date: Wed, 1 Apr 2015 16:14:05 +0100
From: "Dr. David Alan Gilbert"
Message-ID: <20150401151404.GA2310@work-vm>
References: <1424883128-9841-1-git-send-email-dgilbert@redhat.com>
 <1424883128-9841-13-git-send-email-dgilbert@redhat.com>
 <20150310060824.GD11973@voom.redhat.com>
 <20150320181730.GI2468@work-vm>
 <20150323023739.GH25043@voom.fritz.box>
In-Reply-To: <20150323023739.GH25043@voom.fritz.box>
Subject: Re: [Qemu-devel] [PATCH v5 12/45] Return path: Source handling of return path
To: David Gibson
Cc: aarcange@redhat.com, yamahata@private.email.ne.jp, quintela@redhat.com,
 qemu-devel@nongnu.org, amit.shah@redhat.com, pbonzini@redhat.com,
 yanghy@cn.fujitsu.com

* David Gibson (david@gibson.dropbear.id.au) wrote:
> On Fri, Mar 20, 2015 at 06:17:31PM +0000, Dr. David Alan Gilbert wrote:
> > * David Gibson (david@gibson.dropbear.id.au) wrote:
> > > On Wed, Feb 25, 2015 at 04:51:35PM +0000, Dr. David Alan Gilbert (git) wrote:
> > > > From: "Dr. David Alan Gilbert"
> > > >
> > > > Open a return path, and handle messages that are received upon it.
> > > >
> > > > Signed-off-by: Dr. David Alan Gilbert
> > > > ---
> > > >  include/migration/migration.h | 8 ++
> > > >  migration/migration.c | 178 +++++++++++++++++++++++++++++++++++++++++-
> > > >  trace-events | 13 +++
> > > >  3 files changed, 198 insertions(+), 1 deletion(-)
> > > >
> > > > diff --git a/include/migration/migration.h b/include/migration/migration.h
> > > > index 6775747..5242ead 100644
> > > > --- a/include/migration/migration.h
> > > > +++ b/include/migration/migration.h
> > > > @@ -73,6 +73,14 @@ struct MigrationState
> > > >
> > > >      int state;
> > > >      MigrationParams params;
> > > > +
> > > > +    /* State related to return path */
> > > > +    struct {
> > > > +        QEMUFile *file;
> > > > +        QemuThread rp_thread;
> > > > +        bool error;
> > > > +    } rp_state;
> > > > +
> > > >      double mbps;
> > > >      int64_t total_time;
> > > >      int64_t downtime;
> > > > diff --git a/migration/migration.c b/migration/migration.c
> > > > index 80d234c..34cd4fe 100644
> > > > --- a/migration/migration.c
> > > > +++ b/migration/migration.c
> > > > @@ -237,6 +237,23 @@ MigrationCapabilityStatusList *qmp_query_migrate_capabilities(Error **errp)
> > > >      return head;
> > > >  }
> > > >
> > > > +/*
> > > > + * Return true if we're already in the middle of a migration
> > > > + * (i.e. any of the active or setup states)
> > > > + */
> > > > +static bool migration_already_active(MigrationState *ms)
> > > > +{
> > > > +    switch (ms->state) {
> > > > +    case MIG_STATE_ACTIVE:
> > > > +    case MIG_STATE_SETUP:
> > > > +        return true;
> > > > +
> > > > +    default:
> > > > +        return false;
> > > > +
> > > > +    }
> > > > +}
> > > > +
> > > >  static void get_xbzrle_cache_stats(MigrationInfo *info)
> > > >  {
> > > >      if (migrate_use_xbzrle()) {
> > > > @@ -362,6 +379,21 @@ static void migrate_set_state(MigrationState *s, int old_state, int new_state)
> > > >      }
> > > >  }
> > > >
> > > > +static void migrate_fd_cleanup_src_rp(MigrationState *ms)
> > > > +{
> > > > +    QEMUFile *rp = ms->rp_state.file;
> > > > +
> > > > +    /*
> > > > +     * When stuff goes wrong (e.g. failing destination) on the rp, it can get
> > > > +     * cleaned up from a few threads; make sure not to do it twice in parallel
> > > > +     */
> > > > +    rp = atomic_cmpxchg(&ms->rp_state.file, rp, NULL);
> > >
> > > A cmpxchg seems dangerously subtle for such a basic and infrequent
> > > operation, but ok.
> >
> > I'll take other suggestions; but I'm trying to just do
> > 'if the qemu_file still exists close it', and it didn't seem
> > worth introducing another state variable to atomically update
> > when we've already got the file pointer itself.
>
> Yes, I see the rationale. My concern is just that the more atomicity
> mechanisms are scattered through the code, the harder it is to analyze
> and be sure you haven't missed race cases (or introduced them with a
> future change).
>
> In short, I prefer to see a simple-as-possible, and preferably
> documented, consistent overall concurrency scheme for a data
> structure, rather than scattered atomic ops for various variables where
> it's difficult to see how all the pieces might relate together.
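The close-once idiom in migrate_fd_cleanup_src_rp() can be sketched with C11 atomics in place of QEMU's atomic_cmpxchg() macro. This is a minimal illustration, not QEMU code: FakeFile, RpState, fake_fclose() and cleanup_rp_once() are hypothetical stand-ins for the QEMUFile machinery. Whichever caller wins the compare-and-swap receives the non-NULL pointer and closes it; any later caller reads NULL and does nothing.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical stand-in for a QEMUFile; it only records how often it was closed. */
typedef struct FakeFile {
    int close_count;
} FakeFile;

/* Hypothetical stand-in for MigrationState.rp_state. */
typedef struct RpState {
    _Atomic(FakeFile *) file;
} RpState;

static void fake_fclose(FakeFile *f)
{
    f->close_count++;
}

/*
 * Close the return-path file at most once. Only the caller whose
 * compare-and-swap replaces the pointer it read with NULL goes on to
 * close it; on failure, rp is overwritten with the current value and
 * this caller skips the close.
 */
static void cleanup_rp_once(RpState *s)
{
    FakeFile *rp = atomic_load(&s->file);

    if (rp && atomic_compare_exchange_strong(&s->file, &rp, NULL)) {
        fake_fclose(rp);
    }
}
```

As a design note, a single atomic_exchange(&s->file, NULL) would give the same guarantee in one step, since exactly one caller gets the old non-NULL pointer back.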
>
> > > > +    if (rp) {
> > > > +        trace_migrate_fd_cleanup_src_rp();
> > > > +        qemu_fclose(rp);
> > > > +    }
> > > > +}
> > > > +
> > > >  static void migrate_fd_cleanup(void *opaque)
> > > >  {
> > > >      MigrationState *s = opaque;
> > > > @@ -369,6 +401,8 @@ static void migrate_fd_cleanup(void *opaque)
> > > >      qemu_bh_delete(s->cleanup_bh);
> > > >      s->cleanup_bh = NULL;
> > > >
> > > > +    migrate_fd_cleanup_src_rp(s);
> > > > +
> > > >      if (s->file) {
> > > >          trace_migrate_fd_cleanup();
> > > >          qemu_mutex_unlock_iothread();
> > > > @@ -406,6 +440,11 @@ static void migrate_fd_cancel(MigrationState *s)
> > > >      QEMUFile *f = migrate_get_current()->file;
> > > >      trace_migrate_fd_cancel();
> > > >
> > > > +    if (s->rp_state.file) {
> > > > +        /* shutdown the rp socket, so causing the rp thread to shutdown */
> > > > +        qemu_file_shutdown(s->rp_state.file);
> > >
> > > I missed where qemu_file_shutdown() was implemented. Does this
> > > introduce a leftover socket dependency?
> >
> > No, it shouldn't. The shutdown() causes a shutdown(2) syscall to
> > be issued on the socket, stopping anything blocking on it; it then
> > gets closed at the end after the rp thread has exited.
> >
>
> Sorry, that's not what I meant. I mean, is this a hole in the
> abstraction of the QemuFile, because it assumes that what you're
> dealing with here is indeed a socket, rather than something else?

It's just a dependency that we have a shutdown method on the qemu_file
we're using; if it's not a socket, then whatever it is needs to
implement something equivalent if we're going to use it for an rp.
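To make the abstraction point concrete: generic code only needs a way to ask the backend to unblock its reader, and can stay agnostic about sockets. The sketch below is hypothetical (FileOps, File, file_shutdown() and both backends are illustrative names, not QEMU's QEMUFile API). Its socket backend uses shutdown(2), after which a read() on that end returns 0 (end of file) instead of blocking, which is what lets a return-path thread fall out of its loop.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical backend operations table, loosely modelled on a file-ops vtable. */
typedef struct FileOps {
    ssize_t (*get_buffer)(void *opaque, void *buf, size_t len);
    int (*shut_down)(void *opaque);   /* optional: NULL if unsupported */
} FileOps;

typedef struct File {
    const FileOps *ops;
    void *opaque;                     /* backend-private state */
} File;

/* Generic wrapper: callers never need to know what the backend is. */
static int file_shutdown(File *f)
{
    if (!f->ops->shut_down) {
        return -ENOSYS;               /* backend cannot be used as a return path */
    }
    return f->ops->shut_down(f->opaque);
}

/* Socket backend: opaque points at the file descriptor. */
static ssize_t socket_get_buffer(void *opaque, void *buf, size_t len)
{
    return read(*(int *)opaque, buf, len);
}

static int socket_shut_down(void *opaque)
{
    /* shutdown(2) wakes any reader blocked on this socket */
    return shutdown(*(int *)opaque, SHUT_RDWR) == 0 ? 0 : -errno;
}

static const FileOps socket_ops = { socket_get_buffer, socket_shut_down };

/* A backend that can read but has no way to unblock a reader. */
static const FileOps read_only_ops = { socket_get_buffer, NULL };
```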
> > > > + } > > > > + > > > > do { > > > > old_state = s->state; > > > > if (old_state != MIG_STATE_SETUP && old_state != MIG_STATE_ACTIVE) { > > > > @@ -658,8 +697,145 @@ int64_t migrate_xbzrle_cache_size(void) > > > > return s->xbzrle_cache_size; > > > > } > > > > > > > > -/* migration thread support */ > > > > +/* > > > > + * Something bad happened to the RP stream, mark an error > > > > + * The caller shall print something to indicate why > > > > + */ > > > > +static void source_return_path_bad(MigrationState *s) > > > > +{ > > > > + s->rp_state.error = true; > > > > + migrate_fd_cleanup_src_rp(s); > > > > +} > > > > + > > > > +/* > > > > + * Handles messages sent on the return path towards the source VM > > > > + * > > > > + */ > > > > +static void *source_return_path_thread(void *opaque) > > > > +{ > > > > + MigrationState *ms = opaque; > > > > + QEMUFile *rp = ms->rp_state.file; > > > > + uint16_t expected_len, header_len, header_com; > > > > + const int max_len = 512; > > > > + uint8_t buf[max_len]; > > > > + uint32_t tmp32; > > > > + int res; > > > > + > > > > + trace_source_return_path_thread_entry(); > > > > + while (rp && !qemu_file_get_error(rp) && > > > > + migration_already_active(ms)) { > > > > + trace_source_return_path_thread_loop_top(); > > > > + header_com = qemu_get_be16(rp); > > > > + header_len = qemu_get_be16(rp); > > > > + > > > > + switch (header_com) { > > > > + case MIG_RP_CMD_SHUT: > > > > + case MIG_RP_CMD_PONG: > > > > + expected_len = 4; > > > > > > Could the knowledge of expected lengths be folded into the switch > > > below? Switching twice on the same thing is a bit icky. > > > > No, because the length at this point is used to valdiate the > > length field in the header prior to reading the body. > > The other switch processes the contents of the body that > > have been read. > > Ok. 
>
> > > > +            break;
> > > > +
> > > > +        default:
> > > > +            error_report("RP: Received invalid cmd 0x%04x length 0x%04x",
> > > > +                    header_com, header_len);
> > > > +            source_return_path_bad(ms);
> > > > +            goto out;
> > > > +        }
> > > >
> > > > +        if (header_len > expected_len) {
> > > > +            error_report("RP: Received command 0x%04x with"
> > > > +                    "incorrect length %d expecting %d",
> > > > +                    header_com, header_len,
> > > > +                    expected_len);
> > > > +            source_return_path_bad(ms);
> > > > +            goto out;
> > > > +        }
> > > > +
> > > > +        /* We know we've got a valid header by this point */
> > > > +        res = qemu_get_buffer(rp, buf, header_len);
> > > > +        if (res != header_len) {
> > > > +            trace_source_return_path_thread_failed_read_cmd_data();
> > > > +            source_return_path_bad(ms);
> > > > +            goto out;
> > > > +        }
> > > > +
> > > > +        /* OK, we have the command and the data */
> > > > +        switch (header_com) {
> > > > +        case MIG_RP_CMD_SHUT:
> > > > +            tmp32 = be32_to_cpup((uint32_t *)buf);
> > > > +            trace_source_return_path_thread_shut(tmp32);
> > > > +            if (tmp32) {
> > > > +                error_report("RP: Sibling indicated error %d", tmp32);
> > > > +                source_return_path_bad(ms);
> > > > +            }
> > > > +            /*
> > > > +             * We'll let the main thread deal with closing the RP
> > > > +             * we could do a shutdown(2) on it, but we're the only user
> > > > +             * anyway, so there's nothing gained.
> > > > +             */
> > > > +            goto out;
> > > > +
> > > > +        case MIG_RP_CMD_PONG:
> > > > +            tmp32 = be32_to_cpup((uint32_t *)buf);
> > > > +            trace_source_return_path_thread_pong(tmp32);
> > > > +            break;
> > > > +
> > > > +        default:
> > > > +            /* This shouldn't happen because we should catch this above */
> > > > +            trace_source_return_path_bad_header_com();
> > > > +        }
> > > > +        /* Latest command processed, now leave a gap for the next one */
> > > > +        header_com = MIG_RP_CMD_INVALID;
> > >
> > > This assignment will always get overwritten.
> >
> > Thanks; gone - it's a leftover from an old version.
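The body switch above reads its 32-bit argument with be32_to_cpup((uint32_t *)buf). A byte-wise big-endian load yields the same value without casting a uint8_t buffer to uint32_t *, which sidesteps alignment and strict-aliasing questions. This is an illustrative sketch only: load_be32(), RpAction, rp_handle_body() and the command numbers are hypothetical, and the SHUT semantics (non-zero means the destination reported an error) follow the patch.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical command numbers standing in for MIG_RP_CMD_SHUT / MIG_RP_CMD_PONG. */
enum { RP_CMD_SHUT = 1, RP_CMD_PONG = 2 };

typedef enum { RP_CONTINUE, RP_STOP_OK, RP_STOP_ERROR } RpAction;

/* Portable big-endian 32-bit load from a byte buffer. */
static uint32_t load_be32(const uint8_t *p)
{
    return ((uint32_t)p[0] << 24) | ((uint32_t)p[1] << 16) |
           ((uint32_t)p[2] << 8) | (uint32_t)p[3];
}

/* Phase 2: act on a body whose length was already validated as 4 bytes. */
static RpAction rp_handle_body(uint16_t cmd, const uint8_t *buf, uint32_t *value)
{
    *value = load_be32(buf);
    switch (cmd) {
    case RP_CMD_SHUT:
        /* non-zero status: the destination reported an error */
        return *value ? RP_STOP_ERROR : RP_STOP_OK;
    case RP_CMD_PONG:
        return RP_CONTINUE;
    default:
        /* unreachable if the header was validated first */
        return RP_STOP_ERROR;
    }
}
```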
> >
> > > > +    }
> > > > +    if (rp && qemu_file_get_error(rp)) {
> > > > +        trace_source_return_path_thread_bad_end();
> > > > +        source_return_path_bad(ms);
> > > > +    }
> > > > +
> > > > +    trace_source_return_path_thread_end();
> > > > +out:
> > > > +    return NULL;
> > > > +}
> > > > +
> > > > +__attribute__ (( unused )) /* Until later in patch series */
> > > > +static int open_outgoing_return_path(MigrationState *ms)
> > >
> > > Uh.. surely this should be open_incoming_return_path(); it's designed
> > > to be used on the source side, AFAICT.
> > >
> > > > +{
> > > > +
> > > > +    ms->rp_state.file = qemu_file_get_return_path(ms->file);
> > > > +    if (!ms->rp_state.file) {
> > > > +        return -1;
> > > > +    }
> > > > +
> > > > +    trace_open_outgoing_return_path();
> > > > +    qemu_thread_create(&ms->rp_state.rp_thread, "return path",
> > > > +                       source_return_path_thread, ms, QEMU_THREAD_JOINABLE);
> > > > +
> > > > +    trace_open_outgoing_return_path_continue();
> > > > +
> > > > +    return 0;
> > > > +}
> > > > +
> > > > +__attribute__ (( unused )) /* Until later in patch series */
> > > > +static void await_outgoing_return_path_close(MigrationState *ms)
> > >
> > > Likewise "incoming" here, surely.
> >
> > I've changed those two to open_source_return_path() which seems less ambiguous;
> > that OK?
>
> Uh.. not really, it just moves the ambiguity to a different place (is
> "source return path" the return path *on* the source or *to* the
> source).
>
> Perhaps "open_return_path_on_source" and
> "await_return_path_close_on_source"? I'm not particularly fond of
> those, but they're the best I've come up with yet.

Done.

Dave

>
> --
> David Gibson                    | I'll have my music baroque, and my code
> david AT gibson.dropbear.id.au  | minimalist, thank you. NOT _the_ _other_
>                                 | _way_ _around_!
> http://www.ozlabs.org/~dgibson
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
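The open/await pair, under the names agreed in this thread, maps onto a joinable thread plus a join. The following is a hypothetical POSIX-threads sketch, not QEMU's qemu_thread_create() API. ReturnPathOnSource is a cut-down stand-in for rp_state, the thread simply counts 4-byte messages until end of file, and the destination side is simulated by the other end of a socketpair.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical, cut-down stand-in for MigrationState's rp_state. */
typedef struct {
    int fd;              /* source end of the return path */
    pthread_t thread;
    unsigned msgs;       /* messages received; written only by the thread */
} ReturnPathOnSource;

/* Read exactly len bytes; returns false on end of file or error. */
static bool read_full(int fd, uint8_t *buf, size_t len)
{
    while (len > 0) {
        ssize_t n = read(fd, buf, len);
        if (n <= 0) {
            return false;
        }
        buf += n;
        len -= (size_t)n;
    }
    return true;
}

/* Thread body: consume fixed-size messages until the peer closes its side. */
static void *return_path_thread(void *opaque)
{
    ReturnPathOnSource *rp = opaque;
    uint8_t msg[4];

    while (read_full(rp->fd, msg, sizeof(msg))) {
        rp->msgs++;
    }
    return NULL;
}

static int open_return_path_on_source(ReturnPathOnSource *rp, int fd)
{
    rp->fd = fd;
    rp->msgs = 0;
    /* default attributes create a joinable thread */
    return pthread_create(&rp->thread, NULL, return_path_thread, rp) == 0 ? 0 : -1;
}

static void await_return_path_close_on_source(ReturnPathOnSource *rp)
{
    pthread_join(rp->thread, NULL);
}
```

In use, the simulated destination writes its messages and then calls shutdown(fd, SHUT_WR) on its end; the source-side thread sees end of file, returns, and await_return_path_close_on_source() joins it. Reading rp->msgs after the join is race-free because pthread_join() synchronizes with the thread's exit.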