On Sep 5, 2014 5:31 AM, "Wen Congyang" <wency@cn.fujitsu.com> wrote:
>
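> Move the replication connection code (host:port parsing, server listen/accept and non-blocking client connect/retry) out of block-remus.c into a new block-replication.c, so that COLO can reuse it.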
>
> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
> Cc: Shriram Rajagopalan <rshriram@cs.ubc.ca>
> ---
> tools/blktap2/drivers/Makefile | 2 +-
> tools/blktap2/drivers/block-remus.c | 494 +++---------------------------
> tools/blktap2/drivers/block-replication.c | 468 ++++++++++++++++++++++++++++
> tools/blktap2/drivers/block-replication.h | 113 +++++++
> 4 files changed, 630 insertions(+), 447 deletions(-)
> create mode 100644 tools/blktap2/drivers/block-replication.c
> create mode 100644 tools/blktap2/drivers/block-replication.h
>
> diff --git a/tools/blktap2/drivers/Makefile b/tools/blktap2/drivers/Makefile
> index 37c3485..3d8ed8a 100644
> --- a/tools/blktap2/drivers/Makefile
> +++ b/tools/blktap2/drivers/Makefile
> @@ -23,7 +23,7 @@ endif
>
> VHDLIBS := -L$(LIBVHDDIR) -lvhd
>
> -REMUS-OBJS := block-remus.o
> +REMUS-OBJS := block-remus.o block-replication.o
> REMUS-OBJS += hashtable.o
> REMUS-OBJS += hashtable_itr.o
> REMUS-OBJS += hashtable_utility.o
> diff --git a/tools/blktap2/drivers/block-remus.c b/tools/blktap2/drivers/block-remus.c
> index 5d27d41..8b6f157 100644
> --- a/tools/blktap2/drivers/block-remus.c
> +++ b/tools/blktap2/drivers/block-remus.c
> @@ -40,6 +40,7 @@
> #include "hashtable.h"
> #include "hashtable_itr.h"
> #include "hashtable_utility.h"
> +#include "block-replication.h"
>
> #include <errno.h>
> #include <inttypes.h>
> @@ -49,10 +50,7 @@
> #include <string.h>
> #include <sys/time.h>
> #include <sys/types.h>
> -#include <sys/socket.h>
> -#include <netdb.h>
> #include <netinet/in.h>
> -#include <arpa/inet.h>
> #include <sys/param.h>
> #include <sys/sysctl.h>
> #include <unistd.h>
> @@ -67,22 +65,6 @@
>
> #define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: " _f, ## _a)
>
> -#define UNREGISTER_EVENT(id) \
> - do { \
> - if (id >= 0) { \
> - tapdisk_server_unregister_event(id); \
> - id = -1; \
> - } \
> - } while (0)
> -
> -#define CLOSE_FD(fd) \
> - do { \
> - if (fd >= 0) { \
> - close(fd); \
> - fd = -1; \
> - } \
> - } while (0)
> -
> #define MAX_REMUS_REQUEST TAPDISK_DATA_REQUESTS
>
> enum tdremus_mode {
> @@ -92,13 +74,6 @@ enum tdremus_mode {
> mode_backup
> };
>
> -enum {
> - ERROR_INTERNAL = -1,
> - ERROR_IO = -2,
> - ERROR_CONNECTION = -3,
> - ERROR_CLOSE = -4,
> -};
> -
> struct tdremus_req {
> td_request_t treq;
> };
> @@ -167,21 +142,9 @@ struct ramdisk_write_cbdata {
>
> typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq);
>
> -/*
> - * If cid, rid and wid are -1, fd must be -1. It means that
> - * we are in unpritected mode or we don't start to connect
> - * to backup.
> - * If fd is an valid fd:
> - * cid is valid, rid and wid must be invalid. It means that
> - * the connection is in progress.
> - * cid is invalid. rid or wid must be valid. It means that
> - * the connection is established.
> - */
> typedef struct poll_fd {
> int fd;
> - event_id_t cid;
> - event_id_t rid;
> - event_id_t wid;
> + event_id_t id;
> } poll_fd_t;
>
> struct tdremus_state {
> @@ -195,9 +158,7 @@ struct tdremus_state {
> char* msg_path; /* output completion message here */
> poll_fd_t msg_fd;
>
> - /* replication host */
> - struct sockaddr_in sa;
> - poll_fd_t server_fd; /* server listen port */
> + td_replication_connect_t t;
> poll_fd_t stream_fd; /* replication channel */
>
> /*
> @@ -777,28 +738,8 @@ static int mwrite(int fd, void* buf, size_t len)
> select(fd + 1, NULL, &wfds, NULL, &tv);
> }
>
> -
> -static void inline close_stream_fd(struct tdremus_state *s)
> -{
> -
> - UNREGISTER_EVENT(s->stream_fd.cid);
> - UNREGISTER_EVENT(s->stream_fd.rid);
> - UNREGISTER_EVENT(s->stream_fd.wid);
> -
> - /* close the connection */
> - CLOSE_FD(s->stream_fd.fd);
> -}
> -
> -static void close_server_fd(struct tdremus_state *s)
> -{
> - UNREGISTER_EVENT(s->server_fd.cid);
> - CLOSE_FD(s->server_fd.fd);
> -}
> -
> /* primary functions */
> -static void remus_client_event(event_id_t, char mode, void *private);
> -static void remus_connect_event(event_id_t id, char mode, void *private);
> -static void remus_retry_connect_event(event_id_t id, char mode, void *private);
> +static void remus_client_event(event_id_t id, char mode, void *private);
> static int primary_forward_request(struct tdremus_state *s,
> const td_request_t *treq);
>
> @@ -808,56 +749,15 @@ static int primary_forward_request(struct tdremus_state *s,
> */
> static void primary_failed(struct tdremus_state *s, int rc)
> {
> - close_stream_fd(s);
> + td_replication_connect_kill(&s->t);
> if (rc == ERROR_INTERNAL)
> RPRINTF("switch to unprotected mode due to internal error");
> if (rc == ERROR_CLOSE)
> RPRINTF("switch to unprotected mode before closing");
> + UNREGISTER_EVENT(s->stream_fd.id);
> switch_mode(s->tdremus_driver, mode_unprotected);
> }
>
> -static int primary_do_connect(struct tdremus_state *state)
> -{
> - event_id_t id;
> - int fd;
> - int rc;
> - int flags;
> -
> - RPRINTF("client connecting to %s:%d...\n",
> - inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
> -
> - if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
> - RPRINTF("could not create client socket: %d\n", errno);
> - return ERROR_INTERNAL;
> - }
> - state->stream_fd.fd = fd;
> -
> - /* make socket nonblocking */
> - if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
> - flags = 0;
> - if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
> - RPRINTF("error setting fd %d to non block mode\n", fd);
> - return ERROR_INTERNAL;
> - }
> -
> - /*
> - * once we have created the socket and populated the address,
> - * we can now start our non-blocking connect. rather than
> - * duplicating code we trigger a timeout on the socket fd,
> - * which calls out nonblocking connect code
> - */
> - if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0,
> - remus_retry_connect_event,
> - state)) < 0) {
> - RPRINTF("error registering timeout client connection event handler: %s\n",
> - strerror(id));
> - return ERROR_INTERNAL;
> - }
> -
> - state->stream_fd.cid = id;
> - return 0;
> -}
> -
> static int remus_handle_queued_io(struct tdremus_state *s)
> {
> struct req_ring *queued_io = &s->queued_io;
> @@ -882,184 +782,35 @@ static int remus_handle_queued_io(struct tdremus_state *s)
> return 0;
> }
>
> -static int remus_connection_done(struct tdremus_state *s)
> +static void remus_client_established(td_replication_connect_t *t, int rc)
> {
> + struct tdremus_state *s = CONTAINER_OF(t, *s, t);
> event_id_t id;
>
> - /* the connect succeeded */
> - /* unregister this function and register a new event handler */
> - tapdisk_server_unregister_event(s->stream_fd.cid);
> - s->stream_fd.cid = -1;
> + if (rc) {
> + primary_failed(s, rc);
> + return;
> + }
>
> - id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->stream_fd.fd,
> + /* the connect succeeded */
> + id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd,
> 0, remus_client_event, s);
> if(id < 0) {
> RPRINTF("error registering client event handler: %s\n",
> strerror(id));
> - return ERROR_INTERNAL;
> - }
> - s->stream_fd.rid = id;
> -
> - /* handle the queued requests */
> - return remus_handle_queued_io(s);
> -}
> -
> -static int remus_retry_connect(struct tdremus_state *s)
> -{
> - event_id_t id;
> -
> - tapdisk_server_unregister_event(s->stream_fd.cid);
> - s->stream_fd.cid = -1;
> -
> - RPRINTF("connect to backup 1 second later");
> - id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
> - s->stream_fd.fd,
> - REMUS_CONNRETRY_TIMEOUT,
> - remus_retry_connect_event, s);
> - if (id < 0) {
> - RPRINTF("error registering timeout client connection event handler: %s\n",
> - strerror(id));
> - return ERROR_INTERNAL;
> - }
> -
> - s->stream_fd.cid = id;
> - return 0;
> -}
> -
> -static int remus_wait_connect_done(struct tdremus_state *s)
> -{
> - event_id_t id;
> -
> - tapdisk_server_unregister_event(s->stream_fd.cid);
> - s->stream_fd.cid = -1;
> -
> - id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
> - s->stream_fd.fd, 0,
> - remus_connect_event, s);
> - if (id < 0) {
> - RPRINTF("error registering client connection event handler: %s\n",
> - strerror(id));
> - return ERROR_INTERNAL;
> - }
> - s->stream_fd.cid = id;
> -
> - return 0;
> -}
> -
> -/* return 1 if we need to reconnect to backup */
> -static int check_connect_errno(int err)
> -{
> - /*
> - * The fd is non-block, so we will not get ETIMEDOUT
> - * after calling connect(). We only can get this errno
> - * by getsockopt().
> - */
> - if (err == ECONNREFUSED || err == ENETUNREACH ||
> - err == EAGAIN || err == ECONNABORTED ||
> - err == ETIMEDOUT)
> - return 1;
> -
> - return 0;
> -}
> -
> -static void remus_retry_connect_event(event_id_t id, char mode, void *private)
> -{
> - struct tdremus_state *s = (struct tdremus_state *)private;
> - int rc, ret;
> -
> - /* do a non-blocking connect */
> - ret = connect(s->stream_fd.fd,
> - (struct sockaddr *)&s->sa,
> - sizeof(s->sa));
> - if (ret) {
> - if (errno == EINPROGRESS) {
> - /*
> - * the connect returned EINPROGRESS (nonblocking
> - * connect) we must wait for the fd to be writeable
> - * to determine if the connect worked
> - */
> - rc = remus_wait_connect_done(s);
> - if (rc)
> - goto fail;
> - return;
> - }
> -
> - if (check_connect_errno(errno)) {
> - rc = remus_retry_connect(s);
> - if (rc)
> - goto fail;
> - return;
> - }
> -
> - /* not recoverable */
> - RPRINTF("error connection to server %s\n", strerror(errno));
> - rc = ERROR_CONNECTION;
> - goto fail;
> - }
> -
> - /* The connection is established unexpectedly */
> - rc = remus_connection_done(s);
> - if (rc)
> - goto fail;
> -
> - return;
> -
> -fail:
> - primary_failed(s, rc);
> - return;
> -}
> -
> -/* callback when nonblocking connect() is finished */
> -static void remus_connect_event(event_id_t id, char mode, void *private)
> -{
> - int socket_errno;
> - socklen_t socket_errno_size;
> - struct tdremus_state *s = (struct tdremus_state *)private;
> - int rc;
> -
> - /* check to see if the connect succeeded */
> - socket_errno_size = sizeof(socket_errno);
> - if (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR,
> - &socket_errno, &socket_errno_size)) {
> - RPRINTF("error getting socket errno\n");
> + primary_failed(s, ERROR_INTERNAL);
> return;
> }
>
> - RPRINTF("socket connect returned %d\n", socket_errno);
> + s->stream_fd.fd = t->fd;
> + s->stream_fd.id = id;
>
> - if (socket_errno) {
> - /* the connect did not succeed */
> - if (check_connect_errno(socket_errno)) {
> - /*
> - * we can probably assume that the backup is down.
> - * just try again later
> - */
> - rc = remus_retry_connect(s);
> - if (rc)
> - goto fail;
> -
> - return;
> - } else {
> - RPRINTF("socket connect returned %d, giving up\n",
> - socket_errno);
> - rc = ERROR_CONNECTION;
> - goto fail;
> - }
> -
> - return;
> - }
> -
> - rc = remus_connection_done(s);
> + /* handle the queued requests */
> + rc = remus_handle_queued_io(s);
> if (rc)
> - goto fail;
> -
> - return;
> -
> -fail:
> - primary_failed(s, rc);
> + primary_failed(s, rc);
> }
>
> -
> /*
> * we install this event handler on the primary once we have
> * connected to the backup.
> @@ -1142,19 +893,21 @@ static int primary_forward_request(struct tdremus_state *s,
> static void primary_queue_write(td_driver_t *driver, td_request_t treq)
> {
> struct tdremus_state *s = (struct tdremus_state *)driver->data;
> - int rc;
> + int rc, ret;
>
> // RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
>
> - if(s->stream_fd.fd < 0) {
> + ret = td_replication_connect_status(&s->t);
> + if(ret == -1) {
> RPRINTF("connecting to backup...\n");
> - rc = primary_do_connect(s);
> + s->t.callback = remus_client_established;
> + rc = td_replication_client_start(&s->t);
> if (rc)
> goto fail;
> }
>
> /* The connection is not established, just queue the request */
> - if (s->stream_fd.cid >= 0) {
> + if (ret != 1) {
> ring_add_request(&s->queued_io, &treq);
> return;
> }
> @@ -1227,9 +980,7 @@ static int primary_start(td_driver_t *driver)
> s->queue_flush = primary_flush;
>
> s->stream_fd.fd = -1;
> - s->stream_fd.cid = -1;
> - s->stream_fd.rid = -1;
> - s->stream_fd.wid = -1;
> + s->stream_fd.id = -1;
>
> return 0;
> }
> @@ -1240,100 +991,32 @@ static void remus_server_event(event_id_t id, char mode, void *private);
> /* It is called when we find some I/O error */
> static void backup_failed(struct tdremus_state *s, int rc)
> {
> - close_stream_fd(s);
> - close_server_fd(s);
> + td_replication_connect_kill(&s->t);
> /* We will switch to unprotected mode in backup_queue_write() */
> }
>
> /* returns the socket that receives write requests */
> -static void remus_server_accept(event_id_t id, char mode, void* private)
> +static void remus_server_established(td_replication_connect_t *t, int rc)
> {
> - struct tdremus_state* s = (struct tdremus_state *) private;
> -
> - int stream_fd;
> -
> - /* XXX: add address-based black/white list */
> - if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) {
> - RPRINTF("error accepting connection: %d\n", errno);
> - return;
> - }
> + struct tdremus_state *s = CONTAINER_OF(t, *s, t);
> + event_id_t id;
>
> - /*
> - * TODO: check to see if we are already replicating.
> - * if so just close the connection (or do something
> - * smarter)
> - */
> - RPRINTF("server accepted connection\n");
> + /* rc is always 0 */
>
> /* add tapdisk event for replication stream */
> - id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0,
> + id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd, 0,
> remus_server_event, s);
>
> if (id < 0) {
> RPRINTF("error registering connection event handler: %s\n",
> strerror(errno));
> - close(stream_fd);
> + td_replication_server_restart(t);
> return;
> }
>
> /* store replication file descriptor */
> - s->stream_fd.fd = stream_fd;
> - s->stream_fd.rid = id;
> -}
> -
> -/* returns -2 if EADDRNOTAVAIL */
> -static int remus_bind(struct tdremus_state* s)
> -{
> - int opt;
> - int rc = -1;
> - event_id_t id;
> -
> - if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
> - RPRINTF("could not create server socket: %d\n", errno);
> - return rc;
> - }
> -
> - opt = 1;
> - if (setsockopt(s->server_fd.fd, SOL_SOCKET,
> - SO_REUSEADDR, &opt, sizeof(opt)) < 0)
> - RPRINTF("Error setting REUSEADDR on %d: %d\n",
> - s->server_fd.fd, errno);
> -
> - if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa,
> - sizeof(s->sa)) < 0) {
> - RPRINTF("could not bind server socket %d to %s:%d: %d %s\n",
> - s->server_fd.fd, inet_ntoa(s->sa.sin_addr),
> - ntohs(s->sa.sin_port), errno, strerror(errno));
> - if (errno == EADDRNOTAVAIL)
> - rc = -2;
> - goto err_sfd;
> - }
> -
> - if (listen(s->server_fd.fd, 10)) {
> - RPRINTF("could not listen on socket: %d\n", errno);
> - goto err_sfd;
> - }
> -
> - /*
> - * The socket s now bound to the address and listening so we
> - * may now register the fd with tapdisk
> - */
> - id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
> - s->server_fd.fd, 0,
> - remus_server_accept, s);
> - if (id < 0) {
> - RPRINTF("error registering server connection event handler: %s",
> - strerror(id));
> - goto err_sfd;
> - }
> - s->server_fd.cid = id;
> -
> - return 0;
> -
> -err_sfd:
> - CLOSE_FD(s->server_fd.fd);
> -
> - return rc;
> + s->stream_fd.fd = t->fd;
> + s->stream_fd.id = id;
> }
>
> /* wait for latest checkpoint to be applied */
> @@ -1566,90 +1249,6 @@ static int unprotected_start(td_driver_t *driver)
>
>
> /* control */
> -
> -static inline int resolve_address(const char* addr, struct in_addr* ia)
> -{
> - struct hostent* he;
> - uint32_t ip;
> -
> - if (!(he = gethostbyname(addr))) {
> - RPRINTF("error resolving %s: %d\n", addr, h_errno);
> - return -1;
> - }
> -
> - if (!he->h_addr_list[0]) {
> - RPRINTF("no address found for %s\n", addr);
> - return -1;
> - }
> -
> - /* network byte order */
> - ip = *((uint32_t**)he->h_addr_list)[0];
> - ia->s_addr = ip;
> -
> - return 0;
> -}
> -
> -static int get_args(td_driver_t *driver, const char* name)
> -{
> - struct tdremus_state *state = (struct tdremus_state *)driver->data;
> - char* host;
> - char* port;
> -// char* driver_str;
> -// char* parent;
> -// int type;
> -// char* path;
> -// unsigned long ulport;
> -// int i;
> -// struct sockaddr_in server_addr_in;
> -
> - int gai_status;
> - int valid_addr;
> - struct addrinfo gai_hints;
> - struct addrinfo *servinfo, *servinfo_itr;
> -
> - memset(&gai_hints, 0, sizeof gai_hints);
> - gai_hints.ai_family = AF_UNSPEC;
> - gai_hints.ai_socktype = SOCK_STREAM;
> -
> - port = strchr(name, ':');
> - if (!port) {
> - RPRINTF("missing host in %s\n", name);
> - return -ENOENT;
> - }
> - if (!(host = strndup(name, port - name))) {
> - RPRINTF("unable to allocate host\n");
> - return -ENOMEM;
> - }
> - port++;
> -
> - if ((gai_status = getaddrinfo(host, port, &gai_hints, &servinfo)) != 0) {
> - RPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status));
> - return -ENOENT;
> - }
> -
> - /* TODO: do something smarter here */
> - valid_addr = 0;
> - for(servinfo_itr = servinfo; servinfo_itr != NULL; servinfo_itr = servinfo_itr->ai_next) {
> - void *addr;
> - char *ipver;
> -
> - if (servinfo_itr->ai_family == AF_INET) {
> - valid_addr = 1;
> - memset(&state->sa, 0, sizeof(state->sa));
> - state->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr;
> - break;
> - }
> - }
> - freeaddrinfo(servinfo);
> -
> - if (!valid_addr)
> - return -ENOENT;
> -
> - RPRINTF("host: %s, port: %d\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
> -
> - return 0;
> -}
> -
> static int switch_mode(td_driver_t *driver, enum tdremus_mode mode)
> {
> struct tdremus_state *s = (struct tdremus_state *)driver->data;
> @@ -1844,11 +1443,11 @@ static int ctl_register(struct tdremus_state *s)
> RPRINTF("registering ctl fifo\n");
>
> /* register ctl fd */
> - s->ctl_fd.cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s);
> + s->ctl_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s);
>
> - if (s->ctl_fd.cid < 0) {
> + if (s->ctl_fd.id < 0) {
> RPRINTF("error registering ctrl FIFO %s: %d\n",
> - s->ctl_path, s->ctl_fd.cid);
> + s->ctl_path, s->ctl_fd.id);
> return -1;
> }
>
> @@ -1859,7 +1458,7 @@ static void ctl_unregister(struct tdremus_state *s)
> {
> RPRINTF("unregistering ctl fifo\n");
>
> - UNREGISTER_EVENT(s->ctl_fd.cid);
> + UNREGISTER_EVENT(s->ctl_fd.id);
> }
>
> /* interface */
> @@ -1867,6 +1466,7 @@ static void ctl_unregister(struct tdremus_state *s)
> static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
> {
> struct tdremus_state *s = (struct tdremus_state *)driver->data;
> + td_replication_connect_t *t = &s->t;
> int rc;
> const char *name = image->name;
> td_flag_t flags = image->flags;
> @@ -1877,7 +1477,6 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
> remus_image = image;
>
> memset(s, 0, sizeof(*s));
> - s->server_fd.fd = -1;
> s->stream_fd.fd = -1;
> s->ctl_fd.fd = -1;
> s->msg_fd.fd = -1;
> @@ -1886,8 +1485,12 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
> * the driver stack from the stream_fd event handler */
> s->tdremus_driver = driver;
>
> + t->log_prefix = "remus";
> + t->retry_timeout_s = REMUS_CONNRETRY_TIMEOUT;
> + t->max_connections = 10;
> + t->callback = remus_server_established;
> /* parse name to get info etc */
> - if ((rc = get_args(driver, name)))
> + if ((rc = td_replication_connect_init(t, name)))
> return rc;
>
> if ((rc = ctl_open(driver, name))) {
> @@ -1901,7 +1504,7 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
> return rc;
> }
>
> - if (!(rc = remus_bind(s)))
> + if (!(rc = td_replication_server_start(t)))
> rc = switch_mode(driver, mode_backup);
> else if (rc == -2)
> rc = switch_mode(driver, mode_primary);
> @@ -1932,8 +1535,7 @@ static int tdremus_close(td_driver_t *driver)
> if (s->ramdisk.inprogress)
> hashtable_destroy(s->ramdisk.inprogress, 0);
>
> - close_server_fd(s);
> - close_stream_fd(s);
> + td_replication_connect_kill(&s->t);
> ctl_unregister(s);
> ctl_close(s);
>
> diff --git a/tools/blktap2/drivers/block-replication.c b/tools/blktap2/drivers/block-replication.c
> new file mode 100644
> index 0000000..e4b2679
> --- /dev/null
> +++ b/tools/blktap2/drivers/block-replication.c
> @@ -0,0 +1,468 @@
> +/*
> + * Copyright (C) 2014 FUJITSU LIMITED
> + * Author: Wen Congyang <wency@cn.fujitsu.com>
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU Lesser General Public License as published
> + * by the Free Software Foundation; version 2.1 only. with the special
> + * exception on linking described in file LICENSE.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU Lesser General Public License for more details.
> + */
> +
> +#include "tapdisk-server.h"
> +#include "block-replication.h"
> +
> +#include <string.h>
> +#include <errno.h>
> +#include <sys/types.h>
> +#include <unistd.h>
> +#include <fcntl.h>
> +#include <syslog.h>
> +#include <stdlib.h>
> +#include <arpa/inet.h>
> +
/*
 * Logging goes through syslog, tagged with the caller-supplied prefix.
 * Every function that uses these macros must have a local `log_prefix`
 * in scope.
 */
#undef DPRINTF
#undef EPRINTF
#define DPRINTF(_f, _a...) syslog (LOG_DEBUG, "%s: " _f, log_prefix, ## _a)
#define EPRINTF(_f, _a...) syslog (LOG_ERR, "%s: " _f, log_prefix, ## _a)

/* Values stored in td_replication_connect_t.status */
enum {
	connection_none = 0,		/* set by td_replication_connect_init() */
	connection_in_progress = 1,	/* server listening, or client connecting */
	connection_established = 2,	/* t->fd is a connected socket */
	connection_closed = 3,		/* torn down by td_replication_connect_kill() */
};
> +
> +/* common functions */
> +/* args should be host:port */
> +static int get_args(td_replication_connect_t *t, const char* name)
> +{
> + char* host;
> + const char* port;
> + int gai_status;
> + int valid_addr;
> + struct addrinfo gai_hints;
> + struct addrinfo *servinfo, *servinfo_itr;
> + const char *log_prefix = t->log_prefix;
> +
> + memset(&gai_hints, 0, sizeof gai_hints);
> + gai_hints.ai_family = AF_UNSPEC;
> + gai_hints.ai_socktype = SOCK_STREAM;
> +
> + port = strchr(name, ':');
> + if (!port) {
> + EPRINTF("missing host in %s\n", name);
> + return -ENOENT;
> + }
> + if (!(host = strndup(name, port - name))) {
> + EPRINTF("unable to allocate host\n");
> + return -ENOMEM;
> + }
> + port++;
> + if ((gai_status = getaddrinfo(host, port,
> + &gai_hints, &servinfo)) != 0) {
> + EPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status));
> + free(host);
> + return -ENOENT;
> + }
> + free(host);
> +
> + /* TODO: do something smarter here */
> + valid_addr = 0;
> + for (servinfo_itr = servinfo; servinfo_itr != NULL;
> + servinfo_itr = servinfo_itr->ai_next) {
> + if (servinfo_itr->ai_family == AF_INET) {
> + valid_addr = 1;
> + memset(&t->sa, 0, sizeof(t->sa));
> + t->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr;
> + break;
> + }
> + }
> + freeaddrinfo(servinfo);
> +
> + if (!valid_addr)
> + return -ENOENT;
> +
> + DPRINTF("host: %s, port: %d\n", inet_ntoa(t->sa.sin_addr),
> + ntohs(t->sa.sin_port));
> +
> + return 0;
> +}
> +
> +int td_replication_connect_init(td_replication_connect_t *t, const char *name)
> +{
> + int rc;
> +
> + rc = get_args(t, name);
> + if (rc)
> + return rc;
> +
> + t->listen_fd = -1;
> + t->id = -1;
> + t->status = connection_none;
> + return 0;
> +}
> +
> +int td_replication_connect_status(td_replication_connect_t *t)
> +{
> + const char *log_prefix = t->log_prefix;
> +
> + switch (t->status) {
> + case connection_none:
> + case connection_closed:
> + return -1;
> + case connection_in_progress:
> + return 0;
> + case connection_established:
> + return 1;
> + default:
> + EPRINTF("td_replication_connect is corruptted\n");
> + return -2;
> + }
> +}
> +
> +void td_replication_connect_kill(td_replication_connect_t *t)
> +{
> + if (t->status != connection_in_progress &&
> + t->status != connection_established)
> + return;
> +
> + UNREGISTER_EVENT(t->id);
> + CLOSE_FD(t->fd);
> + CLOSE_FD(t->listen_fd);
> + t->status = connection_closed;
> +}
> +
> +/* server */
> +static void td_replication_server_accept(event_id_t id, char mode,
> + void *private);
> +
> +int td_replication_server_start(td_replication_connect_t *t)
> +{
> + int opt;
> + int rc = -1;
> + event_id_t id;
> + int fd;
> + const char *log_prefix = t->log_prefix;
> +
> + if (t->status == connection_in_progress ||
> + t->status == connection_established)
> + return rc;
> +
> + fd = socket(AF_INET, SOCK_STREAM, 0);
> + if (fd < 0) {
> + EPRINTF("could not create server socket: %d\n", errno);
> + return rc;
> + }
> +
> + opt = 1;
> + if (setsockopt(fd, SOL_SOCKET,
> + SO_REUSEADDR, &opt, sizeof(opt)) < 0)
> + DPRINTF("Error setting REUSEADDR on %d: %d\n", fd, errno);
> +
> + if (bind(fd, (struct sockaddr *)&t->sa, sizeof(t->sa)) < 0) {
> + DPRINTF("could not bind server socket %d to %s:%d: %d %s\n",
> + fd, inet_ntoa(t->sa.sin_addr),
> + ntohs(t->sa.sin_port), errno, strerror(errno));
> + if (errno == EADDRNOTAVAIL)
> + rc = -2;
> + goto err;
> + }
> +
> + if (listen(fd, t->max_connections)) {
> + EPRINTF("could not listen on socket: %d\n", errno);
> + goto err;
> + }
> +
> + /*
> + * The socket is now bound to the address and listening so we
> + * may now register the fd with tapdisk
> + */
> + id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
> + fd, 0,
> + td_replication_server_accept, t);
> + if (id < 0) {
> + EPRINTF("error registering server connection event handler: %s",
> + strerror(id));
> + goto err;
> + }
> + t->listen_fd = fd;
> + t->id = id;
> + t->status = connection_in_progress;
> +
> + return 0;
> +
> +err:
> + close(fd);
> + return rc;
> +}
> +
> +static void td_replication_server_accept(event_id_t id, char mode,
> + void *private)
> +{
> + td_replication_connect_t *t = private;
> + int fd;
> + const char *log_prefix = t->log_prefix;
> +
> + /* XXX: add address-based black/white list */
> + fd = accept(t->listen_fd, NULL, NULL);
> + if (fd < 0) {
> + EPRINTF("error accepting connection: %d\n", errno);
> + return;
> + }
> +
> + if (t->status == connection_established) {
> + EPRINTF("connection is already established\n");
> + close(fd);
> + return;
> + }
> +
> + DPRINTF("server accepted connection\n");
> + t->fd = fd;
> + t->status = connection_established;
> + t->callback(t, 0);
> +}
> +
> +int td_replication_server_restart(td_replication_connect_t *t)
> +{
> + switch (t->status) {
> + case connection_in_progress:
> + return 0;
> + case connection_established:
> + CLOSE_FD(t->fd);
> + t->status = connection_in_progress;
> + return 0;
> + case connection_none:
> + case connection_closed:
> + return td_replication_server_start(t);
> + default:
> + /* not reached */
> + return -1;
> + }
> +}
> +
> +/* client */
> +static void td_replication_retry_connect_event(event_id_t id, char mode,
> + void *private);
> +static void td_replication_connect_event(event_id_t id, char mode,
> + void *private);
> +int td_replication_client_start(td_replication_connect_t *t)
> +{
> + event_id_t id;
> + int fd;
> + int rc;
> + int flags;
> + const char *log_prefix = t->log_prefix;
> +
> + if (t->status == connection_in_progress ||
> + t->status == connection_established)
> + return ERROR_INTERNAL;
> +
> + DPRINTF("client connecting to %s:%d...\n",
> + inet_ntoa(t->sa.sin_addr), ntohs(t->sa.sin_port));
> +
> + if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
> + EPRINTF("could not create client socket: %d\n", errno);
> + return ERROR_INTERNAL;
> + }
> +
> + /* make socket nonblocking */
> + if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
> + flags = 0;
> + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
> + EPRINTF("error setting fd %d to non block mode\n", fd);
> + goto err;
> + }
> +
> + /*
> + * once we have created the socket and populated the address,
> + * we can now start our non-blocking connect. rather than
> + * duplicating code we trigger a timeout on the socket fd,
> + * which calls out nonblocking connect code
> + */
> + id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0,
> + td_replication_retry_connect_event,
> + t);
> + if(id < 0) {
> + EPRINTF("error registering timeout client connection event handler: %s\n",
> + strerror(id));
> + goto err;
> + }
> +
> + t->fd = fd;
> + t->id = id;
> + t->status = connection_in_progress;
> + return 0;
> +
> +err:
> + close(fd);
> + return ERROR_INTERNAL;
> +}
> +
> +static void td_replication_client_failed(td_replication_connect_t *t, int rc)
> +{
> + td_replication_connect_kill(t);
> + t->callback(t, rc);
> +}
> +
> +static void td_replication_client_done(td_replication_connect_t *t)
> +{
> + UNREGISTER_EVENT(t->id);
> + t->status = connection_established;
> + t->callback(t, 0);
> +}
> +
> +static int td_replication_retry_connect(td_replication_connect_t *t)
> +{
> + event_id_t id;
> + const char *log_prefix = t->log_prefix;
> +
> + UNREGISTER_EVENT(t->id);
> +
> + DPRINTF("connect to server 1 second later");
> + id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
> + t->fd, t->retry_timeout_s,
> + td_replication_retry_connect_event,
> + t);
> + if (id < 0) {
> + EPRINTF("error registering timeout client connection event handler: %s\n",
> + strerror(id));
> + return ERROR_INTERNAL;
> + }
> +
> + t->id = id;
> + return 0;
> +}
> +
> +static int td_replication_wait_connect_done(td_replication_connect_t *t)
> +{
> + event_id_t id;
> + const char *log_prefix = t->log_prefix;
> +
> + UNREGISTER_EVENT(t->id);
> +
> + id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
> + t->fd, 0,
> + td_replication_connect_event, t);
> + if (id < 0) {
> + EPRINTF("error registering client connection event handler: %s\n",
> + strerror(id));
> + return ERROR_INTERNAL;
> + }
> + t->id = id;
> +
> + return 0;
> +}
> +
/*
 * Classify a connect() failure.
 * Returns 1 when the error means "server not reachable yet" and a later
 * retry makes sense, 0 otherwise.
 *
 * ETIMEDOUT never comes back from connect() itself because the socket is
 * non-blocking. It can only be reported through getsockopt(SO_ERROR).
 */
static int check_connect_errno(int err)
{
	switch (err) {
	case ECONNREFUSED:
	case ENETUNREACH:
	case EAGAIN:
	case ECONNABORTED:
	case ETIMEDOUT:
		return 1;
	default:
		return 0;
	}
}
> +
> +static void td_replication_retry_connect_event(event_id_t id, char mode,
> + void *private)
> +{
> + td_replication_connect_t *t = private;
> + int rc, ret;
> + const char *log_prefix = t->log_prefix;
> +
> + /* do a non-blocking connect */
> + ret = connect(t->fd, (struct sockaddr *)&t->sa, sizeof(t->sa));
> + if (ret) {
> + if (errno == EINPROGRESS) {
> + /*
> + * the connect returned EINPROGRESS (nonblocking
> + * connect) we must wait for the fd to be writeable
> + * to determine if the connect worked
> + */
> + rc = td_replication_wait_connect_done(t);
> + if (rc)
> + goto fail;
> + return;
> + }
> +
> + if (check_connect_errno(errno)) {
> + rc = td_replication_retry_connect(t);
> + if (rc)
> + goto fail;
> + return;
> + }
> +
> + /* not recoverable */
> + EPRINTF("error connection to server %s\n", strerror(errno));
> + rc = ERROR_CONNECTION;
> + goto fail;
> + }
> +
> + /* The connection is established unexpectedly */
> + td_replication_client_done(t);
> +
> + return;
> +
> +fail:
> + td_replication_client_failed(t, rc);
> +}
> +
> +/* callback when nonblocking connect() is finished */
> +static void td_replication_connect_event(event_id_t id, char mode,
> + void *private)
> +{
> + int socket_errno;
> + socklen_t socket_errno_size;
> + td_replication_connect_t *t = private;
> + int rc;
> + const char *log_prefix = t->log_prefix;
> +
> + /* check to see if the connect succeeded */
> + socket_errno_size = sizeof(socket_errno);
> + if (getsockopt(t->fd, SOL_SOCKET, SO_ERROR,
> + &socket_errno, &socket_errno_size)) {
> + EPRINTF("error getting socket errno\n");
> + return;
> + }
> +
> + DPRINTF("socket connect returned %d\n", socket_errno);
> +
> + if (socket_errno) {
> + /* the connect did not succeed */
> + if (check_connect_errno(socket_errno)) {
> + /*
> + * we can probably assume that the backup is down.
> + * just try again later
> + */
> + rc = td_replication_retry_connect(t);
> + if (rc)
> + goto fail;
> +
> + return;
> + } else {
> + EPRINTF("socket connect returned %d, giving up\n",
> + socket_errno);
> + rc = ERROR_CONNECTION;
> + goto fail;
> + }
> + }
> +
> + td_replication_client_done(t);
> +
> + return;
> +
> +fail:
> + td_replication_client_failed(t, rc);
> +}
> diff --git a/tools/blktap2/drivers/block-replication.h b/tools/blktap2/drivers/block-replication.h
> new file mode 100644
> index 0000000..0bd6e71
> --- /dev/null
> +++ b/tools/blktap2/drivers/block-replication.h
> @@ -0,0 +1,113 @@
> +/*
> + * Copyright (C) 2014 FUJITSU LIMITED
> + * Author: Wen Congyang <wency@cn.fujitsu.com>
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU Lesser General Public License as published
> + * by the Free Software Foundation; version 2.1 only. with the special
> + * exception on linking described in file LICENSE.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU Lesser General Public License for more details.
> + */
> +
> +#ifndef BLOCK_REPLICATION_H
> +#define BLOCK_REPLICATION_H
> +
> +#include "scheduler.h"
> +#include <sys/socket.h>
> +#include <netdb.h>
> +
> +#define CONTAINER_OF(inner_ptr, outer, member_name) \
> + ({ \
> + typeof(outer) *container_of_; \
> + container_of_ = (void*)((char*)(inner_ptr) - \
> + offsetof(typeof(outer), member_name)); \
> + (void)(&container_of_->member_name == \
> + (typeof(inner_ptr))0) /* type check */; \
> + container_of_; \
> + })
> +
> +#define UNREGISTER_EVENT(id) \
> + do { \
> + if (id >= 0) { \
> + tapdisk_server_unregister_event(id); \
> + id = -1; \
> + } \
> + } while (0)
> +#define CLOSE_FD(fd) \
> + do { \
> + if (fd >= 0) { \
> + close(fd); \
> + fd = -1; \
> + } \
> + } while (0)
> +
> +enum {
> + ERROR_INTERNAL = -1,
> + ERROR_IO = -2,
> + ERROR_CONNECTION = -3,
> + ERROR_CLOSE = -4,
> +};
> +
> +typedef struct td_replication_connect td_replication_connect_t;
> +typedef void td_replication_callback(td_replication_connect_t *r, int rc);
> +
> +struct td_replication_connect {
> + /*
> + * caller must fill these in before calling
> + * td_replication_connect_init()
> + */
> + const char *log_prefix;
> + td_replication_callback *callback;
> + int retry_timeout_s;
> + int max_connections;
> + /*
> + * The caller uses this fd to read/write after
> + * the connection is established
> + */
> + int fd;
> +
> + /* private */
> + struct sockaddr_in sa;
> + int listen_fd;
> + event_id_t id;
> +
> + int status;
> +};
> +
> +/* return -errno if failure happened, otherwise return 0 */
> +int td_replication_connect_init(td_replication_connect_t *t, const char *name);
> +/*
> + * Return value:
> + * -1: connection is closed or not connected
> + * 0: connection is in progress
> + * 1: connection is established
> + */
> +int td_replication_connect_status(td_replication_connect_t *t);
> +void td_replication_connect_kill(td_replication_connect_t *t);
> +
> +/*
> + * Return value:
> + * -2: this caller should be client
> + * -1: error
> + * 0: connection is in progress
> + */
> +int td_replication_server_start(td_replication_connect_t *t);
> +/*
> + * Return value:
> + * -2: this caller should be client
> + * -1: error
> + * 0: connection is in progress
> + */
> +int td_replication_server_restart(td_replication_connect_t *t);
> +/*
> + * Return value:
> + * -1: error
> + * 0: connection is in progress
> + */
> +int td_replication_client_start(td_replication_connect_t *t);
> +
> +#endif
> --
> 1.9.3
>
Acked-by: Shriram Rajagopalan <rshriram@cs.ubc.ca>