From mboxrd@z Thu Jan 1 00:00:00 1970 From: Shriram Rajagopalan Subject: Re: [RFC Patch v3 15/22] blktap2: move async connect related codes to block-replication.c Date: Wed, 24 Sep 2014 11:48:22 -0700 Message-ID: References: <1409909158-19243-1-git-send-email-wency@cn.fujitsu.com> <1409909158-19243-16-git-send-email-wency@cn.fujitsu.com> Reply-To: rshriram@cs.ubc.ca Mime-Version: 1.0 Content-Type: multipart/mixed; boundary="===============8024538590219703831==" Return-path: In-Reply-To: <1409909158-19243-16-git-send-email-wency@cn.fujitsu.com> List-Unsubscribe: , List-Post: List-Help: List-Subscribe: , Sender: xen-devel-bounces@lists.xen.org Errors-To: xen-devel-bounces@lists.xen.org To: Wen Congyang Cc: Lai Jiangshan , Ian Jackson , Jiang Yunhong , Dong Eddie , xen devel , Yang Hongyang , Ian Campbell List-Id: xen-devel@lists.xenproject.org --===============8024538590219703831== Content-Type: multipart/alternative; boundary=bcaec5014c9fdc65c80503d42006 --bcaec5014c9fdc65c80503d42006 Content-Type: text/plain; charset=UTF-8 On Sep 5, 2014 5:31 AM, "Wen Congyang" wrote: > > COLO will reuse them. 
> > Signed-off-by: Wen Congyang > Cc: Shriram Rajagopalan > --- > tools/blktap2/drivers/Makefile | 2 +- > tools/blktap2/drivers/block-remus.c | 494 +++--------------------------- > tools/blktap2/drivers/block-replication.c | 468 ++++++++++++++++++++++++++++ > tools/blktap2/drivers/block-replication.h | 113 +++++++ > 4 files changed, 630 insertions(+), 447 deletions(-) > create mode 100644 tools/blktap2/drivers/block-replication.c > create mode 100644 tools/blktap2/drivers/block-replication.h > > diff --git a/tools/blktap2/drivers/Makefile b/tools/blktap2/drivers/Makefile > index 37c3485..3d8ed8a 100644 > --- a/tools/blktap2/drivers/Makefile > +++ b/tools/blktap2/drivers/Makefile > @@ -23,7 +23,7 @@ endif > > VHDLIBS := -L$(LIBVHDDIR) -lvhd > > -REMUS-OBJS := block-remus.o > +REMUS-OBJS := block-remus.o block-replication.o > REMUS-OBJS += hashtable.o > REMUS-OBJS += hashtable_itr.o > REMUS-OBJS += hashtable_utility.o > diff --git a/tools/blktap2/drivers/block-remus.c b/tools/blktap2/drivers/block-remus.c > index 5d27d41..8b6f157 100644 > --- a/tools/blktap2/drivers/block-remus.c > +++ b/tools/blktap2/drivers/block-remus.c > @@ -40,6 +40,7 @@ > #include "hashtable.h" > #include "hashtable_itr.h" > #include "hashtable_utility.h" > +#include "block-replication.h" > > #include > #include > @@ -49,10 +50,7 @@ > #include > #include > #include > -#include > -#include > #include > -#include > #include > #include > #include > @@ -67,22 +65,6 @@ > > #define RPRINTF(_f, _a...) 
syslog (LOG_DEBUG, "remus: " _f, ## _a) > > -#define UNREGISTER_EVENT(id) \ > - do { \ > - if (id >= 0) { \ > - tapdisk_server_unregister_event(id); \ > - id = -1; \ > - } \ > - } while (0) > - > -#define CLOSE_FD(fd) \ > - do { \ > - if (fd >= 0) { \ > - close(fd); \ > - fd = -1; \ > - } \ > - } while (0) > - > #define MAX_REMUS_REQUEST TAPDISK_DATA_REQUESTS > > enum tdremus_mode { > @@ -92,13 +74,6 @@ enum tdremus_mode { > mode_backup > }; > > -enum { > - ERROR_INTERNAL = -1, > - ERROR_IO = -2, > - ERROR_CONNECTION = -3, > - ERROR_CLOSE = -4, > -}; > - > struct tdremus_req { > td_request_t treq; > }; > @@ -167,21 +142,9 @@ struct ramdisk_write_cbdata { > > typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq); > > -/* > - * If cid, rid and wid are -1, fd must be -1. It means that > - * we are in unpritected mode or we don't start to connect > - * to backup. > - * If fd is an valid fd: > - * cid is valid, rid and wid must be invalid. It means that > - * the connection is in progress. > - * cid is invalid. rid or wid must be valid. It means that > - * the connection is established. 
> - */ > typedef struct poll_fd { > int fd; > - event_id_t cid; > - event_id_t rid; > - event_id_t wid; > + event_id_t id; > } poll_fd_t; > > struct tdremus_state { > @@ -195,9 +158,7 @@ struct tdremus_state { > char* msg_path; /* output completion message here */ > poll_fd_t msg_fd; > > - /* replication host */ > - struct sockaddr_in sa; > - poll_fd_t server_fd; /* server listen port */ > + td_replication_connect_t t; > poll_fd_t stream_fd; /* replication channel */ > > /* > @@ -777,28 +738,8 @@ static int mwrite(int fd, void* buf, size_t len) > select(fd + 1, NULL, &wfds, NULL, &tv); > } > > - > -static void inline close_stream_fd(struct tdremus_state *s) > -{ > - > - UNREGISTER_EVENT(s->stream_fd.cid); > - UNREGISTER_EVENT(s->stream_fd.rid); > - UNREGISTER_EVENT(s->stream_fd.wid); > - > - /* close the connection */ > - CLOSE_FD(s->stream_fd.fd); > -} > - > -static void close_server_fd(struct tdremus_state *s) > -{ > - UNREGISTER_EVENT(s->server_fd.cid); > - CLOSE_FD(s->server_fd.fd); > -} > - > /* primary functions */ > -static void remus_client_event(event_id_t, char mode, void *private); > -static void remus_connect_event(event_id_t id, char mode, void *private); > -static void remus_retry_connect_event(event_id_t id, char mode, void *private); > +static void remus_client_event(event_id_t id, char mode, void *private); > static int primary_forward_request(struct tdremus_state *s, > const td_request_t *treq); > > @@ -808,56 +749,15 @@ static int primary_forward_request(struct tdremus_state *s, > */ > static void primary_failed(struct tdremus_state *s, int rc) > { > - close_stream_fd(s); > + td_replication_connect_kill(&s->t); > if (rc == ERROR_INTERNAL) > RPRINTF("switch to unprotected mode due to internal error"); > if (rc == ERROR_CLOSE) > RPRINTF("switch to unprotected mode before closing"); > + UNREGISTER_EVENT(s->stream_fd.id); > switch_mode(s->tdremus_driver, mode_unprotected); > } > > -static int primary_do_connect(struct tdremus_state *state) > -{ > - 
event_id_t id; > - int fd; > - int rc; > - int flags; > - > - RPRINTF("client connecting to %s:%d...\n", > - inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port)); > - > - if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) { > - RPRINTF("could not create client socket: %d\n", errno); > - return ERROR_INTERNAL; > - } > - state->stream_fd.fd = fd; > - > - /* make socket nonblocking */ > - if ((flags = fcntl(fd, F_GETFL, 0)) == -1) > - flags = 0; > - if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { > - RPRINTF("error setting fd %d to non block mode\n", fd); > - return ERROR_INTERNAL; > - } > - > - /* > - * once we have created the socket and populated the address, > - * we can now start our non-blocking connect. rather than > - * duplicating code we trigger a timeout on the socket fd, > - * which calls out nonblocking connect code > - */ > - if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0, > - remus_retry_connect_event, > - state)) < 0) { > - RPRINTF("error registering timeout client connection event handler: %s\n", > - strerror(id)); > - return ERROR_INTERNAL; > - } > - > - state->stream_fd.cid = id; > - return 0; > -} > - > static int remus_handle_queued_io(struct tdremus_state *s) > { > struct req_ring *queued_io = &s->queued_io; > @@ -882,184 +782,35 @@ static int remus_handle_queued_io(struct tdremus_state *s) > return 0; > } > > -static int remus_connection_done(struct tdremus_state *s) > +static void remus_client_established(td_replication_connect_t *t, int rc) > { > + struct tdremus_state *s = CONTAINER_OF(t, *s, t); > event_id_t id; > > - /* the connect succeeded */ > - /* unregister this function and register a new event handler */ > - tapdisk_server_unregister_event(s->stream_fd.cid); > - s->stream_fd.cid = -1; > + if (rc) { > + primary_failed(s, rc); > + return; > + } > > - id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->stream_fd.fd, > + /* the connect succeeded */ > + id = 
tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd, > 0, remus_client_event, s); > if(id < 0) { > RPRINTF("error registering client event handler: %s\n", > strerror(id)); > - return ERROR_INTERNAL; > - } > - s->stream_fd.rid = id; > - > - /* handle the queued requests */ > - return remus_handle_queued_io(s); > -} > - > -static int remus_retry_connect(struct tdremus_state *s) > -{ > - event_id_t id; > - > - tapdisk_server_unregister_event(s->stream_fd.cid); > - s->stream_fd.cid = -1; > - > - RPRINTF("connect to backup 1 second later"); > - id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, > - s->stream_fd.fd, > - REMUS_CONNRETRY_TIMEOUT, > - remus_retry_connect_event, s); > - if (id < 0) { > - RPRINTF("error registering timeout client connection event handler: %s\n", > - strerror(id)); > - return ERROR_INTERNAL; > - } > - > - s->stream_fd.cid = id; > - return 0; > -} > - > -static int remus_wait_connect_done(struct tdremus_state *s) > -{ > - event_id_t id; > - > - tapdisk_server_unregister_event(s->stream_fd.cid); > - s->stream_fd.cid = -1; > - > - id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD, > - s->stream_fd.fd, 0, > - remus_connect_event, s); > - if (id < 0) { > - RPRINTF("error registering client connection event handler: %s\n", > - strerror(id)); > - return ERROR_INTERNAL; > - } > - s->stream_fd.cid = id; > - > - return 0; > -} > - > -/* return 1 if we need to reconnect to backup */ > -static int check_connect_errno(int err) > -{ > - /* > - * The fd is non-block, so we will not get ETIMEDOUT > - * after calling connect(). We only can get this errno > - * by getsockopt(). 
> - */ > - if (err == ECONNREFUSED || err == ENETUNREACH || > - err == EAGAIN || err == ECONNABORTED || > - err == ETIMEDOUT) > - return 1; > - > - return 0; > -} > - > -static void remus_retry_connect_event(event_id_t id, char mode, void *private) > -{ > - struct tdremus_state *s = (struct tdremus_state *)private; > - int rc, ret; > - > - /* do a non-blocking connect */ > - ret = connect(s->stream_fd.fd, > - (struct sockaddr *)&s->sa, > - sizeof(s->sa)); > - if (ret) { > - if (errno == EINPROGRESS) { > - /* > - * the connect returned EINPROGRESS (nonblocking > - * connect) we must wait for the fd to be writeable > - * to determine if the connect worked > - */ > - rc = remus_wait_connect_done(s); > - if (rc) > - goto fail; > - return; > - } > - > - if (check_connect_errno(errno)) { > - rc = remus_retry_connect(s); > - if (rc) > - goto fail; > - return; > - } > - > - /* not recoverable */ > - RPRINTF("error connection to server %s\n", strerror(errno)); > - rc = ERROR_CONNECTION; > - goto fail; > - } > - > - /* The connection is established unexpectedly */ > - rc = remus_connection_done(s); > - if (rc) > - goto fail; > - > - return; > - > -fail: > - primary_failed(s, rc); > - return; > -} > - > -/* callback when nonblocking connect() is finished */ > -static void remus_connect_event(event_id_t id, char mode, void *private) > -{ > - int socket_errno; > - socklen_t socket_errno_size; > - struct tdremus_state *s = (struct tdremus_state *)private; > - int rc; > - > - /* check to see if the connect succeeded */ > - socket_errno_size = sizeof(socket_errno); > - if (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR, > - &socket_errno, &socket_errno_size)) { > - RPRINTF("error getting socket errno\n"); > + primary_failed(s, ERROR_INTERNAL); > return; > } > > - RPRINTF("socket connect returned %d\n", socket_errno); > + s->stream_fd.fd = t->fd; > + s->stream_fd.id = id; > > - if (socket_errno) { > - /* the connect did not succeed */ > - if (check_connect_errno(socket_errno)) { 
> - /* > - * we can probably assume that the backup is down. > - * just try again later > - */ > - rc = remus_retry_connect(s); > - if (rc) > - goto fail; > - > - return; > - } else { > - RPRINTF("socket connect returned %d, giving up\n", > - socket_errno); > - rc = ERROR_CONNECTION; > - goto fail; > - } > - > - return; > - } > - > - rc = remus_connection_done(s); > + /* handle the queued requests */ > + rc = remus_handle_queued_io(s); > if (rc) > - goto fail; > - > - return; > - > -fail: > - primary_failed(s, rc); > + primary_failed(s, rc); > } > > - > /* > * we install this event handler on the primary once we have > * connected to the backup. > @@ -1142,19 +893,21 @@ static int primary_forward_request(struct tdremus_state *s, > static void primary_queue_write(td_driver_t *driver, td_request_t treq) > { > struct tdremus_state *s = (struct tdremus_state *)driver->data; > - int rc; > + int rc, ret; > > // RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd); > > - if(s->stream_fd.fd < 0) { > + ret = td_replication_connect_status(&s->t); > + if(ret == -1) { > RPRINTF("connecting to backup...\n"); > - rc = primary_do_connect(s); > + s->t.callback = remus_client_established; > + rc = td_replication_client_start(&s->t); > if (rc) > goto fail; > } > > /* The connection is not established, just queue the request */ > - if (s->stream_fd.cid >= 0) { > + if (ret != 1) { > ring_add_request(&s->queued_io, &treq); > return; > } > @@ -1227,9 +980,7 @@ static int primary_start(td_driver_t *driver) > s->queue_flush = primary_flush; > > s->stream_fd.fd = -1; > - s->stream_fd.cid = -1; > - s->stream_fd.rid = -1; > - s->stream_fd.wid = -1; > + s->stream_fd.id = -1; > > return 0; > } > @@ -1240,100 +991,32 @@ static void remus_server_event(event_id_t id, char mode, void *private); > /* It is called when we find some I/O error */ > static void backup_failed(struct tdremus_state *s, int rc) > { > - close_stream_fd(s); > - close_server_fd(s); > + td_replication_connect_kill(&s->t); > 
/* We will switch to unprotected mode in backup_queue_write() */ > } > > /* returns the socket that receives write requests */ > -static void remus_server_accept(event_id_t id, char mode, void* private) > +static void remus_server_established(td_replication_connect_t *t, int rc) > { > - struct tdremus_state* s = (struct tdremus_state *) private; > - > - int stream_fd; > - > - /* XXX: add address-based black/white list */ > - if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) { > - RPRINTF("error accepting connection: %d\n", errno); > - return; > - } > + struct tdremus_state *s = CONTAINER_OF(t, *s, t); > + event_id_t id; > > - /* > - * TODO: check to see if we are already replicating. > - * if so just close the connection (or do something > - * smarter) > - */ > - RPRINTF("server accepted connection\n"); > + /* rc is always 0 */ > > /* add tapdisk event for replication stream */ > - id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0, > + id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd, 0, > remus_server_event, s); > > if (id < 0) { > RPRINTF("error registering connection event handler: %s\n", > strerror(errno)); > - close(stream_fd); > + td_replication_server_restart(t); > return; > } > > /* store replication file descriptor */ > - s->stream_fd.fd = stream_fd; > - s->stream_fd.rid = id; > -} > - > -/* returns -2 if EADDRNOTAVAIL */ > -static int remus_bind(struct tdremus_state* s) > -{ > - int opt; > - int rc = -1; > - event_id_t id; > - > - if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { > - RPRINTF("could not create server socket: %d\n", errno); > - return rc; > - } > - > - opt = 1; > - if (setsockopt(s->server_fd.fd, SOL_SOCKET, > - SO_REUSEADDR, &opt, sizeof(opt)) < 0) > - RPRINTF("Error setting REUSEADDR on %d: %d\n", > - s->server_fd.fd, errno); > - > - if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa, > - sizeof(s->sa)) < 0) { > - RPRINTF("could not bind server socket %d to %s:%d: %d %s\n", > - 
s->server_fd.fd, inet_ntoa(s->sa.sin_addr), > - ntohs(s->sa.sin_port), errno, strerror(errno)); > - if (errno == EADDRNOTAVAIL) > - rc = -2; > - goto err_sfd; > - } > - > - if (listen(s->server_fd.fd, 10)) { > - RPRINTF("could not listen on socket: %d\n", errno); > - goto err_sfd; > - } > - > - /* > - * The socket s now bound to the address and listening so we > - * may now register the fd with tapdisk > - */ > - id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, > - s->server_fd.fd, 0, > - remus_server_accept, s); > - if (id < 0) { > - RPRINTF("error registering server connection event handler: %s", > - strerror(id)); > - goto err_sfd; > - } > - s->server_fd.cid = id; > - > - return 0; > - > -err_sfd: > - CLOSE_FD(s->server_fd.fd); > - > - return rc; > + s->stream_fd.fd = t->fd; > + s->stream_fd.id = id; > } > > /* wait for latest checkpoint to be applied */ > @@ -1566,90 +1249,6 @@ static int unprotected_start(td_driver_t *driver) > > > /* control */ > - > -static inline int resolve_address(const char* addr, struct in_addr* ia) > -{ > - struct hostent* he; > - uint32_t ip; > - > - if (!(he = gethostbyname(addr))) { > - RPRINTF("error resolving %s: %d\n", addr, h_errno); > - return -1; > - } > - > - if (!he->h_addr_list[0]) { > - RPRINTF("no address found for %s\n", addr); > - return -1; > - } > - > - /* network byte order */ > - ip = *((uint32_t**)he->h_addr_list)[0]; > - ia->s_addr = ip; > - > - return 0; > -} > - > -static int get_args(td_driver_t *driver, const char* name) > -{ > - struct tdremus_state *state = (struct tdremus_state *)driver->data; > - char* host; > - char* port; > -// char* driver_str; > -// char* parent; > -// int type; > -// char* path; > -// unsigned long ulport; > -// int i; > -// struct sockaddr_in server_addr_in; > - > - int gai_status; > - int valid_addr; > - struct addrinfo gai_hints; > - struct addrinfo *servinfo, *servinfo_itr; > - > - memset(&gai_hints, 0, sizeof gai_hints); > - gai_hints.ai_family = AF_UNSPEC; > - 
gai_hints.ai_socktype = SOCK_STREAM; > - > - port = strchr(name, ':'); > - if (!port) { > - RPRINTF("missing host in %s\n", name); > - return -ENOENT; > - } > - if (!(host = strndup(name, port - name))) { > - RPRINTF("unable to allocate host\n"); > - return -ENOMEM; > - } > - port++; > - > - if ((gai_status = getaddrinfo(host, port, &gai_hints, &servinfo)) != 0) { > - RPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status)); > - return -ENOENT; > - } > - > - /* TODO: do something smarter here */ > - valid_addr = 0; > - for(servinfo_itr = servinfo; servinfo_itr != NULL; servinfo_itr = servinfo_itr->ai_next) { > - void *addr; > - char *ipver; > - > - if (servinfo_itr->ai_family == AF_INET) { > - valid_addr = 1; > - memset(&state->sa, 0, sizeof(state->sa)); > - state->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr; > - break; > - } > - } > - freeaddrinfo(servinfo); > - > - if (!valid_addr) > - return -ENOENT; > - > - RPRINTF("host: %s, port: %d\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port)); > - > - return 0; > -} > - > static int switch_mode(td_driver_t *driver, enum tdremus_mode mode) > { > struct tdremus_state *s = (struct tdremus_state *)driver->data; > @@ -1844,11 +1443,11 @@ static int ctl_register(struct tdremus_state *s) > RPRINTF("registering ctl fifo\n"); > > /* register ctl fd */ > - s->ctl_fd.cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s); > + s->ctl_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s); > > - if (s->ctl_fd.cid < 0) { > + if (s->ctl_fd.id < 0) { > RPRINTF("error registering ctrl FIFO %s: %d\n", > - s->ctl_path, s->ctl_fd.cid); > + s->ctl_path, s->ctl_fd.id); > return -1; > } > > @@ -1859,7 +1458,7 @@ static void ctl_unregister(struct tdremus_state *s) > { > RPRINTF("unregistering ctl fifo\n"); > > - UNREGISTER_EVENT(s->ctl_fd.cid); > + UNREGISTER_EVENT(s->ctl_fd.id); > } > > /* interface */ > @@ -1867,6 +1466,7 @@ static void 
ctl_unregister(struct tdremus_state *s) > static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid) > { > struct tdremus_state *s = (struct tdremus_state *)driver->data; > + td_replication_connect_t *t = &s->t; > int rc; > const char *name = image->name; > td_flag_t flags = image->flags; > @@ -1877,7 +1477,6 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid) > remus_image = image; > > memset(s, 0, sizeof(*s)); > - s->server_fd.fd = -1; > s->stream_fd.fd = -1; > s->ctl_fd.fd = -1; > s->msg_fd.fd = -1; > @@ -1886,8 +1485,12 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid) > * the driver stack from the stream_fd event handler */ > s->tdremus_driver = driver; > > + t->log_prefix = "remus"; > + t->retry_timeout_s = REMUS_CONNRETRY_TIMEOUT; > + t->max_connections = 10; > + t->callback = remus_server_established; > /* parse name to get info etc */ > - if ((rc = get_args(driver, name))) > + if ((rc = td_replication_connect_init(t, name))) > return rc; > > if ((rc = ctl_open(driver, name))) { > @@ -1901,7 +1504,7 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid) > return rc; > } > > - if (!(rc = remus_bind(s))) > + if (!(rc = td_replication_server_start(t))) > rc = switch_mode(driver, mode_backup); > else if (rc == -2) > rc = switch_mode(driver, mode_primary); > @@ -1932,8 +1535,7 @@ static int tdremus_close(td_driver_t *driver) > if (s->ramdisk.inprogress) > hashtable_destroy(s->ramdisk.inprogress, 0); > > - close_server_fd(s); > - close_stream_fd(s); > + td_replication_connect_kill(&s->t); > ctl_unregister(s); > ctl_close(s); > > diff --git a/tools/blktap2/drivers/block-replication.c b/tools/blktap2/drivers/block-replication.c > new file mode 100644 > index 0000000..e4b2679 > --- /dev/null > +++ b/tools/blktap2/drivers/block-replication.c > @@ -0,0 +1,468 @@ > +/* > + * Copyright (C) 2014 FUJITSU LIMITED > + * Author: Wen Congyang > + * > + * This 
program is free software; you can redistribute it and/or modify > + * it under the terms of the GNU Lesser General Public License as published > + * by the Free Software Foundation; version 2.1 only. with the special > + * exception on linking described in file LICENSE. > + * > + * This program is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > + * GNU Lesser General Public License for more details. > + */ > + > +#include "tapdisk-server.h" > +#include "block-replication.h" > + > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > + > +#undef DPRINTF > +#undef EPRINTF > +#define DPRINTF(_f, _a...) syslog (LOG_DEBUG, "%s: " _f, log_prefix, ## _a) > +#define EPRINTF(_f, _a...) syslog (LOG_ERR, "%s: " _f, log_prefix, ## _a) > + > +/* connection status */ > +enum { > + connection_none, > + connection_in_progress, > + connection_established, > + connection_closed, > +}; > + > +/* common functions */ > +/* args should be host:port */ > +static int get_args(td_replication_connect_t *t, const char* name) > +{ > + char* host; > + const char* port; > + int gai_status; > + int valid_addr; > + struct addrinfo gai_hints; > + struct addrinfo *servinfo, *servinfo_itr; > + const char *log_prefix = t->log_prefix; > + > + memset(&gai_hints, 0, sizeof gai_hints); > + gai_hints.ai_family = AF_UNSPEC; > + gai_hints.ai_socktype = SOCK_STREAM; > + > + port = strchr(name, ':'); > + if (!port) { > + EPRINTF("missing host in %s\n", name); > + return -ENOENT; > + } > + if (!(host = strndup(name, port - name))) { > + EPRINTF("unable to allocate host\n"); > + return -ENOMEM; > + } > + port++; > + if ((gai_status = getaddrinfo(host, port, > + &gai_hints, &servinfo)) != 0) { > + EPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status)); > + free(host); > + return -ENOENT; > + } > + free(host); > + > + /* TODO: do 
something smarter here */ > + valid_addr = 0; > + for (servinfo_itr = servinfo; servinfo_itr != NULL; > + servinfo_itr = servinfo_itr->ai_next) { > + if (servinfo_itr->ai_family == AF_INET) { > + valid_addr = 1; > + memset(&t->sa, 0, sizeof(t->sa)); > + t->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr; > + break; > + } > + } > + freeaddrinfo(servinfo); > + > + if (!valid_addr) > + return -ENOENT; > + > + DPRINTF("host: %s, port: %d\n", inet_ntoa(t->sa.sin_addr), > + ntohs(t->sa.sin_port)); > + > + return 0; > +} > + > +int td_replication_connect_init(td_replication_connect_t *t, const char *name) > +{ > + int rc; > + > + rc = get_args(t, name); > + if (rc) > + return rc; > + > + t->listen_fd = -1; > + t->id = -1; > + t->status = connection_none; > + return 0; > +} > + > +int td_replication_connect_status(td_replication_connect_t *t) > +{ > + const char *log_prefix = t->log_prefix; > + > + switch (t->status) { > + case connection_none: > + case connection_closed: > + return -1; > + case connection_in_progress: > + return 0; > + case connection_established: > + return 1; > + default: > + EPRINTF("td_replication_connect is corruptted\n"); > + return -2; > + } > +} > + > +void td_replication_connect_kill(td_replication_connect_t *t) > +{ > + if (t->status != connection_in_progress && > + t->status != connection_established) > + return; > + > + UNREGISTER_EVENT(t->id); > + CLOSE_FD(t->fd); > + CLOSE_FD(t->listen_fd); > + t->status = connection_closed; > +} > + > +/* server */ > +static void td_replication_server_accept(event_id_t id, char mode, > + void *private); > + > +int td_replication_server_start(td_replication_connect_t *t) > +{ > + int opt; > + int rc = -1; > + event_id_t id; > + int fd; > + const char *log_prefix = t->log_prefix; > + > + if (t->status == connection_in_progress || > + t->status == connection_established) > + return rc; > + > + fd = socket(AF_INET, SOCK_STREAM, 0); > + if (fd < 0) { > + EPRINTF("could not create server socket: %d\n", errno); 
> + return rc; > + } > + > + opt = 1; > + if (setsockopt(fd, SOL_SOCKET, > + SO_REUSEADDR, &opt, sizeof(opt)) < 0) > + DPRINTF("Error setting REUSEADDR on %d: %d\n", fd, errno); > + > + if (bind(fd, (struct sockaddr *)&t->sa, sizeof(t->sa)) < 0) { > + DPRINTF("could not bind server socket %d to %s:%d: %d %s\n", > + fd, inet_ntoa(t->sa.sin_addr), > + ntohs(t->sa.sin_port), errno, strerror(errno)); > + if (errno == EADDRNOTAVAIL) > + rc = -2; > + goto err; > + } > + > + if (listen(fd, t->max_connections)) { > + EPRINTF("could not listen on socket: %d\n", errno); > + goto err; > + } > + > + /* > + * The socket is now bound to the address and listening so we > + * may now register the fd with tapdisk > + */ > + id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, > + fd, 0, > + td_replication_server_accept, t); > + if (id < 0) { > + EPRINTF("error registering server connection event handler: %s", > + strerror(id)); > + goto err; > + } > + t->listen_fd = fd; > + t->id = id; > + t->status = connection_in_progress; > + > + return 0; > + > +err: > + close(fd); > + return rc; > +} > + > +static void td_replication_server_accept(event_id_t id, char mode, > + void *private) > +{ > + td_replication_connect_t *t = private; > + int fd; > + const char *log_prefix = t->log_prefix; > + > + /* XXX: add address-based black/white list */ > + fd = accept(t->listen_fd, NULL, NULL); > + if (fd < 0) { > + EPRINTF("error accepting connection: %d\n", errno); > + return; > + } > + > + if (t->status == connection_established) { > + EPRINTF("connection is already established\n"); > + close(fd); > + return; > + } > + > + DPRINTF("server accepted connection\n"); > + t->fd = fd; > + t->status = connection_established; > + t->callback(t, 0); > +} > + > +int td_replication_server_restart(td_replication_connect_t *t) > +{ > + switch (t->status) { > + case connection_in_progress: > + return 0; > + case connection_established: > + CLOSE_FD(t->fd); > + t->status = connection_in_progress; > + 
return 0; > + case connection_none: > + case connection_closed: > + return td_replication_server_start(t); > + default: > + /* not reached */ > + return -1; > + } > +} > + > +/* client */ > +static void td_replication_retry_connect_event(event_id_t id, char mode, > + void *private); > +static void td_replication_connect_event(event_id_t id, char mode, > + void *private); > +int td_replication_client_start(td_replication_connect_t *t) > +{ > + event_id_t id; > + int fd; > + int rc; > + int flags; > + const char *log_prefix = t->log_prefix; > + > + if (t->status == connection_in_progress || > + t->status == connection_established) > + return ERROR_INTERNAL; > + > + DPRINTF("client connecting to %s:%d...\n", > + inet_ntoa(t->sa.sin_addr), ntohs(t->sa.sin_port)); > + > + if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) { > + EPRINTF("could not create client socket: %d\n", errno); > + return ERROR_INTERNAL; > + } > + > + /* make socket nonblocking */ > + if ((flags = fcntl(fd, F_GETFL, 0)) == -1) > + flags = 0; > + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { > + EPRINTF("error setting fd %d to non block mode\n", fd); > + goto err; > + } > + > + /* > + * once we have created the socket and populated the address, > + * we can now start our non-blocking connect. 
rather than > + * duplicating code we trigger a timeout on the socket fd, > + * which calls out nonblocking connect code > + */ > + id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0, > + td_replication_retry_connect_event, > + t); > + if(id < 0) { > + EPRINTF("error registering timeout client connection event handler: %s\n", > + strerror(id)); > + goto err; > + } > + > + t->fd = fd; > + t->id = id; > + t->status = connection_in_progress; > + return 0; > + > +err: > + close(fd); > + return ERROR_INTERNAL; > +} > + > +static void td_replication_client_failed(td_replication_connect_t *t, int rc) > +{ > + td_replication_connect_kill(t); > + t->callback(t, rc); > +} > + > +static void td_replication_client_done(td_replication_connect_t *t) > +{ > + UNREGISTER_EVENT(t->id); > + t->status = connection_established; > + t->callback(t, 0); > +} > + > +static int td_replication_retry_connect(td_replication_connect_t *t) > +{ > + event_id_t id; > + const char *log_prefix = t->log_prefix; > + > + UNREGISTER_EVENT(t->id); > + > + DPRINTF("connect to server 1 second later"); > + id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, > + t->fd, t->retry_timeout_s, > + td_replication_retry_connect_event, > + t); > + if (id < 0) { > + EPRINTF("error registering timeout client connection event handler: %s\n", > + strerror(id)); > + return ERROR_INTERNAL; > + } > + > + t->id = id; > + return 0; > +} > + > +static int td_replication_wait_connect_done(td_replication_connect_t *t) > +{ > + event_id_t id; > + const char *log_prefix = t->log_prefix; > + > + UNREGISTER_EVENT(t->id); > + > + id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD, > + t->fd, 0, > + td_replication_connect_event, t); > + if (id < 0) { > + EPRINTF("error registering client connection event handler: %s\n", > + strerror(id)); > + return ERROR_INTERNAL; > + } > + t->id = id; > + > + return 0; > +} > + > +/* return 1 if we need to reconnect to backup server */ > +static int 
check_connect_errno(int err) > +{ > + /* > + * The fd is non-block, so we will not get ETIMEDOUT > + * after calling connect(). We only can get this errno > + * by getsockopt(). > + */ > + if (err == ECONNREFUSED || err == ENETUNREACH || > + err == EAGAIN || err == ECONNABORTED || > + err == ETIMEDOUT) > + return 1; > + > + return 0; > +} > + > +static void td_replication_retry_connect_event(event_id_t id, char mode, > + void *private) > +{ > + td_replication_connect_t *t = private; > + int rc, ret; > + const char *log_prefix = t->log_prefix; > + > + /* do a non-blocking connect */ > + ret = connect(t->fd, (struct sockaddr *)&t->sa, sizeof(t->sa)); > + if (ret) { > + if (errno == EINPROGRESS) { > + /* > + * the connect returned EINPROGRESS (nonblocking > + * connect) we must wait for the fd to be writeable > + * to determine if the connect worked > + */ > + rc = td_replication_wait_connect_done(t); > + if (rc) > + goto fail; > + return; > + } > + > + if (check_connect_errno(errno)) { > + rc = td_replication_retry_connect(t); > + if (rc) > + goto fail; > + return; > + } > + > + /* not recoverable */ > + EPRINTF("error connection to server %s\n", strerror(errno)); > + rc = ERROR_CONNECTION; > + goto fail; > + } > + > + /* The connection is established unexpectedly */ > + td_replication_client_done(t); > + > + return; > + > +fail: > + td_replication_client_failed(t, rc); > +} > + > +/* callback when nonblocking connect() is finished */ > +static void td_replication_connect_event(event_id_t id, char mode, > + void *private) > +{ > + int socket_errno; > + socklen_t socket_errno_size; > + td_replication_connect_t *t = private; > + int rc; > + const char *log_prefix = t->log_prefix; > + > + /* check to see if the connect succeeded */ > + socket_errno_size = sizeof(socket_errno); > + if (getsockopt(t->fd, SOL_SOCKET, SO_ERROR, > + &socket_errno, &socket_errno_size)) { > + EPRINTF("error getting socket errno\n"); > + return; > + } > + > + DPRINTF("socket connect returned 
%d\n", socket_errno); > + > + if (socket_errno) { > + /* the connect did not succeed */ > + if (check_connect_errno(socket_errno)) { > + /* > + * we can probably assume that the backup is down. > + * just try again later > + */ > + rc = td_replication_retry_connect(t); > + if (rc) > + goto fail; > + > + return; > + } else { > + EPRINTF("socket connect returned %d, giving up\n", > + socket_errno); > + rc = ERROR_CONNECTION; > + goto fail; > + } > + } > + > + td_replication_client_done(t); > + > + return; > + > +fail: > + td_replication_client_failed(t, rc); > +} > diff --git a/tools/blktap2/drivers/block-replication.h b/tools/blktap2/drivers/block-replication.h > new file mode 100644 > index 0000000..0bd6e71 > --- /dev/null > +++ b/tools/blktap2/drivers/block-replication.h > @@ -0,0 +1,113 @@ > +/* > + * Copyright (C) 2014 FUJITSU LIMITED > + * Author: Wen Congyang > + * > + * This program is free software; you can redistribute it and/or modify > + * it under the terms of the GNU Lesser General Public License as published > + * by the Free Software Foundation; version 2.1 only. with the special > + * exception on linking described in file LICENSE. > + * > + * This program is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > + * GNU Lesser General Public License for more details. 
> + */ > + > +#ifndef BLOCK_REPLICATION_H > +#define BLOCK_REPLICATION_H > + > +#include "scheduler.h" > +#include > +#include > + > +#define CONTAINER_OF(inner_ptr, outer, member_name) \ > + ({ \ > + typeof(outer) *container_of_; \ > + container_of_ = (void*)((char*)(inner_ptr) - \ > + offsetof(typeof(outer), member_name)); \ > + (void)(&container_of_->member_name == \ > + (typeof(inner_ptr))0) /* type check */; \ > + container_of_; \ > + }) > + > +#define UNREGISTER_EVENT(id) \ > + do { \ > + if (id >= 0) { \ > + tapdisk_server_unregister_event(id); \ > + id = -1; \ > + } \ > + } while (0) > +#define CLOSE_FD(fd) \ > + do { \ > + if (fd >= 0) { \ > + close(fd); \ > + fd = -1; \ > + } \ > + } while (0) > + > +enum { > + ERROR_INTERNAL = -1, > + ERROR_IO = -2, > + ERROR_CONNECTION = -3, > + ERROR_CLOSE = -4, > +}; > + > +typedef struct td_replication_connect td_replication_connect_t; > +typedef void td_replication_callback(td_replication_connect_t *r, int rc); > + > +struct td_replication_connect { > + /* > + * caller must fill these in before calling > + * td_replication_connect_init() > + */ > + const char *log_prefix; > + td_replication_callback *callback; > + int retry_timeout_s; > + int max_connections; > + /* > + * The caller uses this fd to read/write after > + * the connection is established > + */ > + int fd; > + > + /* private */ > + struct sockaddr_in sa; > + int listen_fd; > + event_id_t id; > + > + int status; > +}; > + > +/* return -errno if failure happened, otherwise return 0 */ > +int td_replication_connect_init(td_replication_connect_t *t, const char *name); > +/* > + * Return value: > + * -1: connection is closed or not connected > + * 0: connection is in progress > + * 1: connection is established > + */ > +int td_replication_connect_status(td_replication_connect_t *t); > +void td_replication_connect_kill(td_replication_connect_t *t); > + > +/* > + * Return value: > + * -2: this caller should be client > + * -1: error > + * 0: connection is in 
progress > + */ > +int td_replication_server_start(td_replication_connect_t *t); > +/* > + * Return value: > + * -2: this caller should be client > + * -1: error > + * 0: connection is in progress > + */ > +int td_replication_server_restart(td_replication_connect_t *t); > +/* > + * Return value: > + * -1: error > + * 0: connection is in progress > + */ > +int td_replication_client_start(td_replication_connect_t *t); > + > +#endif > -- > 1.9.3 > Acked-by: Shriram Rajagopalan --bcaec5014c9fdc65c80503d42006 Content-Type: text/html; charset=UTF-8 Content-Transfer-Encoding: quoted-printable

On Sep 5, 2014 5:31 AM, "Wen Congyang" <wency@cn.fujitsu.com> wrote:
>
> =C2=A0 =C2=A0COLO will reuse them.
>
> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
> Cc: Shriram Rajagopalan <rshr= iram@cs.ubc.ca>
> ---
> =C2=A0tools/blktap2/drivers/Makefile=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 |=C2=A0 =C2=A02 +-
> =C2=A0tools/blktap2/drivers/block-remus.c=C2=A0 =C2=A0 =C2=A0 =C2=A0| = 494 +++---------------------------
> =C2=A0tools/blktap2/drivers/block-replication.c | 468 ++++++++++++++++= ++++++++++++
> =C2=A0tools/blktap2/drivers/block-replication.h | 113 +++++++
> =C2=A04 files changed, 630 insertions(+), 447 deletions(-)
> =C2=A0create mode 100644 tools/blktap2/drivers/block-replication.c
> =C2=A0create mode 100644 tools/blktap2/drivers/block-replication.h
>
> diff --git a/tools/blktap2/drivers/Makefile b/tools/blktap2/drivers/Ma= kefile
> index 37c3485..3d8ed8a 100644
> --- a/tools/blktap2/drivers/Makefile
> +++ b/tools/blktap2/drivers/Makefile
> @@ -23,7 +23,7 @@ endif
>
> =C2=A0VHDLIBS=C2=A0 =C2=A0 :=3D -L$(LIBVHDDIR) -lvhd
>
> -REMUS-OBJS=C2=A0 :=3D block-remus.o
> +REMUS-OBJS=C2=A0 :=3D block-remus.o block-replication.o
> =C2=A0REMUS-OBJS=C2=A0 +=3D hashtable.o
> =C2=A0REMUS-OBJS=C2=A0 +=3D hashtable_itr.o
> =C2=A0REMUS-OBJS=C2=A0 +=3D hashtable_utility.o
> diff --git a/tools/blktap2/drivers/block-remus.c b/tools/blktap2/drive= rs/block-remus.c
> index 5d27d41..8b6f157 100644
> --- a/tools/blktap2/drivers/block-remus.c
> +++ b/tools/blktap2/drivers/block-remus.c
> @@ -40,6 +40,7 @@
> =C2=A0#include "hashtable.h"
> =C2=A0#include "hashtable_itr.h"
> =C2=A0#include "hashtable_utility.h"
> +#include "block-replication.h"
>
> =C2=A0#include <errno.h>
> =C2=A0#include <inttypes.h>
> @@ -49,10 +50,7 @@
> =C2=A0#include <string.h>
> =C2=A0#include <sys/time.h>
> =C2=A0#include <sys/types.h>
> -#include <sys/socket.h>
> -#include <netdb.h>
> =C2=A0#include <netinet/in.h>
> -#include <arpa/inet.h>
> =C2=A0#include <sys/param.h>
> =C2=A0#include <sys/sysctl.h>
> =C2=A0#include <unistd.h>
> @@ -67,22 +65,6 @@
>
> =C2=A0#define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: "= ; _f, ## _a)
>
> -#define UNREGISTER_EVENT(id)=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0\
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0do {=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 \
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (id >=3D= 0) {=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 \
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0tapdisk_server_unregister_event(id);=C2=A0 =C2=A0 \
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0id =3D -1;=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 \
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}=C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0\
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0} while (0)
> -
> -#define CLOSE_FD(fd)=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0\
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0do {=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 \
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (fd >=3D= 0) {=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 \
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0close(fd);=C2=A0 =C2=A0 =C2=A0 \
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0fd =3D -1;=C2=A0 =C2=A0 =C2=A0 =C2=A0 \
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}=C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0\
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0} while (0)
> -
> =C2=A0#define MAX_REMUS_REQUEST=C2=A0 =C2=A0 =C2=A0 =C2=A0TAPDISK_DATA= _REQUESTS
>
> =C2=A0enum tdremus_mode {
> @@ -92,13 +74,6 @@ enum tdremus_mode {
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 mode_backup
> =C2=A0};
>
> -enum {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0ERROR_INTERNAL =3D -1,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0ERROR_IO =3D -2,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0ERROR_CONNECTION =3D -3,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0ERROR_CLOSE =3D -4,
> -};
> -
> =C2=A0struct tdremus_req {
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 td_request_t treq;
> =C2=A0};
> @@ -167,21 +142,9 @@ struct ramdisk_write_cbdata {
>
> =C2=A0typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t tr= eq);
>
> -/*
> - * If cid, rid and wid are -1, fd must be -1. It means that
> - * we are in unpritected mode or we don't start to connect
> - * to backup.
> - * If fd is an valid fd:
> - *=C2=A0 cid is valid, rid and wid must be invalid. It means that
> - *=C2=A0 =C2=A0 =C2=A0 the connection is in progress.
> - *=C2=A0 cid is invalid. rid or wid must be valid. It means that
> - *=C2=A0 =C2=A0 =C2=A0 the connection is established.
> - */
> =C2=A0typedef struct poll_fd {
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 int=C2=A0 =C2=A0 =C2=A0 =C2=A0 fd;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0event_id_t cid;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0event_id_t rid;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0event_id_t wid;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0event_id_t id;
> =C2=A0} poll_fd_t;
>
> =C2=A0struct tdremus_state {
> @@ -195,9 +158,7 @@ struct tdremus_state {
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 char*=C2=A0 =C2=A0 =C2=A0msg_path; /* outp= ut completion message here */
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 poll_fd_t msg_fd;
>
> -=C2=A0 /* replication host */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0struct sockaddr_in sa;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0poll_fd_t server_fd;=C2=A0 =C2=A0 /* serve= r listen port */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0td_replication_connect_t t;
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 poll_fd_t stream_fd;=C2=A0 =C2=A0 =C2=A0/*= replication channel */
>
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 /*
> @@ -777,28 +738,8 @@ static int mwrite(int fd, void* buf, size_t len)<= br> > =C2=A0 =C2=A0 =C2=A0 =C2=A0 select(fd + 1, NULL, &wfds, NULL, &= ;tv);
> =C2=A0}
>
> -
> -static void inline close_stream_fd(struct tdremus_state *s)
> -{
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0UNREGISTER_EVENT(s->stream_fd.cid);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0UNREGISTER_EVENT(s->stream_fd.rid);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0UNREGISTER_EVENT(s->stream_fd.wid);
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/* close the connection */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0CLOSE_FD(s->stream_fd.fd);
> -}
> -
> -static void close_server_fd(struct tdremus_state *s)
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0UNREGISTER_EVENT(s->server_fd.cid);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0CLOSE_FD(s->server_fd.fd);
> -}
> -
> =C2=A0/* primary functions */
> -static void remus_client_event(event_id_t, char mode, void *private);=
> -static void remus_connect_event(event_id_t id, char mode, void *priva= te);
> -static void remus_retry_connect_event(event_id_t id, char mode, void = *private);
> +static void remus_client_event(event_id_t id, char mode, void *privat= e);
> =C2=A0static int primary_forward_request(struct tdremus_state *s,
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0const td_request_t *= treq);
>
> @@ -808,56 +749,15 @@ static int primary_forward_request(struct tdremu= s_state *s,
> =C2=A0 */
> =C2=A0static void primary_failed(struct tdremus_state *s, int rc)
> =C2=A0{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0close_stream_fd(s);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0td_replication_connect_kill(&s->t);=
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 if (rc =3D=3D ERROR_INTERNAL)
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 RPRINTF("= switch to unprotected mode due to internal error");
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 if (rc =3D=3D ERROR_CLOSE)
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 RPRINTF("= switch to unprotected mode before closing");
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0UNREGISTER_EVENT(s->stream_fd.id);
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 switch_mode(s->tdremus_driver, mode_unp= rotected);
> =C2=A0}
>
> -static int primary_do_connect(struct tdremus_state *state)
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0event_id_t id;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0int fd;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0int rc;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0int flags;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("client connecting to %s:%d..= .\n",
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0inet_ntoa(stat= e->sa.sin_addr), ntohs(state->sa.sin_port));
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if ((fd =3D socket(PF_INET, SOCK_STREAM, 0= )) < 0) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("= could not create client socket: %d\n", errno);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return ERROR_I= NTERNAL;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0state->stream_fd.fd =3D fd;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/* make socket nonblocking */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if ((flags =3D fcntl(fd, F_GETFL, 0)) =3D= =3D -1)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0flags =3D 0; > -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (fcntl(fd, F_SETFL, flags | O_NONBLOCK)= =3D=3D -1) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("= error setting fd %d to non block mode\n", fd);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return ERROR_I= NTERNAL;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/*
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 * once we have created the socket and pop= ulated the address,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 * we can now start our non-blocking conne= ct. rather than
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 * duplicating code we trigger a timeout o= n the socket fd,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 * which calls out nonblocking connect cod= e
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if((id =3D tapdisk_server_register_event(S= CHEDULER_POLL_TIMEOUT, fd, 0,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 remus_retry_connect_event,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 state)) < 0) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("= error registering timeout client connection event handler: %s\n",
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0strerror(id));
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return ERROR_I= NTERNAL;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0state->stream_fd.cid =3D id;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> -}
> -
> =C2=A0static int remus_handle_queued_io(struct tdremus_state *s)
> =C2=A0{
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 struct req_ring *queued_io =3D &s->= queued_io;
> @@ -882,184 +782,35 @@ static int remus_handle_queued_io(struct tdremu= s_state *s)
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 return 0;
> =C2=A0}
>
> -static int remus_connection_done(struct tdremus_state *s)
> +static void remus_client_established(td_replication_connect_t *t, int= rc)
> =C2=A0{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0struct tdremus_state *s =3D CONTAINER_OF(t= , *s, t);
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 event_id_t id;
>
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/* the connect succeeded */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/* unregister this function and register a= new event handler */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0tapdisk_server_unregister_event(s->stre= am_fd.cid);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0s->stream_fd.cid =3D -1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (rc) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0primary_failed= (s, rc);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
>
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0id =3D tapdisk_server_register_event(SCHED= ULER_POLL_READ_FD, s->stream_fd.fd,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/* the connect succeeded */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0id =3D tapdisk_server_register_event(SCHED= ULER_POLL_READ_FD, t->fd,
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A00, remus_client_event, s);
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 if(id < 0) {
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 RPRINTF("= error registering client event handler: %s\n",
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 strerror(id));
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return ERROR_I= NTERNAL;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0s->stream_fd.rid =3D id;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/* handle the queued requests */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return remus_handle_queued_io(s);
> -}
> -
> -static int remus_retry_connect(struct tdremus_state *s)
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0event_id_t id;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0tapdisk_server_unregister_event(s->stre= am_fd.cid);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0s->stream_fd.cid =3D -1;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("connect to backup 1 second l= ater");
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0id =3D tapdisk_server_register_event(SCHED= ULER_POLL_TIMEOUT,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 s->stream_fd.fd,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 REMUS_CONNRETRY_TIMEOUT,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 remus_retry_connect_event, s);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (id < 0) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("= error registering timeout client connection event handler: %s\n",
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0strerror(id));
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return ERROR_I= NTERNAL;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0s->stream_fd.cid =3D id;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> -}
> -
> -static int remus_wait_connect_done(struct tdremus_state *s)
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0event_id_t id;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0tapdisk_server_unregister_event(s->stre= am_fd.cid);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0s->stream_fd.cid =3D -1;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0id =3D tapdisk_server_register_event(SCHED= ULER_POLL_WRITE_FD,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 s->stream_fd.fd, 0,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 remus_connect_event, s);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (id < 0) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("= error registering client connection event handler: %s\n",
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0strerror(id));
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return ERROR_I= NTERNAL;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0s->stream_fd.cid =3D id;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> -}
> -
> -/* return 1 if we need to reconnect to backup */
> -static int check_connect_errno(int err)
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/*
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 * The fd is non-block, so we will not get= ETIMEDOUT
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 * after calling connect(). We only can ge= t this errno
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 * by getsockopt().
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (err =3D=3D ECONNREFUSED || err =3D=3D = ENETUNREACH ||
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0err =3D=3D EAGAIN || err =3D= =3D ECONNABORTED ||
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0err =3D=3D ETIMEDOUT)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return 1;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> -}
> -
> -static void remus_retry_connect_event(event_id_t id, char mode, void = *private)
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0struct tdremus_state *s =3D (struct tdremu= s_state *)private;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0int rc, ret;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/* do a non-blocking connect */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0ret =3D connect(s->stream_fd.fd,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0(struct sockaddr *)&s->sa,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0sizeof(s->sa));
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (ret) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (errno =3D= =3D EINPROGRESS) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0/*
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 * the connect returned EINPROGRESS (nonblocking
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 * connect) we must wait for the fd to be writeable
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 * to determine if the connect worked
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0rc =3D remus_wait_connect_done(s);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0if (rc)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0goto fail;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0return;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (check_conn= ect_errno(errno)) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0rc =3D remus_retry_connect(s);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0if (rc)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0goto fail;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0return;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/* not recover= able */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("= error connection to server %s\n", strerror(errno));
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0rc =3D ERROR_C= ONNECTION;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0goto fail;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/* The connection is established unexpecte= dly */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0rc =3D remus_connection_done(s);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (rc)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0goto fail;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return;
> -
> -fail:
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0primary_failed(s, rc);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return;
> -}
> -
> -/* callback when nonblocking connect() is finished */
> -static void remus_connect_event(event_id_t id, char mode, void *priva= te)
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0int socket_errno;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0socklen_t socket_errno_size;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0struct tdremus_state *s =3D (struct tdremu= s_state *)private;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0int rc;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/* check to see if the connect succeeded *= /
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0socket_errno_size =3D sizeof(socket_errno)= ;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (getsockopt(s->stream_fd.fd, SOL_SOC= KET, SO_ERROR,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 &socket_errno, &socket_errno_size)) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("= error getting socket errno\n");
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0primary_failed= (s, ERROR_INTERNAL);
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 return;
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
>
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("socket connect returned %d\n= ", socket_errno);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0s->stream_fd.fd =3D t->fd;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0s->stre= am_fd.id =3D id;
>
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (socket_errno) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/* the connect= did not succeed */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (check_conn= ect_errno(socket_errno)) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0/*
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 * we can probably assume that the backup is down.
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 * just try again later
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0rc =3D remus_retry_connect(s);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0if (rc)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0goto fail;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0return;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0} else {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0RPRINTF("socket connect returned %d, giving up\n",<= br> > -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0socket_errno);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0rc =3D ERROR_CONNECTION;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0goto fail;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0rc =3D remus_connection_done(s);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/* handle the queued requests */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0rc =3D remus_handle_queued_io(s);
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 if (rc)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0goto fail;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return;
> -
> -fail:
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0primary_failed(s, rc);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0primary_failed= (s, rc);
> =C2=A0}
>
> -
> =C2=A0/*
> =C2=A0 * we install this event handler on the primary once we have
> =C2=A0 * connected to the backup.
> @@ -1142,19 +893,21 @@ static int primary_forward_request(struct tdrem= us_state *s,
> =C2=A0static void primary_queue_write(td_driver_t *driver, td_request_= t treq)
> =C2=A0{
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 struct tdremus_state *s =3D (struct tdremu= s_state *)driver->data;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0int rc;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int rc, ret;
>
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 // RPRINTF("write: stream_fd.fd: %d\n= ", s->stream_fd.fd);
>
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if(s->stream_fd.fd < 0) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0ret =3D td_replication_connect_status(&= ;s->t);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if(ret =3D=3D -1) {
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 RPRINTF("= connecting to backup...\n");
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0rc =3D primary= _do_connect(s);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0s->t.callba= ck =3D remus_client_established;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0rc =3D td_repl= ication_client_start(&s->t);
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 if (rc)
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 goto fail;
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
>
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 /* The connection is not established, just= queue the request */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (s->stream_fd.cid >=3D 0) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (ret !=3D 1) {
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 ring_add_reque= st(&s->queued_io, &treq);
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 return;
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
> @@ -1227,9 +980,7 @@ static int primary_start(td_driver_t *driver)
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 s->queue_flush =3D primary_flush;
>
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 s->stream_fd.fd =3D -1;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0s->stream_fd.cid =3D -1;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0s->stream_fd.rid =3D -1;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0s->stream_fd.wid =3D -1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0s->stre= am_fd.id =3D -1;
>
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 return 0;
> =C2=A0}
> @@ -1240,100 +991,32 @@ static void remus_server_event(event_id_t id, = char mode, void *private);
> =C2=A0/* It is called when we find some I/O error */
> =C2=A0static void backup_failed(struct tdremus_state *s, int rc)
> =C2=A0{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0close_stream_fd(s);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0close_server_fd(s);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0td_replication_connect_kill(&s->t);=
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 /* We will switch to unprotected mode in b= ackup_queue_write() */
> =C2=A0}
>
> =C2=A0/* returns the socket that receives write requests */
> -static void remus_server_accept(event_id_t id, char mode, void* priva= te)
> +static void remus_server_established(td_replication_connect_t *t, int= rc)
> =C2=A0{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0struct tdremus_state* s =3D (struct tdremu= s_state *) private;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0int stream_fd;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/* XXX: add address-based black/white list= */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if ((stream_fd =3D accept(s->server_fd.= fd, NULL, NULL)) < 0) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("= error accepting connection: %d\n", errno);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0struct tdremus_state *s =3D CONTAINER_OF(t= , *s, t);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0event_id_t id;
>
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/*
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 * TODO: check to see if we are already re= plicating.
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 * if so just close the connection (or do = something
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 * smarter)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("server accepted connection\n= ");
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/* rc is always 0 */
>
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 /* add tapdisk event for replication strea= m */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0id =3D tapdisk_server_register_event(SCHED= ULER_POLL_READ_FD, stream_fd, 0,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0id =3D tapdisk_server_register_event(SCHED= ULER_POLL_READ_FD, t->fd, 0,
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0remus_server_event, s);
>
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 if (id < 0) {
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 RPRINTF("= error registering connection event handler: %s\n",
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 strerror(errno));
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0close(stream_f= d);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0td_replication= _server_restart(t);
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 return;
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
>
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 /* store replication file descriptor */ > -=C2=A0 =C2=A0 =C2=A0 =C2=A0s->stream_fd.fd =3D stream_fd;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0s->stream_fd.rid =3D id;
> -}
> -
> -/* returns -2 if EADDRNOTAVAIL */
> -static int remus_bind(struct tdremus_state* s)
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0int opt;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0int rc =3D -1;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0event_id_t id;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if ((s->server_fd.fd =3D socket(AF_INET= , SOCK_STREAM, 0)) < 0) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("= could not create server socket: %d\n", errno);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return rc;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0opt =3D 1;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (setsockopt(s->server_fd.fd, SOL_SOC= KET,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 SO_REUSEADDR, &opt, sizeof(opt)) < 0)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("= Error setting REUSEADDR on %d: %d\n",
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0s->server_fd.fd, errno);
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (bind(s->server_fd.fd, (struct socka= ddr *)&s->sa,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 sizeof(s->= sa)) < 0) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("= could not bind server socket %d to %s:%d: %d %s\n",
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0s->server_fd.fd, inet_ntoa(s->sa.sin_addr),
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0ntohs(s->sa.sin_port), errno, strerror(errno));
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (errno =3D= =3D EADDRNOTAVAIL)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0rc =3D -2;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0goto err_sfd;<= br> > -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (listen(s->server_fd.fd, 10)) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("= could not listen on socket: %d\n", errno);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0goto err_sfd;<= br> > -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/*
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 * The socket s now bound to the address a= nd listening so we
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 * may now register the fd with tapdisk > -=C2=A0 =C2=A0 =C2=A0 =C2=A0 */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0id =3D=C2=A0 tapdisk_server_register_event= (SCHEDULER_POLL_READ_FD,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0s->server_fd.fd, 0,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0remus_server_accept, s);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (id < 0) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("= error registering server connection event handler: %s",
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0strerror(id));
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0goto err_sfd;<= br> > -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0s->server_fd.cid =3D id;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> -
> -err_sfd:
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0CLOSE_FD(s->server_fd.fd);
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return rc;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0s->stream_fd.fd =3D t->fd;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0s->stre= am_fd.id =3D id;
> =C2=A0}
>
> =C2=A0/* wait for latest checkpoint to be applied */
> @@ -1566,90 +1249,6 @@ static int unprotected_start(td_driver_t *drive= r)
>
>
> =C2=A0/* control */
> -
> -static inline int resolve_address(const char* addr, struct in_addr* i= a)
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0struct hostent* he;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0uint32_t ip;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!(he =3D gethostbyname(addr))) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("= error resolving %s: %d\n", addr, h_errno);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -1;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!he->h_addr_list[0]) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("= no address found for %s\n", addr);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -1;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/* network byte order */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0ip =3D *((uint32_t**)he->h_addr_list)[0= ];
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0ia->s_addr =3D ip;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> -}
> -
> -static int get_args(td_driver_t *driver, const char* name)
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0struct tdremus_state *state =3D (struct td= remus_state *)driver->data;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0char* host;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0char* port;
> -//=C2=A0 char* driver_str;
> -//=C2=A0 char* parent;
> -//=C2=A0 int type;
> -//=C2=A0 char* path;
> -//=C2=A0 unsigned long ulport;
> -//=C2=A0 int i;
> -//=C2=A0 struct sockaddr_in server_addr_in;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0int gai_status;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0int valid_addr;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0struct addrinfo gai_hints;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0struct addrinfo *servinfo, *servinfo_itr;<= br> > -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0memset(&gai_hints, 0, sizeof gai_hints= );
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0gai_hints.ai_family =3D AF_UNSPEC;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0gai_hints.ai_socktype =3D SOCK_STREAM;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0port =3D strchr(name, ':');
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!port) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("= missing host in %s\n", name);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -ENOENT= ;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!(host =3D strndup(name, port - name))= ) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("= unable to allocate host\n");
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -ENOMEM= ;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0port++;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if ((gai_status =3D getaddrinfo(host, port= , &gai_hints, &servinfo)) !=3D 0) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("= getaddrinfo error: %s\n", gai_strerror(gai_status));
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -ENOENT= ;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/* TODO: do something smarter here */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0valid_addr =3D 0;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0for(servinfo_itr =3D servinfo; servinfo_it= r !=3D NULL; servinfo_itr =3D servinfo_itr->ai_next) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0void *addr; > -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0char *ipver; > -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (servinfo_i= tr->ai_family =3D=3D AF_INET) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0valid_addr =3D 1;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0memset(&state->sa, 0, sizeof(state->sa));
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0state->sa =3D *(struct sockaddr_in *)servinfo_itr->ai_a= ddr;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0break;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0freeaddrinfo(servinfo);
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!valid_addr)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -ENOENT= ;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("host: %s, port: %d\n", = inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> -}
> -
> =C2=A0static int switch_mode(td_driver_t *driver, enum tdremus_mode mo= de)
> =C2=A0{
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 struct tdremus_state *s =3D (struct tdremu= s_state *)driver->data;
> @@ -1844,11 +1443,11 @@ static int ctl_register(struct tdremus_state *= s)
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 RPRINTF("registering ctl fifo\n"= );
>
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 /* register ctl fd */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0s->ctl_fd.cid =3D tapdisk_server_regist= er_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0s->ctl_fd.= id =3D tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_= fd.fd, 0, ctl_request, s);
>
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (s->ctl_fd.cid < 0) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (s->ctl= _fd.id < 0) {
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 RPRINTF("= error registering ctrl FIFO %s: %d\n",
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0s->ctl_path, s->ctl_fd.cid);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0s->ctl_path, s->ctl_fd.id<= /a>);
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 return -1;
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
>
> @@ -1859,7 +1458,7 @@ static void ctl_unregister(struct tdremus_state = *s)
> =C2=A0{
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 RPRINTF("unregistering ctl fifo\n&quo= t;);
>
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0UNREGISTER_EVENT(s->ctl_fd.cid);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0UNREGISTER_EVENT(s->
ctl_fd.id);
> =C2=A0}
>
> =C2=A0/* interface */
> @@ -1867,6 +1466,7 @@ static void ctl_unregister(struct tdremus_state = *s)
> =C2=A0static int tdremus_open(td_driver_t *driver, td_image_t *image, = td_uuid_t uuid)
> =C2=A0{
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 struct tdremus_state *s =3D (struct tdremu= s_state *)driver->data;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0td_replication_connect_t *t =3D &s->= ;t;
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 int rc;
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 const char *name =3D image->name;
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 td_flag_t flags =3D image->flags;
> @@ -1877,7 +1477,6 @@ static int tdremus_open(td_driver_t *driver, td_= image_t *image, td_uuid_t uuid)
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 remus_image =3D image;
>
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 memset(s, 0, sizeof(*s));
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0s->server_fd.fd =3D -1;
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 s->stream_fd.fd =3D -1;
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 s->ctl_fd.fd =3D -1;
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 s->msg_fd.fd =3D -1;
> @@ -1886,8 +1485,12 @@ static int tdremus_open(td_driver_t *driver, td= _image_t *image, td_uuid_t uuid)
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0* the driver stack from the stream_f= d event handler */
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 s->tdremus_driver =3D driver;
>
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->log_prefix =3D "remus"; > +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->retry_timeout_s =3D REMUS_CONNRETRY_= TIMEOUT;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->max_connections =3D 10;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->callback =3D remus_server_establishe= d;
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 /* parse name to get info etc */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if ((rc =3D get_args(driver, name)))
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if ((rc =3D td_replication_connect_init(t,= name)))
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 return rc;
>
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 if ((rc =3D ctl_open(driver, name))) {
> @@ -1901,7 +1504,7 @@ static int tdremus_open(td_driver_t *driver, td_= image_t *image, td_uuid_t uuid)
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 return rc;
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
>
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!(rc =3D remus_bind(s)))
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!(rc =3D td_replication_server_start(t= )))
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 rc =3D switch_= mode(driver, mode_backup);
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 else if (rc =3D=3D -2)
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 rc =3D switch_= mode(driver, mode_primary);
> @@ -1932,8 +1535,7 @@ static int tdremus_close(td_driver_t *driver) > =C2=A0 =C2=A0 =C2=A0 =C2=A0 if (s->ramdisk.inprogress)
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 hashtable_dest= roy(s->ramdisk.inprogress, 0);
>
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0close_server_fd(s);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0close_stream_fd(s);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0td_replication_connect_kill(&s->t);=
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 ctl_unregister(s);
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 ctl_close(s);
>
> diff --git a/tools/blktap2/drivers/block-replication.c b/tools/blktap2= /drivers/block-replication.c
> new file mode 100644
> index 0000000..e4b2679
> --- /dev/null
> +++ b/tools/blktap2/drivers/block-replication.c
> @@ -0,0 +1,468 @@
> +/*
> + * Copyright (C) 2014 FUJITSU LIMITED
> + * Author: Wen Congyang <w= ency@cn.fujitsu.com>
> + *
> + * This program is free software; you can redistribute it and/or modi= fy
> + * it under the terms of the GNU Lesser General Public License as pub= lished
> + * by the Free Software Foundation; version 2.1 only. with the specia= l
> + * exception on linking described in file LICENSE.
> + *
> + * This program is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.=C2=A0 See the=
> + * GNU Lesser General Public License for more details.
> + */
> +
> +#include "tapdisk-server.h"
> +#include "block-replication.h"
> +
> +#include <string.h>
> +#include <errno.h>
> +#include <sys/types.h>
> +#include <unistd.h>
> +#include <fcntl.h>
> +#include <syslog.h>
> +#include <stdlib.h>
> +#include <arpa/inet.h>
> +
> +#undef DPRINTF
> +#undef EPRINTF
> +#define DPRINTF(_f, _a...) syslog (LOG_DEBUG, "%s: " _f, lo= g_prefix, ## _a)
> +#define EPRINTF(_f, _a...) syslog (LOG_ERR, "%s: " _f, log_= prefix, ## _a)
> +
> +/* connection status */
> +enum {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0connection_none,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0connection_in_progress,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0connection_established,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0connection_closed,
> +};
> +
> +/* common functions */
> +/* args should be host:port */
> +static int get_args(td_replication_connect_t *t, const char* name) > +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0char* host;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0const char* port;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int gai_status;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int valid_addr;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0struct addrinfo gai_hints;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0struct addrinfo *servinfo, *servinfo_itr;<= br> > +=C2=A0 =C2=A0 =C2=A0 =C2=A0const char *log_prefix =3D t->log_prefi= x;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0memset(&gai_hints, 0, sizeof gai_hints= );
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0gai_hints.ai_family =3D AF_UNSPEC;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0gai_hints.ai_socktype =3D SOCK_STREAM;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0port =3D strchr(name, ':');
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!port) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0EPRINTF("= missing host in %s\n", name);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -ENOENT= ;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!(host =3D strndup(name, port - name))= ) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0EPRINTF("= unable to allocate host\n");
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -ENOMEM= ;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0port++;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if ((gai_status =3D getaddrinfo(host, port= ,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0&gai_hin= ts, &servinfo)) !=3D 0) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0EPRINTF("= getaddrinfo error: %s\n", gai_strerror(gai_status));
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0free(host); > +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -ENOENT= ;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0free(host);
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/* TODO: do something smarter here */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0valid_addr =3D 0;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0for (servinfo_itr =3D servinfo; servinfo_i= tr !=3D NULL;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 servinfo_itr =3D servinfo_i= tr->ai_next) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (servinfo_i= tr->ai_family =3D=3D AF_INET) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0valid_addr =3D 1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0memset(&t->sa, 0, sizeof(t->sa));
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0t->sa =3D *(struct sockaddr_in *)servinfo_itr->ai_addr;=
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0break;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0freeaddrinfo(servinfo);
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!valid_addr)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -ENOENT= ;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0DPRINTF("host: %s, port: %d\n", = inet_ntoa(t->sa.sin_addr),
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0ntohs(t->sa= .sin_port));
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +}
> +
> +int td_replication_connect_init(td_replication_connect_t *t, const ch= ar *name)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int rc;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0rc =3D get_args(t, name);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (rc)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return rc;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->listen_fd =3D -1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->id =3D -1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->status =3D connection_none;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +}
> +
> +int td_replication_connect_status(td_replication_connect_t *t)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0const char *log_prefix =3D t->log_prefi= x;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0switch (t->status) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0case connection_none:
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0case connection_closed:
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0case connection_in_progress:
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0case connection_established:
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return 1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0default:
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0EPRINTF("= td_replication_connect is corruptted\n");
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -2;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +}
> +
> +void td_replication_connect_kill(td_replication_connect_t *t)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (t->status !=3D connection_in_progre= ss &&
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0t->status !=3D connection= _established)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0UNREGISTER_EVENT(t->id);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0CLOSE_FD(t->fd);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0CLOSE_FD(t->listen_fd);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->status =3D connection_closed;
> +}
> +
> +/* server */
> +static void td_replication_server_accept(event_id_t id, char mode, > +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 void= *private);
> +
> +int td_replication_server_start(td_replication_connect_t *t)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int opt;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int rc =3D -1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0event_id_t id;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int fd;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0const char *log_prefix =3D t->log_prefi= x;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (t->status =3D=3D connection_in_prog= ress ||
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0t->status =3D=3D connecti= on_established)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return rc;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0fd =3D socket(AF_INET, SOCK_STREAM, 0); > +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (fd < 0) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0EPRINTF("= could not create server socket: %d\n", errno);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return rc;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0opt =3D 1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (setsockopt(fd, SOL_SOCKET,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 SO_REUSEADDR, &opt, sizeof(opt)) < 0)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0DPRINTF("= Error setting REUSEADDR on %d: %d\n", fd, errno);
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (bind(fd, (struct sockaddr *)&t->= ;sa, sizeof(t->sa)) < 0) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0DPRINTF("= could not bind server socket %d to %s:%d: %d %s\n",
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0fd, inet_ntoa(t->sa.sin_addr),
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0ntohs(t->sa.sin_port), errno, strerror(errno));
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (errno =3D= =3D EADDRNOTAVAIL)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0rc =3D -2;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0goto err;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (listen(fd, t->max_connections)) { > +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0EPRINTF("= could not listen on socket: %d\n", errno);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0goto err;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/*
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * The socket is now bound to the address = and listening so we
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * may now register the fd with tapdisk > +=C2=A0 =C2=A0 =C2=A0 =C2=A0 */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0id =3D=C2=A0 tapdisk_server_register_event= (SCHEDULER_POLL_READ_FD,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0fd, 0,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0td_replication_server_accept, t);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (id < 0) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0EPRINTF("= error registering server connection event handler: %s",
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0strerror(id));
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0goto err;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->listen_fd =3D fd;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->id =3D id;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->status =3D connection_in_progress; > +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +
> +err:
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0close(fd);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return rc;
> +}
> +
> +static void td_replication_server_accept(event_id_t id, char mode, > +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 void= *private)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0td_replication_connect_t *t =3D private; > +=C2=A0 =C2=A0 =C2=A0 =C2=A0int fd;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0const char *log_prefix =3D t->log_prefi= x;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/* XXX: add address-based black/white list= */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0fd =3D accept(t->listen_fd, NULL, NULL)= ;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (fd < 0) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0EPRINTF("= error accepting connection: %d\n", errno);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (t->status =3D=3D connection_establi= shed) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0EPRINTF("= connection is already established\n");
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0close(fd);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0DPRINTF("server accepted connection\n= ");
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->fd =3D fd;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->status =3D connection_established; > +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->callback(t, 0);
> +}
> +
> +int td_replication_server_restart(td_replication_connect_t *t)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0switch (t->status) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0case connection_in_progress:
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0case connection_established:
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0CLOSE_FD(t->= ;fd);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0t->status = =3D connection_in_progress;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0case connection_none:
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0case connection_closed:
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return td_repl= ication_server_start(t);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0default:
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/* not reached= */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +}
> +
> +/* client */
> +static void td_replication_retry_connect_event(event_id_t id, char mo= de,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 void *private);
> +static void td_replication_connect_event(event_id_t id, char mode, > +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 void= *private);
> +int td_replication_client_start(td_replication_connect_t *t)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0event_id_t id;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int fd;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int rc;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int flags;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0const char *log_prefix =3D t->log_prefi= x;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (t->status =3D=3D connection_in_prog= ress ||
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0t->status =3D=3D connecti= on_established)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return ERROR_I= NTERNAL;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0DPRINTF("client connecting to %s:%d..= .\n",
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0inet_ntoa(t-&g= t;sa.sin_addr), ntohs(t->sa.sin_port));
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if ((fd =3D socket(PF_INET, SOCK_STREAM, 0= )) < 0) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0EPRINTF("= could not create client socket: %d\n", errno);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return ERROR_I= NTERNAL;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/* make socket nonblocking */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if ((flags =3D fcntl(fd, F_GETFL, 0)) =3D= =3D -1)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0flags =3D 0; > +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (fcntl(fd, F_SETFL, flags | O_NONBLOCK)= =3D=3D -1) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0EPRINTF("= error setting fd %d to non block mode\n", fd);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0goto err;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/*
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * once we have created the socket and pop= ulated the address,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * we can now start our non-blocking conne= ct. rather than
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * duplicating code we trigger a timeout o= n the socket fd,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * which calls out nonblocking connect cod= e
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0id =3D tapdisk_server_register_event(SCHED= ULER_POLL_TIMEOUT, fd, 0,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 td_replication_retry_connect_event,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 t);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if(id < 0) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0EPRINTF("= error registering timeout client connection event handler: %s\n",
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0strerror(id));
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0goto err;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->fd =3D fd;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->id =3D id;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->status =3D connection_in_progress; > +=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +
> +err:
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0close(fd);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return ERROR_INTERNAL;
> +}
> +
> +static void td_replication_client_failed(td_replication_connect_t *t,= int rc)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0td_replication_connect_kill(t);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->callback(t, rc);
> +}
> +
> +static void td_replication_client_done(td_replication_connect_t *t) > +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0UNREGISTER_EVENT(t->id);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->status =3D connection_established; > +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->callback(t, 0);
> +}
> +
> +static int td_replication_retry_connect(td_replication_connect_t *t)<= br> > +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0event_id_t id;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0const char *log_prefix =3D t->log_prefi= x;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0UNREGISTER_EVENT(t->id);
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0DPRINTF("connect to server 1 second l= ater");
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0id =3D tapdisk_server_register_event(SCHED= ULER_POLL_TIMEOUT,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 t->fd, t->retry_timeout_s,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 td_replication_retry_connect_event,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 t);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (id < 0) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0EPRINTF("= error registering timeout client connection event handler: %s\n",
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0strerror(id));
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return ERROR_I= NTERNAL;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->id =3D id;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +}
> +
> +static int td_replication_wait_connect_done(td_replication_connect_t = *t)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0event_id_t id;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0const char *log_prefix =3D t->log_prefi= x;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0UNREGISTER_EVENT(t->id);
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0id =3D tapdisk_server_register_event(SCHED= ULER_POLL_WRITE_FD,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 t->fd, 0,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 td_replication_connect_event, t);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (id < 0) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0EPRINTF("= error registering client connection event handler: %s\n",
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0strerror(id));
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return ERROR_I= NTERNAL;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0t->id =3D id;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +}
> +
> +/* return 1 if we need to reconnect to backup server */
> +static int check_connect_errno(int err)
> +{
> +       /*
> +        * The fd is non-blocking, so we will not get ETIMEDOUT
> +        * after calling connect(). We can only get this errno
> +        * by getsockopt().
> +        */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (err =3D=3D ECONNREFUSED || err =3D=3D = ENETUNREACH ||
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0err =3D=3D EAGAIN || err =3D= =3D ECONNABORTED ||
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0err =3D=3D ETIMEDOUT)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return 1;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +}
> +
> +static void td_replication_retry_connect_event(event_id_t id, char mo= de,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 void *private)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0td_replication_connect_t *t =3D private; > +=C2=A0 =C2=A0 =C2=A0 =C2=A0int rc, ret;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0const char *log_prefix =3D t->log_prefi= x;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/* do a non-blocking connect */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0ret =3D connect(t->fd, (struct sockaddr= *)&t->sa, sizeof(t->sa));
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (ret) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (errno =3D= =3D EINPROGRESS) {
> +                       /*
> +                        * connect() returned EINPROGRESS (non-blocking
> +                        * connect), so we must wait for the fd to be writable
> +                        * to determine whether the connect worked
> +                        */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0rc =3D td_replication_wait_connect_done(t);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0if (rc)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0goto fail;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0return;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (check_conn= ect_errno(errno)) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0rc =3D td_replication_retry_connect(t);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0if (rc)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0goto fail;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0return;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/* not recover= able */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0EPRINTF("= error connection to server %s\n", strerror(errno));
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0rc =3D ERROR_C= ONNECTION;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0goto fail;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +       /* The connection is established unexpectedly */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0td_replication_client_done(t);
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return;
> +
> +fail:
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0td_replication_client_failed(t, rc);
> +}
> +
> +/* callback when nonblocking connect() is finished */
> +static void td_replication_connect_event(event_id_t id, char mode, > +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 void= *private)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int socket_errno;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0socklen_t socket_errno_size;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0td_replication_connect_t *t =3D private; > +=C2=A0 =C2=A0 =C2=A0 =C2=A0int rc;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0const char *log_prefix =3D t->log_prefi= x;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/* check to see if the connect succeeded *= /
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0socket_errno_size =3D sizeof(socket_errno)= ;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (getsockopt(t->fd, SOL_SOCKET, SO_ER= ROR,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 &socket_errno, &socket_errno_size)) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0EPRINTF("= error getting socket errno\n");
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0DPRINTF("socket connect returned %d\n= ", socket_errno);
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (socket_errno) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/* the connect= did not succeed */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (check_conn= ect_errno(socket_errno)) {
> +                       /*
> +                        * we can probably assume that the backup is down.
> +                        * just try again later
> +                        */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0rc =3D td_replication_retry_connect(t);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0if (rc)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0goto fail;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0return;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0} else {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0EPRINTF("socket connect returned %d, giving up\n",<= br> > +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0socket_errno);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0rc =3D ERROR_CONNECTION;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0goto fail;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0td_replication_client_done(t);
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return;
> +
> +fail:
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0td_replication_client_failed(t, rc);
> +}
> diff --git a/tools/blktap2/drivers/block-replication.h b/tools/blktap2= /drivers/block-replication.h
> new file mode 100644
> index 0000000..0bd6e71
> --- /dev/null
> +++ b/tools/blktap2/drivers/block-replication.h
> @@ -0,0 +1,113 @@
> +/*
> + * Copyright (C) 2014 FUJITSU LIMITED
> + * Author: Wen Congyang <wency@cn.fujitsu.com>
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU Lesser General Public License as published
> + * by the Free Software Foundation; version 2.1 only. with the special
> + * exception on linking described in file LICENSE.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> + * GNU Lesser General Public License for more details.
> + */
> +
> +#ifndef BLOCK_REPLICATION_H
> +#define BLOCK_REPLICATION_H
> +
> +#include "scheduler.h"
> +#include <sys/socket.h>
> +#include <netdb.h>
> +
> +#define CONTAINER_OF(inner_ptr, outer, member_name)=C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 \
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0({=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 \
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0typeof(outer) = *container_of_;=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0\
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0container_of_ = =3D (void*)((char*)(inner_ptr) -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 \=
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0offsetof(typeof(outer), member_na= me));=C2=A0 \
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0(void)(&co= ntainer_of_->member_name =3D=3D=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0\
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 (typeof(inner_ptr))0) /* type check */;=C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 \
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0container_of_;= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 \=
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0})
> +
> +#define UNREGISTER_EVENT(id)=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0\
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0do {=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 \
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (id >=3D= 0) {=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 \
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0tapdisk_server_unregister_event(id);=C2=A0 =C2=A0 \
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0id =3D -1;=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 \
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}=C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0\
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0} while (0)
> +#define CLOSE_FD(fd)=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0\
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0do {=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 \
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (fd >=3D= 0) {=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 \
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0close(fd);=C2=A0 =C2=A0 =C2=A0 \
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0fd =3D -1;=C2=A0 =C2=A0 =C2=A0 =C2=A0 \
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}=C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0\
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0} while (0)
> +
> +enum {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0ERROR_INTERNAL =3D -1,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0ERROR_IO =3D -2,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0ERROR_CONNECTION =3D -3,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0ERROR_CLOSE =3D -4,
> +};
> +
> +typedef struct td_replication_connect td_replication_connect_t;
> +typedef void td_replication_callback(td_replication_connect_t *r, int= rc);
> +
> +struct td_replication_connect {
> +       /*
> +        * caller must fill these in before calling
> +        * td_replication_connect_init()
> +        */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0const char *log_prefix;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0td_replication_callback *callback;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int retry_timeout_s;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int max_connections;
> +       /*
> +        * The caller uses this fd to read/write after
> +        * the connection is established
> +        */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int fd;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/* private */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0struct sockaddr_in sa;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int listen_fd;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0event_id_t id;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int status;
> +};
> +
> +/* return -errno if failure happened, otherwise return 0 */
> +int td_replication_connect_init(td_replication_connect_t *t, const ch= ar *name);
> +/*
> + * Return value:
> + *   -1: connection is closed or not connected
> + *    0: connection is in progress
> + *    1: connection is established
> + */
> +int td_replication_connect_status(td_replication_connect_t *t);
> +void td_replication_connect_kill(td_replication_connect_t *t);
> +
> +/*
> + * Return value:
> + *   -2: this caller should be client
> + *   -1: error
> + *    0: connection is in progress
> + */
> +int td_replication_server_start(td_replication_connect_t *t);
> +/*
> + * Return value:
> + *   -2: this caller should be client
> + *   -1: error
> + *    0: connection is in progress
> + */
> +int td_replication_server_restart(td_replication_connect_t *t);
> +/*
> + * Return value:
> + *   -1: error
> + *    0: connection is in progress
> + */
> +int td_replication_client_start(td_replication_connect_t *t);
> +
> +#endif
> --
> 1.9.3
>

Acked-by: Shriram Rajagopalan <rshriram@cs.ubc.ca>

--bcaec5014c9fdc65c80503d42006-- --===============8024538590219703831== Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Content-Disposition: inline _______________________________________________ Xen-devel mailing list Xen-devel@lists.xen.org http://lists.xen.org/xen-devel --===============8024538590219703831==--