On Sep 5, 2014 5:31 AM, "Wen Congyang" <wency@cn.fujitsu.com> wrote:
>
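>    Move the replication connection code (address parsing, connect/retry, listen/accept) from block-remus.c into the new block-replication.c so that COLO can reuse it.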
>
> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
> Cc: Shriram Rajagopalan <rshriram@cs.ubc.ca>
> ---
>  tools/blktap2/drivers/Makefile            |   2 +-
>  tools/blktap2/drivers/block-remus.c       | 494 +++---------------------------
>  tools/blktap2/drivers/block-replication.c | 468 ++++++++++++++++++++++++++++
>  tools/blktap2/drivers/block-replication.h | 113 +++++++
>  4 files changed, 630 insertions(+), 447 deletions(-)
>  create mode 100644 tools/blktap2/drivers/block-replication.c
>  create mode 100644 tools/blktap2/drivers/block-replication.h
>
> diff --git a/tools/blktap2/drivers/Makefile b/tools/blktap2/drivers/Makefile
> index 37c3485..3d8ed8a 100644
> --- a/tools/blktap2/drivers/Makefile
> +++ b/tools/blktap2/drivers/Makefile
> @@ -23,7 +23,7 @@ endif
>
>  VHDLIBS    := -L$(LIBVHDDIR) -lvhd
>
> -REMUS-OBJS  := block-remus.o
> +REMUS-OBJS  := block-remus.o block-replication.o
>  REMUS-OBJS  += hashtable.o
>  REMUS-OBJS  += hashtable_itr.o
>  REMUS-OBJS  += hashtable_utility.o
> diff --git a/tools/blktap2/drivers/block-remus.c b/tools/blktap2/drivers/block-remus.c
> index 5d27d41..8b6f157 100644
> --- a/tools/blktap2/drivers/block-remus.c
> +++ b/tools/blktap2/drivers/block-remus.c
> @@ -40,6 +40,7 @@
>  #include "hashtable.h"
>  #include "hashtable_itr.h"
>  #include "hashtable_utility.h"
> +#include "block-replication.h"
>
>  #include <errno.h>
>  #include <inttypes.h>
> @@ -49,10 +50,7 @@
>  #include <string.h>
>  #include <sys/time.h>
>  #include <sys/types.h>
> -#include <sys/socket.h>
> -#include <netdb.h>
>  #include <netinet/in.h>
> -#include <arpa/inet.h>
>  #include <sys/param.h>
>  #include <sys/sysctl.h>
>  #include <unistd.h>
> @@ -67,22 +65,6 @@
>
>  #define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: " _f, ## _a)
>
> -#define UNREGISTER_EVENT(id)                                   \
> -       do {                                                    \
> -               if (id >= 0) {                                  \
> -                       tapdisk_server_unregister_event(id);    \
> -                       id = -1;                                \
> -               }                                               \
> -       } while (0)
> -
> -#define CLOSE_FD(fd)                   \
> -       do {                            \
> -               if (fd >= 0) {          \
> -                       close(fd);      \
> -                       fd = -1;        \
> -               }                       \
> -       } while (0)
> -
>  #define MAX_REMUS_REQUEST       TAPDISK_DATA_REQUESTS
>
>  enum tdremus_mode {
> @@ -92,13 +74,6 @@ enum tdremus_mode {
>         mode_backup
>  };
>
> -enum {
> -       ERROR_INTERNAL = -1,
> -       ERROR_IO = -2,
> -       ERROR_CONNECTION = -3,
> -       ERROR_CLOSE = -4,
> -};
> -
>  struct tdremus_req {
>         td_request_t treq;
>  };
> @@ -167,21 +142,9 @@ struct ramdisk_write_cbdata {
>
>  typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq);
>
> -/*
> - * If cid, rid and wid are -1, fd must be -1. It means that
> - * we are in unpritected mode or we don't start to connect
> - * to backup.
> - * If fd is an valid fd:
> - *  cid is valid, rid and wid must be invalid. It means that
> - *      the connection is in progress.
> - *  cid is invalid. rid or wid must be valid. It means that
> - *      the connection is established.
> - */
>  typedef struct poll_fd {
>         int        fd;
> -       event_id_t cid;
> -       event_id_t rid;
> -       event_id_t wid;
> +       event_id_t id;
>  } poll_fd_t;
>
>  struct tdremus_state {
> @@ -195,9 +158,7 @@ struct tdremus_state {
>         char*     msg_path; /* output completion message here */
>         poll_fd_t msg_fd;
>
> -  /* replication host */
> -       struct sockaddr_in sa;
> -       poll_fd_t server_fd;    /* server listen port */
> +       td_replication_connect_t t;
>         poll_fd_t stream_fd;     /* replication channel */
>
>         /*
> @@ -777,28 +738,8 @@ static int mwrite(int fd, void* buf, size_t len)
>         select(fd + 1, NULL, &wfds, NULL, &tv);
>  }
>
> -
> -static void inline close_stream_fd(struct tdremus_state *s)
> -{
> -
> -       UNREGISTER_EVENT(s->stream_fd.cid);
> -       UNREGISTER_EVENT(s->stream_fd.rid);
> -       UNREGISTER_EVENT(s->stream_fd.wid);
> -
> -       /* close the connection */
> -       CLOSE_FD(s->stream_fd.fd);
> -}
> -
> -static void close_server_fd(struct tdremus_state *s)
> -{
> -       UNREGISTER_EVENT(s->server_fd.cid);
> -       CLOSE_FD(s->server_fd.fd);
> -}
> -
>  /* primary functions */
> -static void remus_client_event(event_id_t, char mode, void *private);
> -static void remus_connect_event(event_id_t id, char mode, void *private);
> -static void remus_retry_connect_event(event_id_t id, char mode, void *private);
> +static void remus_client_event(event_id_t id, char mode, void *private);
>  static int primary_forward_request(struct tdremus_state *s,
>                                    const td_request_t *treq);
>
> @@ -808,56 +749,15 @@ static int primary_forward_request(struct tdremus_state *s,
>   */
>  static void primary_failed(struct tdremus_state *s, int rc)
>  {
> -       close_stream_fd(s);
> +       td_replication_connect_kill(&s->t);
>         if (rc == ERROR_INTERNAL)
>                 RPRINTF("switch to unprotected mode due to internal error");
>         if (rc == ERROR_CLOSE)
>                 RPRINTF("switch to unprotected mode before closing");
> +       UNREGISTER_EVENT(s->stream_fd.id);
>         switch_mode(s->tdremus_driver, mode_unprotected);
>  }
>
> -static int primary_do_connect(struct tdremus_state *state)
> -{
> -       event_id_t id;
> -       int fd;
> -       int rc;
> -       int flags;
> -
> -       RPRINTF("client connecting to %s:%d...\n",
> -               inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
> -
> -       if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
> -               RPRINTF("could not create client socket: %d\n", errno);
> -               return ERROR_INTERNAL;
> -       }
> -       state->stream_fd.fd = fd;
> -
> -       /* make socket nonblocking */
> -       if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
> -               flags = 0;
> -       if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
> -               RPRINTF("error setting fd %d to non block mode\n", fd);
> -               return ERROR_INTERNAL;
> -       }
> -
> -       /*
> -        * once we have created the socket and populated the address,
> -        * we can now start our non-blocking connect. rather than
> -        * duplicating code we trigger a timeout on the socket fd,
> -        * which calls out nonblocking connect code
> -        */
> -       if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0,
> -                                              remus_retry_connect_event,
> -                                              state)) < 0) {
> -               RPRINTF("error registering timeout client connection event handler: %s\n",
> -                       strerror(id));
> -               return ERROR_INTERNAL;
> -       }
> -
> -       state->stream_fd.cid = id;
> -       return 0;
> -}
> -
>  static int remus_handle_queued_io(struct tdremus_state *s)
>  {
>         struct req_ring *queued_io = &s->queued_io;
> @@ -882,184 +782,35 @@ static int remus_handle_queued_io(struct tdremus_state *s)
>         return 0;
>  }
>
> -static int remus_connection_done(struct tdremus_state *s)
> +static void remus_client_established(td_replication_connect_t *t, int rc)
>  {
> +       struct tdremus_state *s = CONTAINER_OF(t, *s, t);
>         event_id_t id;
>
> -       /* the connect succeeded */
> -       /* unregister this function and register a new event handler */
> -       tapdisk_server_unregister_event(s->stream_fd.cid);
> -       s->stream_fd.cid = -1;
> +       if (rc) {
> +               primary_failed(s, rc);
> +               return;
> +       }
>
> -       id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->stream_fd.fd,
> +       /* the connect succeeded */
> +       id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd,
>                                            0, remus_client_event, s);
>         if(id < 0) {
>                 RPRINTF("error registering client event handler: %s\n",
>                         strerror(id));
> -               return ERROR_INTERNAL;
> -       }
> -       s->stream_fd.rid = id;
> -
> -       /* handle the queued requests */
> -       return remus_handle_queued_io(s);
> -}
> -
> -static int remus_retry_connect(struct tdremus_state *s)
> -{
> -       event_id_t id;
> -
> -       tapdisk_server_unregister_event(s->stream_fd.cid);
> -       s->stream_fd.cid = -1;
> -
> -       RPRINTF("connect to backup 1 second later");
> -       id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
> -                                          s->stream_fd.fd,
> -                                          REMUS_CONNRETRY_TIMEOUT,
> -                                          remus_retry_connect_event, s);
> -       if (id < 0) {
> -               RPRINTF("error registering timeout client connection event handler: %s\n",
> -                       strerror(id));
> -               return ERROR_INTERNAL;
> -       }
> -
> -       s->stream_fd.cid = id;
> -       return 0;
> -}
> -
> -static int remus_wait_connect_done(struct tdremus_state *s)
> -{
> -       event_id_t id;
> -
> -       tapdisk_server_unregister_event(s->stream_fd.cid);
> -       s->stream_fd.cid = -1;
> -
> -       id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
> -                                          s->stream_fd.fd, 0,
> -                                          remus_connect_event, s);
> -       if (id < 0) {
> -               RPRINTF("error registering client connection event handler: %s\n",
> -                       strerror(id));
> -               return ERROR_INTERNAL;
> -       }
> -       s->stream_fd.cid = id;
> -
> -       return 0;
> -}
> -
> -/* return 1 if we need to reconnect to backup */
> -static int check_connect_errno(int err)
> -{
> -       /*
> -        * The fd is non-block, so we will not get ETIMEDOUT
> -        * after calling connect(). We only can get this errno
> -        * by getsockopt().
> -        */
> -       if (err == ECONNREFUSED || err == ENETUNREACH ||
> -           err == EAGAIN || err == ECONNABORTED ||
> -           err == ETIMEDOUT)
> -           return 1;
> -
> -       return 0;
> -}
> -
> -static void remus_retry_connect_event(event_id_t id, char mode, void *private)
> -{
> -       struct tdremus_state *s = (struct tdremus_state *)private;
> -       int rc, ret;
> -
> -       /* do a non-blocking connect */
> -       ret = connect(s->stream_fd.fd,
> -                     (struct sockaddr *)&s->sa,
> -                     sizeof(s->sa));
> -       if (ret) {
> -               if (errno == EINPROGRESS) {
> -                       /*
> -                        * the connect returned EINPROGRESS (nonblocking
> -                        * connect) we must wait for the fd to be writeable
> -                        * to determine if the connect worked
> -                        */
> -                       rc = remus_wait_connect_done(s);
> -                       if (rc)
> -                               goto fail;
> -                       return;
> -               }
> -
> -               if (check_connect_errno(errno)) {
> -                       rc = remus_retry_connect(s);
> -                       if (rc)
> -                               goto fail;
> -                       return;
> -               }
> -
> -               /* not recoverable */
> -               RPRINTF("error connection to server %s\n", strerror(errno));
> -               rc = ERROR_CONNECTION;
> -               goto fail;
> -       }
> -
> -       /* The connection is established unexpectedly */
> -       rc = remus_connection_done(s);
> -       if (rc)
> -               goto fail;
> -
> -       return;
> -
> -fail:
> -       primary_failed(s, rc);
> -       return;
> -}
> -
> -/* callback when nonblocking connect() is finished */
> -static void remus_connect_event(event_id_t id, char mode, void *private)
> -{
> -       int socket_errno;
> -       socklen_t socket_errno_size;
> -       struct tdremus_state *s = (struct tdremus_state *)private;
> -       int rc;
> -
> -       /* check to see if the connect succeeded */
> -       socket_errno_size = sizeof(socket_errno);
> -       if (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR,
> -                      &socket_errno, &socket_errno_size)) {
> -               RPRINTF("error getting socket errno\n");
> +               primary_failed(s, ERROR_INTERNAL);
>                 return;
>         }
>
> -       RPRINTF("socket connect returned %d\n", socket_errno);
> +       s->stream_fd.fd = t->fd;
> +       s->stream_fd.id = id;
>
> -       if (socket_errno) {
> -               /* the connect did not succeed */
> -               if (check_connect_errno(socket_errno)) {
> -                       /*
> -                        * we can probably assume that the backup is down.
> -                        * just try again later
> -                        */
> -                       rc = remus_retry_connect(s);
> -                       if (rc)
> -                               goto fail;
> -
> -                       return;
> -               } else {
> -                       RPRINTF("socket connect returned %d, giving up\n",
> -                               socket_errno);
> -                       rc = ERROR_CONNECTION;
> -                       goto fail;
> -               }
> -
> -               return;
> -       }
> -
> -       rc = remus_connection_done(s);
> +       /* handle the queued requests */
> +       rc = remus_handle_queued_io(s);
>         if (rc)
> -               goto fail;
> -
> -       return;
> -
> -fail:
> -       primary_failed(s, rc);
> +               primary_failed(s, rc);
>  }
>
> -
>  /*
>   * we install this event handler on the primary once we have
>   * connected to the backup.
> @@ -1142,19 +893,21 @@ static int primary_forward_request(struct tdremus_state *s,
>  static void primary_queue_write(td_driver_t *driver, td_request_t treq)
>  {
>         struct tdremus_state *s = (struct tdremus_state *)driver->data;
> -       int rc;
> +       int rc, ret;
>
>         // RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
>
> -       if(s->stream_fd.fd < 0) {
> +       ret = td_replication_connect_status(&s->t);
> +       if(ret == -1) {
>                 RPRINTF("connecting to backup...\n");
> -               rc = primary_do_connect(s);
> +               s->t.callback = remus_client_established;
> +               rc = td_replication_client_start(&s->t);
>                 if (rc)
>                         goto fail;
>         }
>
>         /* The connection is not established, just queue the request */
> -       if (s->stream_fd.cid >= 0) {
> +       if (ret != 1) {
>                 ring_add_request(&s->queued_io, &treq);
>                 return;
>         }
> @@ -1227,9 +980,7 @@ static int primary_start(td_driver_t *driver)
>         s->queue_flush = primary_flush;
>
>         s->stream_fd.fd = -1;
> -       s->stream_fd.cid = -1;
> -       s->stream_fd.rid = -1;
> -       s->stream_fd.wid = -1;
> +       s->stream_fd.id = -1;
>
>         return 0;
>  }
> @@ -1240,100 +991,32 @@ static void remus_server_event(event_id_t id, char mode, void *private);
>  /* It is called when we find some I/O error */
>  static void backup_failed(struct tdremus_state *s, int rc)
>  {
> -       close_stream_fd(s);
> -       close_server_fd(s);
> +       td_replication_connect_kill(&s->t);
>         /* We will switch to unprotected mode in backup_queue_write() */
>  }
>
>  /* returns the socket that receives write requests */
> -static void remus_server_accept(event_id_t id, char mode, void* private)
> +static void remus_server_established(td_replication_connect_t *t, int rc)
>  {
> -       struct tdremus_state* s = (struct tdremus_state *) private;
> -
> -       int stream_fd;
> -
> -       /* XXX: add address-based black/white list */
> -       if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) {
> -               RPRINTF("error accepting connection: %d\n", errno);
> -               return;
> -       }
> +       struct tdremus_state *s = CONTAINER_OF(t, *s, t);
> +       event_id_t id;
>
> -       /*
> -        * TODO: check to see if we are already replicating.
> -        * if so just close the connection (or do something
> -        * smarter)
> -        */
> -       RPRINTF("server accepted connection\n");
> +       /* rc is always 0 */
>
>         /* add tapdisk event for replication stream */
> -       id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0,
> +       id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd, 0,
>                                            remus_server_event, s);
>
>         if (id < 0) {
>                 RPRINTF("error registering connection event handler: %s\n",
>                         strerror(errno));
> -               close(stream_fd);
> +               td_replication_server_restart(t);
>                 return;
>         }
>
>         /* store replication file descriptor */
> -       s->stream_fd.fd = stream_fd;
> -       s->stream_fd.rid = id;
> -}
> -
> -/* returns -2 if EADDRNOTAVAIL */
> -static int remus_bind(struct tdremus_state* s)
> -{
> -       int opt;
> -       int rc = -1;
> -       event_id_t id;
> -
> -       if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
> -               RPRINTF("could not create server socket: %d\n", errno);
> -               return rc;
> -       }
> -
> -       opt = 1;
> -       if (setsockopt(s->server_fd.fd, SOL_SOCKET,
> -                      SO_REUSEADDR, &opt, sizeof(opt)) < 0)
> -               RPRINTF("Error setting REUSEADDR on %d: %d\n",
> -                       s->server_fd.fd, errno);
> -
> -       if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa,
> -                sizeof(s->sa)) < 0) {
> -               RPRINTF("could not bind server socket %d to %s:%d: %d %s\n",
> -                       s->server_fd.fd, inet_ntoa(s->sa.sin_addr),
> -                       ntohs(s->sa.sin_port), errno, strerror(errno));
> -               if (errno == EADDRNOTAVAIL)
> -                       rc = -2;
> -               goto err_sfd;
> -       }
> -
> -       if (listen(s->server_fd.fd, 10)) {
> -               RPRINTF("could not listen on socket: %d\n", errno);
> -               goto err_sfd;
> -       }
> -
> -       /*
> -        * The socket s now bound to the address and listening so we
> -        * may now register the fd with tapdisk
> -        */
> -       id =  tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
> -                                           s->server_fd.fd, 0,
> -                                           remus_server_accept, s);
> -       if (id < 0) {
> -               RPRINTF("error registering server connection event handler: %s",
> -                       strerror(id));
> -               goto err_sfd;
> -       }
> -       s->server_fd.cid = id;
> -
> -       return 0;
> -
> -err_sfd:
> -       CLOSE_FD(s->server_fd.fd);
> -
> -       return rc;
> +       s->stream_fd.fd = t->fd;
> +       s->stream_fd.id = id;
>  }
>
>  /* wait for latest checkpoint to be applied */
> @@ -1566,90 +1249,6 @@ static int unprotected_start(td_driver_t *driver)
>
>
>  /* control */
> -
> -static inline int resolve_address(const char* addr, struct in_addr* ia)
> -{
> -       struct hostent* he;
> -       uint32_t ip;
> -
> -       if (!(he = gethostbyname(addr))) {
> -               RPRINTF("error resolving %s: %d\n", addr, h_errno);
> -               return -1;
> -       }
> -
> -       if (!he->h_addr_list[0]) {
> -               RPRINTF("no address found for %s\n", addr);
> -               return -1;
> -       }
> -
> -       /* network byte order */
> -       ip = *((uint32_t**)he->h_addr_list)[0];
> -       ia->s_addr = ip;
> -
> -       return 0;
> -}
> -
> -static int get_args(td_driver_t *driver, const char* name)
> -{
> -       struct tdremus_state *state = (struct tdremus_state *)driver->data;
> -       char* host;
> -       char* port;
> -//  char* driver_str;
> -//  char* parent;
> -//  int type;
> -//  char* path;
> -//  unsigned long ulport;
> -//  int i;
> -//  struct sockaddr_in server_addr_in;
> -
> -       int gai_status;
> -       int valid_addr;
> -       struct addrinfo gai_hints;
> -       struct addrinfo *servinfo, *servinfo_itr;
> -
> -       memset(&gai_hints, 0, sizeof gai_hints);
> -       gai_hints.ai_family = AF_UNSPEC;
> -       gai_hints.ai_socktype = SOCK_STREAM;
> -
> -       port = strchr(name, ':');
> -       if (!port) {
> -               RPRINTF("missing host in %s\n", name);
> -               return -ENOENT;
> -       }
> -       if (!(host = strndup(name, port - name))) {
> -               RPRINTF("unable to allocate host\n");
> -               return -ENOMEM;
> -       }
> -       port++;
> -
> -       if ((gai_status = getaddrinfo(host, port, &gai_hints, &servinfo)) != 0) {
> -               RPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status));
> -               return -ENOENT;
> -       }
> -
> -       /* TODO: do something smarter here */
> -       valid_addr = 0;
> -       for(servinfo_itr = servinfo; servinfo_itr != NULL; servinfo_itr = servinfo_itr->ai_next) {
> -               void *addr;
> -               char *ipver;
> -
> -               if (servinfo_itr->ai_family == AF_INET) {
> -                       valid_addr = 1;
> -                       memset(&state->sa, 0, sizeof(state->sa));
> -                       state->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr;
> -                       break;
> -               }
> -       }
> -       freeaddrinfo(servinfo);
> -
> -       if (!valid_addr)
> -               return -ENOENT;
> -
> -       RPRINTF("host: %s, port: %d\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
> -
> -       return 0;
> -}
> -
>  static int switch_mode(td_driver_t *driver, enum tdremus_mode mode)
>  {
>         struct tdremus_state *s = (struct tdremus_state *)driver->data;
> @@ -1844,11 +1443,11 @@ static int ctl_register(struct tdremus_state *s)
>         RPRINTF("registering ctl fifo\n");
>
>         /* register ctl fd */
> -       s->ctl_fd.cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s);
> +       s->ctl_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s);
>
> -       if (s->ctl_fd.cid < 0) {
> +       if (s->ctl_fd.id < 0) {
>                 RPRINTF("error registering ctrl FIFO %s: %d\n",
> -                       s->ctl_path, s->ctl_fd.cid);
> +                       s->ctl_path, s->ctl_fd.id);
>                 return -1;
>         }
>
> @@ -1859,7 +1458,7 @@ static void ctl_unregister(struct tdremus_state *s)
>  {
>         RPRINTF("unregistering ctl fifo\n");
>
> -       UNREGISTER_EVENT(s->ctl_fd.cid);
> +       UNREGISTER_EVENT(s->ctl_fd.id);
>  }
>
>  /* interface */
> @@ -1867,6 +1466,7 @@ static void ctl_unregister(struct tdremus_state *s)
>  static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
>  {
>         struct tdremus_state *s = (struct tdremus_state *)driver->data;
> +       td_replication_connect_t *t = &s->t;
>         int rc;
>         const char *name = image->name;
>         td_flag_t flags = image->flags;
> @@ -1877,7 +1477,6 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
>         remus_image = image;
>
>         memset(s, 0, sizeof(*s));
> -       s->server_fd.fd = -1;
>         s->stream_fd.fd = -1;
>         s->ctl_fd.fd = -1;
>         s->msg_fd.fd = -1;
> @@ -1886,8 +1485,12 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
>          * the driver stack from the stream_fd event handler */
>         s->tdremus_driver = driver;
>
> +       t->log_prefix = "remus";
> +       t->retry_timeout_s = REMUS_CONNRETRY_TIMEOUT;
> +       t->max_connections = 10;
> +       t->callback = remus_server_established;
>         /* parse name to get info etc */
> -       if ((rc = get_args(driver, name)))
> +       if ((rc = td_replication_connect_init(t, name)))
>                 return rc;
>
>         if ((rc = ctl_open(driver, name))) {
> @@ -1901,7 +1504,7 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
>                 return rc;
>         }
>
> -       if (!(rc = remus_bind(s)))
> +       if (!(rc = td_replication_server_start(t)))
>                 rc = switch_mode(driver, mode_backup);
>         else if (rc == -2)
>                 rc = switch_mode(driver, mode_primary);
> @@ -1932,8 +1535,7 @@ static int tdremus_close(td_driver_t *driver)
>         if (s->ramdisk.inprogress)
>                 hashtable_destroy(s->ramdisk.inprogress, 0);
>
> -       close_server_fd(s);
> -       close_stream_fd(s);
> +       td_replication_connect_kill(&s->t);
>         ctl_unregister(s);
>         ctl_close(s);
>
> diff --git a/tools/blktap2/drivers/block-replication.c b/tools/blktap2/drivers/block-replication.c
> new file mode 100644
> index 0000000..e4b2679
> --- /dev/null
> +++ b/tools/blktap2/drivers/block-replication.c
> @@ -0,0 +1,468 @@
> +/*
> + * Copyright (C) 2014 FUJITSU LIMITED
> + * Author: Wen Congyang <wency@cn.fujitsu.com>
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU Lesser General Public License as published
> + * by the Free Software Foundation; version 2.1 only. with the special
> + * exception on linking described in file LICENSE.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> + * GNU Lesser General Public License for more details.
> + */
> +
> +#include "tapdisk-server.h"
> +#include "block-replication.h"
> +
> +#include <string.h>
> +#include <errno.h>
> +#include <sys/types.h>
> +#include <unistd.h>
> +#include <fcntl.h>
> +#include <syslog.h>
> +#include <stdlib.h>
> +#include <arpa/inet.h>
> +
> +/*
> + * Logging helpers. Both macros expand to a reference to a variable named
> + * log_prefix, so every function that uses them must have a
> + * "const char *log_prefix" in scope (the functions in this file copy it
> + * from t->log_prefix). Any previous DPRINTF/EPRINTF definitions, e.g.
> + * from included headers, are dropped first.
> + */
> +#undef DPRINTF
> +#undef EPRINTF
> +#define DPRINTF(_f, _a...) syslog (LOG_DEBUG, "%s: " _f, log_prefix, ## _a)
> +#define EPRINTF(_f, _a...) syslog (LOG_ERR, "%s: " _f, log_prefix, ## _a)
> +
> +/*
> + * connection status, kept in t->status.
> + * td_replication_connect_status() reports none/closed as -1,
> + * in_progress as 0 and established as 1. On the server side,
> + * in_progress means the listening socket is registered but no peer
> + * has been accepted yet.
> + */
> +enum {
> +       connection_none,
> +       connection_in_progress,
> +       connection_established,
> +       connection_closed,
> +};
> +
> +/* common functions */
> +/*
> + * Parse @name, which must have the form "host:port", and resolve it to an
> + * IPv4 socket address stored in t->sa.
> + *
> + * Only IPv4 is supported (t->sa is a struct sockaddr_in), so the lookup
> + * asks getaddrinfo() for AF_INET results directly.
> + *
> + * Returns 0 on success, -ENOENT if the port is missing or the name does
> + * not resolve to an IPv4 address, -ENOMEM if the host copy cannot be
> + * allocated.
> + */
> +static int get_args(td_replication_connect_t *t, const char* name)
> +{
> +       char* host;
> +       const char* port;
> +       int gai_status;
> +       struct addrinfo gai_hints;
> +       struct addrinfo *servinfo, *servinfo_itr;
> +       const char *log_prefix = t->log_prefix;
> +
> +       memset(&gai_hints, 0, sizeof gai_hints);
> +       gai_hints.ai_family = AF_INET;
> +       gai_hints.ai_socktype = SOCK_STREAM;
> +
> +       /* the port follows the first ':'; reject both "host" and "host:" */
> +       port = strchr(name, ':');
> +       if (!port || port[1] == '\0') {
> +               EPRINTF("missing port in %s, expected host:port\n", name);
> +               return -ENOENT;
> +       }
> +       if (!(host = strndup(name, port - name))) {
> +               EPRINTF("unable to allocate host\n");
> +               return -ENOMEM;
> +       }
> +       port++;
> +
> +       gai_status = getaddrinfo(host, port, &gai_hints, &servinfo);
> +       free(host);
> +       if (gai_status != 0) {
> +               EPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status));
> +               return -ENOENT;
> +       }
> +
> +       /*
> +        * TODO: do something smarter here. For now take the first IPv4
> +        * result; the family/length check is defensive because the hints
> +        * already restrict the lookup to AF_INET.
> +        */
> +       for (servinfo_itr = servinfo; servinfo_itr != NULL;
> +            servinfo_itr = servinfo_itr->ai_next) {
> +               if (servinfo_itr->ai_family == AF_INET &&
> +                   servinfo_itr->ai_addrlen >= sizeof(t->sa))
> +                       break;
> +       }
> +
> +       if (!servinfo_itr) {
> +               EPRINTF("no IPv4 address found for %s\n", name);
> +               freeaddrinfo(servinfo);
> +               return -ENOENT;
> +       }
> +
> +       t->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr;
> +       freeaddrinfo(servinfo);
> +
> +       DPRINTF("host: %s, port: %d\n", inet_ntoa(t->sa.sin_addr),
> +               ntohs(t->sa.sin_port));
> +
> +       return 0;
> +}
> +
> +/*
> + * Initialise @t from the "host:port" string in @name.
> + *
> + * The descriptor, event and status fields are reset before parsing, so @t
> + * is in a well-defined "no connection" state even if parsing fails.
> + * t->fd in particular must be -1: td_replication_connect_kill() runs
> + * CLOSE_FD(t->fd) whenever a connection is in progress, and a zero-filled
> + * structure (t->fd == 0) would otherwise make it close descriptor 0.
> + *
> + * Returns 0 on success or the negative errno from address parsing.
> + */
> +int td_replication_connect_init(td_replication_connect_t *t, const char *name)
> +{
> +       t->fd = -1;
> +       t->listen_fd = -1;
> +       t->id = -1;
> +       t->status = connection_none;
> +
> +       return get_args(t, name);
> +}
> +
> +/*
> + * Report the connection state of @t:
> + *   -1  no connection (never started, or killed)
> + *    0  connection in progress
> + *    1  connection established
> + *   -2  t->status holds an unexpected value (an error is logged)
> + */
> +int td_replication_connect_status(td_replication_connect_t *t)
> +{
> +       const char *log_prefix = t->log_prefix;
> +
> +       if (t->status == connection_established)
> +               return 1;
> +
> +       if (t->status == connection_in_progress)
> +               return 0;
> +
> +       if (t->status == connection_none || t->status == connection_closed)
> +               return -1;
> +
> +       EPRINTF("td_replication_connect is corruptted\n");
> +       return -2;
> +}
> +
> +/*
> + * Tear down an active connection: unregister the scheduler event held in
> + * t->id, close both the connected and the listening descriptors and mark
> + * @t as connection_closed. Does nothing unless the state is in progress
> + * or established.
> + */
> +void td_replication_connect_kill(td_replication_connect_t *t)
> +{
> +       int active = t->status == connection_in_progress ||
> +                    t->status == connection_established;
> +
> +       if (active) {
> +               UNREGISTER_EVENT(t->id);
> +               CLOSE_FD(t->fd);
> +               CLOSE_FD(t->listen_fd);
> +               t->status = connection_closed;
> +       }
> +}
> +
> +/* server */
> +static void td_replication_server_accept(event_id_t id, char mode,
> +                                        void *private);
> +
> +/*
> + * Create a listening IPv4 socket bound to t->sa and register it with the
> + * tapdisk scheduler, so that td_replication_server_accept() runs when a
> + * peer connects.
> + *
> + * Returns 0 on success (t->status becomes connection_in_progress).
> + * Returns -2 if bind() failed with EADDRNOTAVAIL, i.e. t->sa is not a
> + * local address. Returns -1 on any other failure, or if a connection is
> + * already in progress or established.
> + */
> +int td_replication_server_start(td_replication_connect_t *t)
> +{
> +       int opt;
> +       int rc = -1;
> +       int saved_errno;
> +       event_id_t id;
> +       int fd;
> +       const char *log_prefix = t->log_prefix;
> +
> +       if (t->status == connection_in_progress ||
> +           t->status == connection_established)
> +               return rc;
> +
> +       fd = socket(AF_INET, SOCK_STREAM, 0);
> +       if (fd < 0) {
> +               EPRINTF("could not create server socket: %d\n", errno);
> +               return rc;
> +       }
> +
> +       opt = 1;
> +       if (setsockopt(fd, SOL_SOCKET,
> +                      SO_REUSEADDR, &opt, sizeof(opt)) < 0)
> +               DPRINTF("Error setting REUSEADDR on %d: %d\n", fd, errno);
> +
> +       if (bind(fd, (struct sockaddr *)&t->sa, sizeof(t->sa)) < 0) {
> +               /*
> +                * Save errno before logging: syslog() may overwrite it, and
> +                * the EADDRNOTAVAIL test below selects the -2 return.
> +                */
> +               saved_errno = errno;
> +               DPRINTF("could not bind server socket %d to %s:%d: %d %s\n",
> +                       fd, inet_ntoa(t->sa.sin_addr),
> +                       ntohs(t->sa.sin_port), saved_errno,
> +                       strerror(saved_errno));
> +               if (saved_errno == EADDRNOTAVAIL)
> +                       rc = -2;
> +               goto err;
> +       }
> +
> +       if (listen(fd, t->max_connections)) {
> +               EPRINTF("could not listen on socket: %d\n", errno);
> +               goto err;
> +       }
> +
> +       /*
> +        * The socket is now bound to the address and listening so we
> +        * may now register the fd with tapdisk
> +        */
> +       id =  tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
> +                                           fd, 0,
> +                                           td_replication_server_accept, t);
> +       if (id < 0) {
> +               EPRINTF("error registering server connection event handler: %s",
> +                       strerror(id));
> +               goto err;
> +       }
> +       t->listen_fd = fd;
> +       t->id = id;
> +       t->status = connection_in_progress;
> +
> +       return 0;
> +
> +err:
> +       close(fd);
> +       return rc;
> +}
> +
> +/*
> + * Read-event handler for the listening socket: accept one peer. Only a
> + * single peer is served at a time; any further connection is accepted
> + * and immediately closed while one is established.
> + */
> +static void td_replication_server_accept(event_id_t id, char mode,
> +                                        void *private)
> +{
> +       td_replication_connect_t *t = private;
> +       const char *log_prefix = t->log_prefix;
> +       int conn_fd;
> +
> +       /* XXX: add address-based black/white list */
> +       conn_fd = accept(t->listen_fd, NULL, NULL);
> +       if (conn_fd < 0) {
> +               EPRINTF("error accepting connection: %d\n", errno);
> +               return;
> +       }
> +
> +       if (t->status != connection_established) {
> +               DPRINTF("server accepted connection\n");
> +               t->fd = conn_fd;
> +               t->status = connection_established;
> +               t->callback(t, 0);
> +               return;
> +       }
> +
> +       /* a peer is already being served: reject the newcomer */
> +       EPRINTF("connection is already established\n");
> +       close(conn_fd);
> +}
> +
> +/*
> + * Get the server side back to "waiting for a peer".
> + * Returns 0 on success, or whatever td_replication_server_start()
> + * returns when the server has to be (re)started; -1 for an unknown state.
> + */
> +int td_replication_server_restart(td_replication_connect_t *t)
> +{
> +       int status = t->status;
> +
> +       /* already waiting for a peer: nothing to do */
> +       if (status == connection_in_progress)
> +               return 0;
> +
> +       /* close the peer's fd; the listening socket is left untouched */
> +       if (status == connection_established) {
> +               CLOSE_FD(t->fd);
> +               t->status = connection_in_progress;
> +               return 0;
> +       }
> +
> +       /* not listening: start the server from scratch */
> +       if (status == connection_none || status == connection_closed)
> +               return td_replication_server_start(t);
> +
> +       /* not reached */
> +       return -1;
> +}
> +
> +/* client */
> +/*
> + * Client-side scheduler callbacks. td_replication_retry_connect_event()
> + * is registered as a SCHEDULER_POLL_TIMEOUT event and issues connect();
> + * td_replication_connect_event() is registered as a
> + * SCHEDULER_POLL_WRITE_FD event and checks the connect() result.
> + */
> +static void td_replication_retry_connect_event(event_id_t id, char mode,
> +                                              void *private);
> +static void td_replication_connect_event(event_id_t id, char mode,
> +                                        void *private);
> +/*
> + * Begin a nonblocking client connection to t->sa.
> + *
> + * The socket is created here, but connect() itself runs from a
> + * zero-timeout scheduler event; the final outcome is reported through
> + * t->callback. Returns 0 if the attempt was scheduled, ERROR_INTERNAL if
> + * a connection is already in progress/established or setup failed.
> + */
> +int td_replication_client_start(td_replication_connect_t *t)
> +{
> +       event_id_t id;
> +       int fd;
> +       int flags;
> +       const char *log_prefix = t->log_prefix;
> +
> +       if (t->status == connection_in_progress ||
> +           t->status == connection_established)
> +               return ERROR_INTERNAL;
> +
> +       DPRINTF("client connecting to %s:%d...\n",
> +               inet_ntoa(t->sa.sin_addr), ntohs(t->sa.sin_port));
> +
> +       if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
> +               EPRINTF("could not create client socket: %d\n", errno);
> +               return ERROR_INTERNAL;
> +       }
> +
> +       /* make socket nonblocking */
> +       if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
> +               flags = 0;
> +       if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
> +               EPRINTF("error setting fd %d to non block mode\n", fd);
> +               goto err;
> +       }
> +
> +       /*
> +        * once we have created the socket and populated the address,
> +        * we can now start our non-blocking connect. rather than
> +        * duplicating code we trigger a timeout on the socket fd,
> +        * which calls our nonblocking connect code
> +        */
> +       id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0,
> +                                          td_replication_retry_connect_event,
> +                                          t);
> +       if (id < 0) {
> +               /* NOTE(review): assumes a failed registration returns -errno */
> +               EPRINTF("error registering timeout client connection event handler: %s\n",
> +                       strerror(-id));
> +               goto err;
> +       }
> +
> +       t->fd = fd;
> +       t->id = id;
> +       t->status = connection_in_progress;
> +       return 0;
> +
> +err:
> +       close(fd);
> +       return ERROR_INTERNAL;
> +}
> +
> +/* Tear the client connection down, then report rc to the owner. */
> +static void td_replication_client_failed(td_replication_connect_t *t, int rc)
> +{
> +       td_replication_connect_kill(t);
> +
> +       (*t->callback)(t, rc);
> +}
> +
> +/* The client connected: drop the pending event and report success. */
> +static void td_replication_client_done(td_replication_connect_t *t)
> +{
> +       /* no further connect/retry events are needed */
> +       UNREGISTER_EVENT(t->id);
> +
> +       t->status = connection_established;
> +       (*t->callback)(t, 0);
> +}
> +
> +/*
> + * Replace the current event with a timeout event that retries connect()
> + * after t->retry_timeout_s (seconds, per the field name).
> + *
> + * Returns 0 on success or ERROR_INTERNAL if registration fails.
> + */
> +static int td_replication_retry_connect(td_replication_connect_t *t)
> +{
> +       event_id_t id;
> +       const char *log_prefix = t->log_prefix;
> +
> +       UNREGISTER_EVENT(t->id);
> +
> +       /* the delay is configurable, so log the real value */
> +       DPRINTF("connect to server %d second(s) later\n", t->retry_timeout_s);
> +       id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
> +                                          t->fd, t->retry_timeout_s,
> +                                          td_replication_retry_connect_event,
> +                                          t);
> +       if (id < 0) {
> +               /* NOTE(review): assumes a failed registration returns -errno */
> +               EPRINTF("error registering timeout client connection event handler: %s\n",
> +                       strerror(-id));
> +               return ERROR_INTERNAL;
> +       }
> +
> +       t->id = id;
> +       return 0;
> +}
> +
> +/*
> + * Replace the current event with a write-readiness event on t->fd: a
> + * nonblocking connect() has completed once the socket becomes writable,
> + * and td_replication_connect_event() then inspects the result.
> + *
> + * Returns 0 on success or ERROR_INTERNAL if registration fails.
> + */
> +static int td_replication_wait_connect_done(td_replication_connect_t *t)
> +{
> +       event_id_t id;
> +       const char *log_prefix = t->log_prefix;
> +
> +       UNREGISTER_EVENT(t->id);
> +
> +       id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
> +                                          t->fd, 0,
> +                                          td_replication_connect_event, t);
> +       if (id < 0) {
> +               /* NOTE(review): assumes a failed registration returns -errno */
> +               EPRINTF("error registering client connection event handler: %s\n",
> +                       strerror(-id));
> +               return ERROR_INTERNAL;
> +       }
> +       t->id = id;
> +
> +       return 0;
> +}
> +
> +/*
> + * Classify a connect() failure: returns 1 for errors that suggest the
> + * backup server is simply not reachable yet (so retrying later makes
> + * sense), 0 for everything else.
> + */
> +static int check_connect_errno(int err)
> +{
> +       switch (err) {
> +       /*
> +        * The fd is non-block, so connect() itself never reports
> +        * ETIMEDOUT; it can only come back through getsockopt().
> +        */
> +       case ETIMEDOUT:
> +       case ECONNREFUSED:
> +       case ENETUNREACH:
> +       case EAGAIN:
> +       case ECONNABORTED:
> +               return 1;
> +       default:
> +               return 0;
> +       }
> +}
> +
> +/*
> + * Timeout-event handler: issue a nonblocking connect() to t->sa.
> + * Depending on the result, the connection is completed immediately, we
> + * wait for writability, a retry is scheduled, or the attempt is abandoned
> + * and reported through t->callback.
> + */
> +static void td_replication_retry_connect_event(event_id_t id, char mode,
> +                                              void *private)
> +{
> +       td_replication_connect_t *t = private;
> +       const char *log_prefix = t->log_prefix;
> +       int rc;
> +       int err;
> +
> +       /* do a non-blocking connect */
> +       if (connect(t->fd, (struct sockaddr *)&t->sa, sizeof(t->sa)) == 0) {
> +               /* The connection is established unexpectedly */
> +               td_replication_client_done(t);
> +               return;
> +       }
> +
> +       err = errno;
> +       if (err == EINPROGRESS) {
> +               /* the socket turns writeable once connect() has finished */
> +               rc = td_replication_wait_connect_done(t);
> +       } else if (check_connect_errno(err)) {
> +               /* server presumably not up yet: try again later */
> +               rc = td_replication_retry_connect(t);
> +       } else {
> +               /* not recoverable */
> +               EPRINTF("error connection to server %s\n", strerror(err));
> +               rc = ERROR_CONNECTION;
> +       }
> +
> +       if (rc)
> +               td_replication_client_failed(t, rc);
> +}
> +
> +/*
> + * Callback when the nonblocking connect() is finished (the socket became
> + * writable): fetch the result via SO_ERROR and then complete the
> + * connection, schedule a retry, or give up and report the error.
> + */
> +static void td_replication_connect_event(event_id_t id, char mode,
> +                                        void *private)
> +{
> +       int socket_errno;
> +       socklen_t socket_errno_size;
> +       td_replication_connect_t *t = private;
> +       int rc;
> +       const char *log_prefix = t->log_prefix;
> +
> +       /* check to see if the connect succeeded */
> +       socket_errno_size = sizeof(socket_errno);
> +       if (getsockopt(t->fd, SOL_SOCKET, SO_ERROR,
> +                      &socket_errno, &socket_errno_size)) {
> +               /*
> +                * Don't just return: the write event would stay registered
> +                * and (presumably) keep firing with the same result while
> +                * the owner never hears about it. Report the failure and
> +                * tear the connection down instead.
> +                */
> +               EPRINTF("error getting socket errno\n");
> +               rc = ERROR_INTERNAL;
> +               goto fail;
> +       }
> +
> +       DPRINTF("socket connect returned %d\n", socket_errno);
> +
> +       if (socket_errno) {
> +               /* the connect did not succeed */
> +               if (check_connect_errno(socket_errno)) {
> +                       /*
> +                        * we can probably assume that the backup is down.
> +                        * just try again later
> +                        */
> +                       rc = td_replication_retry_connect(t);
> +                       if (rc)
> +                               goto fail;
> +
> +                       return;
> +               }
> +
> +               EPRINTF("socket connect returned %d, giving up\n",
> +                       socket_errno);
> +               rc = ERROR_CONNECTION;
> +               goto fail;
> +       }
> +
> +       td_replication_client_done(t);
> +
> +       return;
> +
> +fail:
> +       td_replication_client_failed(t, rc);
> +}
> diff --git a/tools/blktap2/drivers/block-replication.h b/tools/blktap2/drivers/block-replication.h
> new file mode 100644
> index 0000000..0bd6e71
> --- /dev/null
> +++ b/tools/blktap2/drivers/block-replication.h
> @@ -0,0 +1,113 @@
> +/*
> + * Copyright (C) 2014 FUJITSU LIMITED
> + * Author: Wen Congyang <wency@cn.fujitsu.com>
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU Lesser General Public License as published
> + * by the Free Software Foundation; version 2.1 only. with the special
> + * exception on linking described in file LICENSE.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> + * GNU Lesser General Public License for more details.
> + */
> +
> +#ifndef BLOCK_REPLICATION_H
> +#define BLOCK_REPLICATION_H
> +
> +#include "scheduler.h"
> +#include <stddef.h>
> +#include <sys/socket.h>
> +#include <netinet/in.h>
> +#include <netdb.h>
> +
> +/*
> + * Given inner_ptr, a pointer to member `member_name` of some enclosing
> + * object, return a pointer to that enclosing object. Only the type of
> + * `outer` is used (via typeof), so it may be a type or an expression.
> + * The discarded pointer comparison makes the compiler complain if
> + * inner_ptr's type does not match the member's type. Relies on the GNU
> + * typeof and statement-expression extensions, and on offsetof.
> + */
> +#define CONTAINER_OF(inner_ptr, outer, member_name)                    \
> +       ({                                                              \
> +               typeof(outer) *container_of_;                           \
> +               container_of_ = (void*)((char*)(inner_ptr) -            \
> +                               offsetof(typeof(outer), member_name));  \
> +               (void)(&container_of_->member_name ==                   \
> +                      (typeof(inner_ptr))0) /* type check */;          \
> +               container_of_;                                          \
> +       })
> +
> +#define UNREGISTER_EVENT(id)                                   \
> +       do {                                                    \
> +               if (id >= 0) {                                  \
> +                       tapdisk_server_unregister_event(id);    \
> +                       id = -1;                                \
> +               }                                               \
> +       } while (0)
> +#define CLOSE_FD(fd)                   \
> +       do {                            \
> +               if (fd >= 0) {          \
> +                       close(fd);      \
> +                       fd = -1;        \
> +               }                       \
> +       } while (0)
> +
> +/*
> + * Negative error codes used by the replication helpers, e.g. returned by
> + * td_replication_client_start() or passed to the connection callback.
> + */
> +enum {
> +       ERROR_INTERNAL   = -1,
> +       ERROR_IO         = -2,
> +       ERROR_CONNECTION = -3,
> +       ERROR_CLOSE      = -4,
> +};
> +
> +typedef struct td_replication_connect td_replication_connect_t;
> +/*
> + * Connection callback: rc is 0 once the connection is established, or a
> + * negative ERROR_* code when a client connection attempt has failed.
> + */
> +typedef void td_replication_callback(td_replication_connect_t *r, int rc);
> +
> +struct td_replication_connect {
> +       /*
> +        * caller must fill these in before calling
> +        * td_replication_connect_init()
> +        */
> +       const char *log_prefix;
> +       td_replication_callback *callback;
> +       /* delay between client connect attempts, used as the scheduler timeout */
> +       int retry_timeout_s;
> +       /* backlog passed to listen() by td_replication_server_start() */
> +       int max_connections;
> +       /*
> +        * The caller uses this fd to read/write after
> +        * the connection is established
> +        */
> +       int fd;
> +
> +       /* private */
> +       /* address bound by the server side / connected to by the client side */
> +       struct sockaddr_in sa;
> +       /* server-side listening socket */
> +       int listen_fd;
> +       /* currently registered tapdisk event; UNREGISTER_EVENT resets it to -1 */
> +       event_id_t id;
> +
> +       /* connection_none, _in_progress, _established or _closed */
> +       int status;
> +};
> +
> +/* return -errno if failure happened, otherwise return 0 */
> +int td_replication_connect_init(td_replication_connect_t *t, const char *name);
> +/*
> + * Return value:
> + *   -1: connection is closed or not connected
> + *    0: connection is in progress
> + *    1: connection is established
> + */
> +int td_replication_connect_status(td_replication_connect_t *t);
> +/*
> + * NOTE(review): implementation not visible here; presumably tears down
> + * the connection's events and fds. The client failure path calls it
> + * before reporting the error through t->callback.
> + */
> +void td_replication_connect_kill(td_replication_connect_t *t);
> +
> +/*
> + * Listen on t->sa; t->callback(t, 0) is invoked once a peer is accepted.
> + *
> + * Return value:
> + *   -2: this caller should be client (bind() failed with EADDRNOTAVAIL,
> + *       i.e. t->sa is not a local address)
> + *   -1: error
> + *    0: connection is in progress
> + */
> +int td_replication_server_start(td_replication_connect_t *t);
> +/*
> + * Drop an established peer and keep waiting for a new one; when the
> + * state is connection_none or connection_closed, this calls
> + * td_replication_server_start() instead.
> + *
> + * Return value:
> + *   -2: this caller should be client
> + *   -1: error
> + *    0: connection is in progress
> + */
> +int td_replication_server_restart(td_replication_connect_t *t);
> +/*
> + * Start a nonblocking connect to t->sa; the outcome (0, or a negative
> + * ERROR_* code on failure) is reported through t->callback.
> + *
> + * Return value:
> + *   -1: error
> + *    0: connection is in progress
> + */
> +int td_replication_client_start(td_replication_connect_t *t);
> +
> +#endif
> --
> 1.9.3
>

Acked-by:  Shriram Rajagopalan <rshriram@cs.ubc.ca>