From mboxrd@z Thu Jan 1 00:00:00 1970 From: Andrew Cooper Subject: Re: [RFC Patch v2 15/16] xc_domain_save: implement save_callbacks for colo Date: Thu, 11 Jul 2013 14:52:46 +0100 Message-ID: <51DEB8AE.6020102@citrix.com> References: <1373531748-12547-1-git-send-email-wency@cn.fujitsu.com> <1373531748-12547-16-git-send-email-wency@cn.fujitsu.com> Mime-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Return-path: In-Reply-To: <1373531748-12547-16-git-send-email-wency@cn.fujitsu.com> List-Unsubscribe: , List-Post: List-Help: List-Subscribe: , Sender: xen-devel-bounces@lists.xen.org Errors-To: xen-devel-bounces@lists.xen.org To: Wen Congyang Cc: Lai Jiangshan , Jiang Yunhong , Dong Eddie , Ye Wei , xen-devl , Hong Tao , Xu Yao , Shriram Rajagopalan List-Id: xen-devel@lists.xenproject.org On 11/07/13 09:35, Wen Congyang wrote: > Add a new save callbacks: > 1. post_sendstate(): SVM will run only when XC_SAVE_ID_LAST_CHECKPOINT is > sent to slaver. But we only sent XC_SAVE_ID_LAST_CHECKPOINT when we do > live migration now. Add this callback, and we can send it in this > callback. > > Update some callbacks for colo: > 1. suspend(): In colo mode, both PVM and SVM are running. So we should suspend > both PVM and SVM. > Communicate with slaver like this: > a. write "continue" to notify slaver to suspend SVM > b. suspend PVM and SVM > c. slaver writes "suspend" to tell master that SVM is suspended > 2. postcopy(): In colo mode, both PVM and SVM are running, and we have suspended > both PVM and SVM. So we should resume PVM and SVM > Communicate with slaver like this: > a. write "resume" to notify slaver to resume SVM > b. resume PVM and SVM > c. slaver writes "resume" to tell master that SVM is resumed > 3. checkpoint(): In colo mode, we do a new checkpoint only when output packet > from PVM and SVM is different. We will block in this callback and return > when a output packet is different. 
> > Signed-off-by: Ye Wei > Signed-off-by: Jiang Yunhong > Signed-off-by: Wen Congyang > --- > tools/libxc/xc_domain_save.c | 17 ++ > tools/libxc/xenguest.h | 3 + > tools/python/xen/lowlevel/checkpoint/checkpoint.c | 302 ++++++++++++++++++++- > tools/python/xen/lowlevel/checkpoint/checkpoint.h | 1 + > 4 files changed, 319 insertions(+), 4 deletions(-) > > diff --git a/tools/libxc/xc_domain_save.c b/tools/libxc/xc_domain_save.c > index b477188..8f84c9b 100644 > --- a/tools/libxc/xc_domain_save.c > +++ b/tools/libxc/xc_domain_save.c > @@ -1785,6 +1785,23 @@ int xc_domain_save(xc_interface *xch, int io_fd, uint32_t dom, uint32_t max_iter > } > } > > + /* Flush last write and discard cache for file. */ > + if ( outbuf_flush(xch, ob, io_fd) < 0 ) { > + PERROR("Error when flushing output buffer"); > + rc = 1; > + } > + > + discard_file_cache(xch, io_fd, 1 /* flush */); > + > + if ( callbacks->post_sendstate ) > + { > + if ( callbacks->post_sendstate(callbacks->data) < 0) > + { > + PERROR("Error: post_sendstate()\n"); > + goto out; > + } > + } > + > /* Zero terminate */ > i = 0; > if ( wrexact(io_fd, &i, sizeof(int)) ) > diff --git a/tools/libxc/xenguest.h b/tools/libxc/xenguest.h > index 4bb444a..9d7d03c 100644 > --- a/tools/libxc/xenguest.h > +++ b/tools/libxc/xenguest.h > @@ -72,6 +72,9 @@ struct save_callbacks { > */ > int (*toolstack_save)(uint32_t domid, uint8_t **buf, uint32_t *len, void *data); > > + /* called before Zero terminate is sent */ > + int (*post_sendstate)(void *data); > + > /* to be provided as the last argument to each callback function */ > void* data; > }; > diff --git a/tools/python/xen/lowlevel/checkpoint/checkpoint.c b/tools/python/xen/lowlevel/checkpoint/checkpoint.c > index ec14b27..28bdb23 100644 > --- a/tools/python/xen/lowlevel/checkpoint/checkpoint.c > +++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.c > @@ -1,14 +1,22 @@ > /* python bridge to checkpointing API */ > > #include > +#include I can't see anything using this header file,
which is good, as otherwise I would still tell you that a python module should not be using any of its contents. ~Andrew > > #include > #include > +#include > +#include > > #include "checkpoint.h" > > #define PKG "xen.lowlevel.checkpoint" > > +#define COMP_IOC_MAGIC 'k' > +#define COMP_IOCTWAIT _IO(COMP_IOC_MAGIC, 0) > +#define COMP_IOCTFLUSH _IO(COMP_IOC_MAGIC, 1) > +#define COMP_IOCTRESUME _IO(COMP_IOC_MAGIC, 2) > + > static PyObject* CheckpointError; > > typedef struct { > @@ -25,11 +33,15 @@ typedef struct { > PyObject* setup_cb; > > PyThreadState* threadstate; > + int colo; > + int first_time; > + int dev_fd; > } CheckpointObject; > > static int suspend_trampoline(void* data); > static int postcopy_trampoline(void* data); > static int checkpoint_trampoline(void* data); > +static int post_sendstate_trampoline(void *data); > > static PyObject* Checkpoint_new(PyTypeObject* type, PyObject* args, > PyObject* kwargs) > @@ -169,10 +181,17 @@ static PyObject* pycheckpoint_start(PyObject* obj, PyObject* args) { > } else > self->setup_cb = NULL; > > + if (flags & CHECKPOINT_FLAGS_COLO) > + self->colo = 1; > + else > + self->colo = 0; > + self->first_time = 1; > + > memset(&callbacks, 0, sizeof(callbacks)); > callbacks.suspend = suspend_trampoline; > callbacks.postcopy = postcopy_trampoline; > callbacks.checkpoint = checkpoint_trampoline; > + callbacks.post_sendstate = post_sendstate_trampoline; > callbacks.data = self; > > self->threadstate = PyEval_SaveThread(); > @@ -279,6 +298,196 @@ PyMODINIT_FUNC initcheckpoint(void) { > block_timer(); > } > > +/* colo functions */ > + > +/* master slaver comment > + * "continue" ===> > + * <=== "suspend" guest is suspended > + */ > +static int notify_slaver_suspend(CheckpointObject *self) > +{ > + int fd = self->cps.fd; > + > + if (self->first_time == 1) > + return 0; > + > + return write_exact(fd, "continue", 8); > +} > + > +static int wait_slaver_suspend(CheckpointObject *self) > +{ > + int fd = self->cps.fd; > + xc_interface 
*xch = self->cps.xch; > + char buf[8]; > + > + if (self->first_time == 1) > + return 0; > + > + if ( read_exact(fd, buf, 7) < 0) { > + PERROR("read: suspend"); > + return -1; > + } > + > + buf[7] = '\0'; > + if (strcmp(buf, "suspend")) { > + PERROR("read \"%s\", expect \"suspend\"", buf); > + return -1; > + } > + > + return 0; > +} > + > +static int notify_slaver_start_checkpoint(CheckpointObject *self) > +{ > + int fd = self->cps.fd; > + xc_interface *xch = self->cps.xch; > + > + if (self->first_time == 1) > + return 0; > + > + if ( write_exact(fd, "start", 5) < 0) { > + PERROR("write start"); > + return -1; > + } > + > + return 0; > +} > + > +/* > + * master slaver > + * <==== "finish" > + * flush packets > + * "resume" ====> > + * resume vm resume vm > + * <==== "resume" > + */ > +static int notify_slaver_resume(CheckpointObject *self) > +{ > + int fd = self->cps.fd; > + xc_interface *xch = self->cps.xch; > + char buf[7]; > + > + /* wait slaver to finish update memory, device state... 
*/ > + if ( read_exact(fd, buf, 6) < 0) { > + PERROR("read: finish"); > + return -1; > + } > + > + buf[6] = '\0'; > + if (strcmp(buf, "finish")) { > + ERROR("read \"%s\", expect \"finish\"", buf); > + return -1; > + } > + > + if (!self->first_time) > + /* flush queued packets now */ > + ioctl(self->dev_fd, COMP_IOCTFLUSH); > + > + /* notify slaver to resume vm*/ > + if (write_exact(fd, "resume", 6) < 0) { > + PERROR("write: resume"); > + return -1; > + } > + > + return 0; > +} > + > +static int install_fw_network(CheckpointObject *self) > +{ > + int rc; > + PyObject* result; > + > + PyEval_RestoreThread(self->threadstate); > + result = PyObject_CallFunction(self->setup_cb, NULL); > + self->threadstate = PyEval_SaveThread(); > + > + if (!result) > + return -1; > + > + if (result == Py_None || PyObject_IsTrue(result)) > + rc = 0; > + else > + rc = -1; > + > + Py_DECREF(result); > + > + return rc; > +} > + > +static int wait_slaver_resume(CheckpointObject *self) > +{ > + int fd = self->cps.fd; > + xc_interface *xch = self->cps.xch; > + char buf[7]; > + > + if (read_exact(fd, buf, 6) < 0) { > + PERROR("read resume"); > + return -1; > + } > + > + buf[6] = '\0'; > + if (strcmp(buf, "resume")) { > + ERROR("read \"%s\", expect \"resume\"", buf); > + return -1; > + } > + > + return 0; > +} > + > +static int colo_postresume(CheckpointObject *self) > +{ > + int rc; > + int dev_fd = self->dev_fd; > + > + rc = wait_slaver_resume(self); > + if (rc < 0) > + return rc; > + > + if (self->first_time) { > + rc = install_fw_network(self); > + if (rc < 0) { > + fprintf(stderr, "install network fails\n"); > + return rc; > + } > + } else { > + ioctl(dev_fd, COMP_IOCTRESUME); > + } > + > + return 0; > +} > + > +static int pre_checkpoint(CheckpointObject *self) > +{ > + xc_interface *xch = self->cps.xch; > + > + if (!self->first_time) > + return 0; > + > + self->dev_fd = open("/dev/HA_compare", O_RDWR); > + if (self->dev_fd < 0) { > + PERROR("opening /dev/HA_compare fails"); > + return -1; 
> + } > + > + return 0; > +} > + > +static void wait_new_checkpoint(CheckpointObject *self) > +{ > + int dev_fd = self->dev_fd; > + int err; > + > + while (1) { > + err = ioctl(dev_fd, COMP_IOCTWAIT); > + if (err == 0) > + break; > + > + if (err == -1 && errno != ERESTART && errno != ETIME) { > + fprintf(stderr, "ioctl() returns -1, errno: %d\n", errno); > + } > + } > +} > + > /* private functions */ > > /* bounce C suspend call into python equivalent. > @@ -289,6 +498,13 @@ static int suspend_trampoline(void* data) > > PyObject* result; > > + if (self->colo) { > + if (notify_slaver_suspend(self) < 0) { > + fprintf(stderr, "nofitying slaver suspend fails\n"); > + return 0; > + } > + } > + > /* call default suspend function, then python hook if available */ > if (self->armed) { > if (checkpoint_wait(&self->cps) < 0) { > @@ -307,8 +523,16 @@ static int suspend_trampoline(void* data) > } > } > > + /* suspend_cb() should be called after both sides are suspended */ > + if (self->colo) { > + if (wait_slaver_suspend(self) < 0) { > + fprintf(stderr, "waiting slaver suspend fails\n"); > + return 0; > + } > + } > + > if (!self->suspend_cb) > - return 1; > + goto start_checkpoint; > > PyEval_RestoreThread(self->threadstate); > result = PyObject_CallFunction(self->suspend_cb, NULL); > @@ -319,12 +543,32 @@ static int suspend_trampoline(void* data) > > if (result == Py_None || PyObject_IsTrue(result)) { > Py_DECREF(result); > - return 1; > + goto start_checkpoint; > } > > Py_DECREF(result); > > return 0; > + > +start_checkpoint: > + if (self->colo) { > + if (notify_slaver_start_checkpoint(self) < 0) { > + fprintf(stderr, "nofitying slaver to start checkpoint fails\n"); > + return 0; > + } > + > + /* PVM is suspended first when doing live migration, > + * and then it is suspended for a new checkpoint. 
> + */ > + if (self->first_time == 1) > + /* live migration */ > + self->first_time = 2; > + else if (self->first_time == 2) > + /* the first checkpoint */ > + self->first_time = 0; > + } > + > + return 1; > } > > static int postcopy_trampoline(void* data) > @@ -334,6 +578,13 @@ static int postcopy_trampoline(void* data) > PyObject* result; > int rc = 0; > > + if (self->colo) { > + if (notify_slaver_resume(self) < 0) { > + fprintf(stderr, "nofitying slaver resume fails\n"); > + return 0; > + } > + } > + > if (!self->postcopy_cb) > goto resume; > > @@ -352,6 +603,13 @@ static int postcopy_trampoline(void* data) > return 0; > } > > + if (self->colo) { > + if (colo_postresume(self) < 0) { > + fprintf(stderr, "postresume fails\n"); > + return 0; > + } > + } > + > return rc; > } > > @@ -366,8 +624,15 @@ static int checkpoint_trampoline(void* data) > return -1; > } > > + if (self->colo) { > + if (pre_checkpoint(self) < 0) { > + fprintf(stderr, "pre_checkpoint() fails\n"); > + return -1; > + } > + } > + > if (!self->checkpoint_cb) > - return 0; > + goto wait_checkpoint; > > PyEval_RestoreThread(self->threadstate); > result = PyObject_CallFunction(self->checkpoint_cb, NULL); > @@ -378,10 +643,39 @@ static int checkpoint_trampoline(void* data) > > if (result == Py_None || PyObject_IsTrue(result)) { > Py_DECREF(result); > - return 1; > + goto wait_checkpoint; > } > > Py_DECREF(result); > > return 0; > + > +wait_checkpoint: > + if (self->colo) { > + wait_new_checkpoint(self); > + } > + > + fprintf(stderr, "\n\nnew checkpoint..........\n"); > + > + return 1; > +} > + > +static int post_sendstate_trampoline(void* data) > +{ > + CheckpointObject *self = data; > + int fd = self->cps.fd; > + int i = XC_SAVE_ID_LAST_CHECKPOINT; > + > + if (!self->colo) > + return 0; > + > + /* In colo mode, guest is running on slaver side, so we should > + * send XC_SAVE_ID_LAST_CHECKPOINT to slaver. 
> + */ > + if (write_exact(fd, &i, sizeof(int)) < 0) { > + fprintf(stderr, "writing XC_SAVE_ID_LAST_CHECKPOINT fails\n"); > + return -1; > + } > + > + return 0; > } > diff --git a/tools/python/xen/lowlevel/checkpoint/checkpoint.h b/tools/python/xen/lowlevel/checkpoint/checkpoint.h > index 187d9d7..96fc949 100644 > --- a/tools/python/xen/lowlevel/checkpoint/checkpoint.h > +++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.h > @@ -41,6 +41,7 @@ typedef struct { > } checkpoint_state; > > #define CHECKPOINT_FLAGS_COMPRESSION 1 > +#define CHECKPOINT_FLAGS_COLO 2 > char* checkpoint_error(checkpoint_state* s); > > void checkpoint_init(checkpoint_state* s);