On Sep 5, 2014 5:31 AM, "Wen Congyang" wrote: > > Move the replication connection code (client connect/retry and server listen/accept handling) out of block-remus.c into the new block-replication.c/block-replication.h, so that COLO can reuse it. > > Signed-off-by: Wen Congyang > Cc: Shriram Rajagopalan > --- > tools/blktap2/drivers/Makefile | 2 +- > tools/blktap2/drivers/block-remus.c | 494 +++--------------------------- > tools/blktap2/drivers/block-replication.c | 468 ++++++++++++++++++++++++++++ > tools/blktap2/drivers/block-replication.h | 113 +++++++ > 4 files changed, 630 insertions(+), 447 deletions(-) > create mode 100644 tools/blktap2/drivers/block-replication.c > create mode 100644 tools/blktap2/drivers/block-replication.h > > diff --git a/tools/blktap2/drivers/Makefile b/tools/blktap2/drivers/Makefile > index 37c3485..3d8ed8a 100644 > --- a/tools/blktap2/drivers/Makefile > +++ b/tools/blktap2/drivers/Makefile > @@ -23,7 +23,7 @@ endif > > VHDLIBS := -L$(LIBVHDDIR) -lvhd > > -REMUS-OBJS := block-remus.o > +REMUS-OBJS := block-remus.o block-replication.o > REMUS-OBJS += hashtable.o > REMUS-OBJS += hashtable_itr.o > REMUS-OBJS += hashtable_utility.o > diff --git a/tools/blktap2/drivers/block-remus.c b/tools/blktap2/drivers/block-remus.c > index 5d27d41..8b6f157 100644 > --- a/tools/blktap2/drivers/block-remus.c > +++ b/tools/blktap2/drivers/block-remus.c > @@ -40,6 +40,7 @@ > #include "hashtable.h" > #include "hashtable_itr.h" > #include "hashtable_utility.h" > +#include "block-replication.h" > > #include > #include > @@ -49,10 +50,7 @@ > #include > #include > #include > -#include > -#include > #include > -#include > #include > #include > #include > @@ -67,22 +65,6 @@ > > #define RPRINTF(_f, _a...)
syslog (LOG_DEBUG, "remus: " _f, ## _a) > > -#define UNREGISTER_EVENT(id) \ > - do { \ > - if (id >= 0) { \ > - tapdisk_server_unregister_event(id); \ > - id = -1; \ > - } \ > - } while (0) > - > -#define CLOSE_FD(fd) \ > - do { \ > - if (fd >= 0) { \ > - close(fd); \ > - fd = -1; \ > - } \ > - } while (0) > - > #define MAX_REMUS_REQUEST TAPDISK_DATA_REQUESTS > > enum tdremus_mode { > @@ -92,13 +74,6 @@ enum tdremus_mode { > mode_backup > }; > > -enum { > - ERROR_INTERNAL = -1, > - ERROR_IO = -2, > - ERROR_CONNECTION = -3, > - ERROR_CLOSE = -4, > -}; > - > struct tdremus_req { > td_request_t treq; > }; > @@ -167,21 +142,9 @@ struct ramdisk_write_cbdata { > > typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq); > > -/* > - * If cid, rid and wid are -1, fd must be -1. It means that > - * we are in unpritected mode or we don't start to connect > - * to backup. > - * If fd is an valid fd: > - * cid is valid, rid and wid must be invalid. It means that > - * the connection is in progress. > - * cid is invalid. rid or wid must be valid. It means that > - * the connection is established. 
> - */ > typedef struct poll_fd { > int fd; > - event_id_t cid; > - event_id_t rid; > - event_id_t wid; > + event_id_t id; > } poll_fd_t; > > struct tdremus_state { > @@ -195,9 +158,7 @@ struct tdremus_state { > char* msg_path; /* output completion message here */ > poll_fd_t msg_fd; > > - /* replication host */ > - struct sockaddr_in sa; > - poll_fd_t server_fd; /* server listen port */ > + td_replication_connect_t t; > poll_fd_t stream_fd; /* replication channel */ > > /* > @@ -777,28 +738,8 @@ static int mwrite(int fd, void* buf, size_t len) > select(fd + 1, NULL, &wfds, NULL, &tv); > } > > - > -static void inline close_stream_fd(struct tdremus_state *s) > -{ > - > - UNREGISTER_EVENT(s->stream_fd.cid); > - UNREGISTER_EVENT(s->stream_fd.rid); > - UNREGISTER_EVENT(s->stream_fd.wid); > - > - /* close the connection */ > - CLOSE_FD(s->stream_fd.fd); > -} > - > -static void close_server_fd(struct tdremus_state *s) > -{ > - UNREGISTER_EVENT(s->server_fd.cid); > - CLOSE_FD(s->server_fd.fd); > -} > - > /* primary functions */ > -static void remus_client_event(event_id_t, char mode, void *private); > -static void remus_connect_event(event_id_t id, char mode, void *private); > -static void remus_retry_connect_event(event_id_t id, char mode, void *private); > +static void remus_client_event(event_id_t id, char mode, void *private); > static int primary_forward_request(struct tdremus_state *s, > const td_request_t *treq); > > @@ -808,56 +749,15 @@ static int primary_forward_request(struct tdremus_state *s, > */ > static void primary_failed(struct tdremus_state *s, int rc) > { > - close_stream_fd(s); > + td_replication_connect_kill(&s->t); > if (rc == ERROR_INTERNAL) > RPRINTF("switch to unprotected mode due to internal error"); > if (rc == ERROR_CLOSE) > RPRINTF("switch to unprotected mode before closing"); > + UNREGISTER_EVENT(s->stream_fd.id); > switch_mode(s->tdremus_driver, mode_unprotected); > } > > -static int primary_do_connect(struct tdremus_state *state) > -{ > - 
event_id_t id; > - int fd; > - int rc; > - int flags; > - > - RPRINTF("client connecting to %s:%d...\n", > - inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port)); > - > - if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) { > - RPRINTF("could not create client socket: %d\n", errno); > - return ERROR_INTERNAL; > - } > - state->stream_fd.fd = fd; > - > - /* make socket nonblocking */ > - if ((flags = fcntl(fd, F_GETFL, 0)) == -1) > - flags = 0; > - if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { > - RPRINTF("error setting fd %d to non block mode\n", fd); > - return ERROR_INTERNAL; > - } > - > - /* > - * once we have created the socket and populated the address, > - * we can now start our non-blocking connect. rather than > - * duplicating code we trigger a timeout on the socket fd, > - * which calls out nonblocking connect code > - */ > - if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0, > - remus_retry_connect_event, > - state)) < 0) { > - RPRINTF("error registering timeout client connection event handler: %s\n", > - strerror(id)); > - return ERROR_INTERNAL; > - } > - > - state->stream_fd.cid = id; > - return 0; > -} > - > static int remus_handle_queued_io(struct tdremus_state *s) > { > struct req_ring *queued_io = &s->queued_io; > @@ -882,184 +782,35 @@ static int remus_handle_queued_io(struct tdremus_state *s) > return 0; > } > > -static int remus_connection_done(struct tdremus_state *s) > +static void remus_client_established(td_replication_connect_t *t, int rc) > { > + struct tdremus_state *s = CONTAINER_OF(t, *s, t); > event_id_t id; > > - /* the connect succeeded */ > - /* unregister this function and register a new event handler */ > - tapdisk_server_unregister_event(s->stream_fd.cid); > - s->stream_fd.cid = -1; > + if (rc) { > + primary_failed(s, rc); > + return; > + } > > - id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->stream_fd.fd, > + /* the connect succeeded */ > + id = 
tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd, > 0, remus_client_event, s); > if(id < 0) { > RPRINTF("error registering client event handler: %s\n", > strerror(id)); > - return ERROR_INTERNAL; > - } > - s->stream_fd.rid = id; > - > - /* handle the queued requests */ > - return remus_handle_queued_io(s); > -} > - > -static int remus_retry_connect(struct tdremus_state *s) > -{ > - event_id_t id; > - > - tapdisk_server_unregister_event(s->stream_fd.cid); > - s->stream_fd.cid = -1; > - > - RPRINTF("connect to backup 1 second later"); > - id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, > - s->stream_fd.fd, > - REMUS_CONNRETRY_TIMEOUT, > - remus_retry_connect_event, s); > - if (id < 0) { > - RPRINTF("error registering timeout client connection event handler: %s\n", > - strerror(id)); > - return ERROR_INTERNAL; > - } > - > - s->stream_fd.cid = id; > - return 0; > -} > - > -static int remus_wait_connect_done(struct tdremus_state *s) > -{ > - event_id_t id; > - > - tapdisk_server_unregister_event(s->stream_fd.cid); > - s->stream_fd.cid = -1; > - > - id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD, > - s->stream_fd.fd, 0, > - remus_connect_event, s); > - if (id < 0) { > - RPRINTF("error registering client connection event handler: %s\n", > - strerror(id)); > - return ERROR_INTERNAL; > - } > - s->stream_fd.cid = id; > - > - return 0; > -} > - > -/* return 1 if we need to reconnect to backup */ > -static int check_connect_errno(int err) > -{ > - /* > - * The fd is non-block, so we will not get ETIMEDOUT > - * after calling connect(). We only can get this errno > - * by getsockopt(). 
> - */ > - if (err == ECONNREFUSED || err == ENETUNREACH || > - err == EAGAIN || err == ECONNABORTED || > - err == ETIMEDOUT) > - return 1; > - > - return 0; > -} > - > -static void remus_retry_connect_event(event_id_t id, char mode, void *private) > -{ > - struct tdremus_state *s = (struct tdremus_state *)private; > - int rc, ret; > - > - /* do a non-blocking connect */ > - ret = connect(s->stream_fd.fd, > - (struct sockaddr *)&s->sa, > - sizeof(s->sa)); > - if (ret) { > - if (errno == EINPROGRESS) { > - /* > - * the connect returned EINPROGRESS (nonblocking > - * connect) we must wait for the fd to be writeable > - * to determine if the connect worked > - */ > - rc = remus_wait_connect_done(s); > - if (rc) > - goto fail; > - return; > - } > - > - if (check_connect_errno(errno)) { > - rc = remus_retry_connect(s); > - if (rc) > - goto fail; > - return; > - } > - > - /* not recoverable */ > - RPRINTF("error connection to server %s\n", strerror(errno)); > - rc = ERROR_CONNECTION; > - goto fail; > - } > - > - /* The connection is established unexpectedly */ > - rc = remus_connection_done(s); > - if (rc) > - goto fail; > - > - return; > - > -fail: > - primary_failed(s, rc); > - return; > -} > - > -/* callback when nonblocking connect() is finished */ > -static void remus_connect_event(event_id_t id, char mode, void *private) > -{ > - int socket_errno; > - socklen_t socket_errno_size; > - struct tdremus_state *s = (struct tdremus_state *)private; > - int rc; > - > - /* check to see if the connect succeeded */ > - socket_errno_size = sizeof(socket_errno); > - if (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR, > - &socket_errno, &socket_errno_size)) { > - RPRINTF("error getting socket errno\n"); > + primary_failed(s, ERROR_INTERNAL); > return; > } > > - RPRINTF("socket connect returned %d\n", socket_errno); > + s->stream_fd.fd = t->fd; > + s->stream_fd.id = id; > > - if (socket_errno) { > - /* the connect did not succeed */ > - if (check_connect_errno(socket_errno)) { 
> - /* > - * we can probably assume that the backup is down. > - * just try again later > - */ > - rc = remus_retry_connect(s); > - if (rc) > - goto fail; > - > - return; > - } else { > - RPRINTF("socket connect returned %d, giving up\n", > - socket_errno); > - rc = ERROR_CONNECTION; > - goto fail; > - } > - > - return; > - } > - > - rc = remus_connection_done(s); > + /* handle the queued requests */ > + rc = remus_handle_queued_io(s); > if (rc) > - goto fail; > - > - return; > - > -fail: > - primary_failed(s, rc); > + primary_failed(s, rc); > } > > - > /* > * we install this event handler on the primary once we have > * connected to the backup. > @@ -1142,19 +893,21 @@ static int primary_forward_request(struct tdremus_state *s, > static void primary_queue_write(td_driver_t *driver, td_request_t treq) > { > struct tdremus_state *s = (struct tdremus_state *)driver->data; > - int rc; > + int rc, ret; > > // RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd); > > - if(s->stream_fd.fd < 0) { > + ret = td_replication_connect_status(&s->t); > + if(ret == -1) { > RPRINTF("connecting to backup...\n"); > - rc = primary_do_connect(s); > + s->t.callback = remus_client_established; > + rc = td_replication_client_start(&s->t); > if (rc) > goto fail; > } > > /* The connection is not established, just queue the request */ > - if (s->stream_fd.cid >= 0) { > + if (ret != 1) { > ring_add_request(&s->queued_io, &treq); > return; > } > @@ -1227,9 +980,7 @@ static int primary_start(td_driver_t *driver) > s->queue_flush = primary_flush; > > s->stream_fd.fd = -1; > - s->stream_fd.cid = -1; > - s->stream_fd.rid = -1; > - s->stream_fd.wid = -1; > + s->stream_fd.id = -1; > > return 0; > } > @@ -1240,100 +991,32 @@ static void remus_server_event(event_id_t id, char mode, void *private); > /* It is called when we find some I/O error */ > static void backup_failed(struct tdremus_state *s, int rc) > { > - close_stream_fd(s); > - close_server_fd(s); > + td_replication_connect_kill(&s->t); > 
/* We will switch to unprotected mode in backup_queue_write() */ > } > > /* returns the socket that receives write requests */ > -static void remus_server_accept(event_id_t id, char mode, void* private) > +static void remus_server_established(td_replication_connect_t *t, int rc) > { > - struct tdremus_state* s = (struct tdremus_state *) private; > - > - int stream_fd; > - > - /* XXX: add address-based black/white list */ > - if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) { > - RPRINTF("error accepting connection: %d\n", errno); > - return; > - } > + struct tdremus_state *s = CONTAINER_OF(t, *s, t); > + event_id_t id; > > - /* > - * TODO: check to see if we are already replicating. > - * if so just close the connection (or do something > - * smarter) > - */ > - RPRINTF("server accepted connection\n"); > + /* rc is always 0 */ > > /* add tapdisk event for replication stream */ > - id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0, > + id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd, 0, > remus_server_event, s); > > if (id < 0) { > RPRINTF("error registering connection event handler: %s\n", > strerror(errno)); > - close(stream_fd); > + td_replication_server_restart(t); > return; > } > > /* store replication file descriptor */ > - s->stream_fd.fd = stream_fd; > - s->stream_fd.rid = id; > -} > - > -/* returns -2 if EADDRNOTAVAIL */ > -static int remus_bind(struct tdremus_state* s) > -{ > - int opt; > - int rc = -1; > - event_id_t id; > - > - if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { > - RPRINTF("could not create server socket: %d\n", errno); > - return rc; > - } > - > - opt = 1; > - if (setsockopt(s->server_fd.fd, SOL_SOCKET, > - SO_REUSEADDR, &opt, sizeof(opt)) < 0) > - RPRINTF("Error setting REUSEADDR on %d: %d\n", > - s->server_fd.fd, errno); > - > - if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa, > - sizeof(s->sa)) < 0) { > - RPRINTF("could not bind server socket %d to %s:%d: %d %s\n", > - 
s->server_fd.fd, inet_ntoa(s->sa.sin_addr), > - ntohs(s->sa.sin_port), errno, strerror(errno)); > - if (errno == EADDRNOTAVAIL) > - rc = -2; > - goto err_sfd; > - } > - > - if (listen(s->server_fd.fd, 10)) { > - RPRINTF("could not listen on socket: %d\n", errno); > - goto err_sfd; > - } > - > - /* > - * The socket s now bound to the address and listening so we > - * may now register the fd with tapdisk > - */ > - id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, > - s->server_fd.fd, 0, > - remus_server_accept, s); > - if (id < 0) { > - RPRINTF("error registering server connection event handler: %s", > - strerror(id)); > - goto err_sfd; > - } > - s->server_fd.cid = id; > - > - return 0; > - > -err_sfd: > - CLOSE_FD(s->server_fd.fd); > - > - return rc; > + s->stream_fd.fd = t->fd; > + s->stream_fd.id = id; > } > > /* wait for latest checkpoint to be applied */ > @@ -1566,90 +1249,6 @@ static int unprotected_start(td_driver_t *driver) > > > /* control */ > - > -static inline int resolve_address(const char* addr, struct in_addr* ia) > -{ > - struct hostent* he; > - uint32_t ip; > - > - if (!(he = gethostbyname(addr))) { > - RPRINTF("error resolving %s: %d\n", addr, h_errno); > - return -1; > - } > - > - if (!he->h_addr_list[0]) { > - RPRINTF("no address found for %s\n", addr); > - return -1; > - } > - > - /* network byte order */ > - ip = *((uint32_t**)he->h_addr_list)[0]; > - ia->s_addr = ip; > - > - return 0; > -} > - > -static int get_args(td_driver_t *driver, const char* name) > -{ > - struct tdremus_state *state = (struct tdremus_state *)driver->data; > - char* host; > - char* port; > -// char* driver_str; > -// char* parent; > -// int type; > -// char* path; > -// unsigned long ulport; > -// int i; > -// struct sockaddr_in server_addr_in; > - > - int gai_status; > - int valid_addr; > - struct addrinfo gai_hints; > - struct addrinfo *servinfo, *servinfo_itr; > - > - memset(&gai_hints, 0, sizeof gai_hints); > - gai_hints.ai_family = AF_UNSPEC; > - 
gai_hints.ai_socktype = SOCK_STREAM; > - > - port = strchr(name, ':'); > - if (!port) { > - RPRINTF("missing host in %s\n", name); > - return -ENOENT; > - } > - if (!(host = strndup(name, port - name))) { > - RPRINTF("unable to allocate host\n"); > - return -ENOMEM; > - } > - port++; > - > - if ((gai_status = getaddrinfo(host, port, &gai_hints, &servinfo)) != 0) { > - RPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status)); > - return -ENOENT; > - } > - > - /* TODO: do something smarter here */ > - valid_addr = 0; > - for(servinfo_itr = servinfo; servinfo_itr != NULL; servinfo_itr = servinfo_itr->ai_next) { > - void *addr; > - char *ipver; > - > - if (servinfo_itr->ai_family == AF_INET) { > - valid_addr = 1; > - memset(&state->sa, 0, sizeof(state->sa)); > - state->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr; > - break; > - } > - } > - freeaddrinfo(servinfo); > - > - if (!valid_addr) > - return -ENOENT; > - > - RPRINTF("host: %s, port: %d\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port)); > - > - return 0; > -} > - > static int switch_mode(td_driver_t *driver, enum tdremus_mode mode) > { > struct tdremus_state *s = (struct tdremus_state *)driver->data; > @@ -1844,11 +1443,11 @@ static int ctl_register(struct tdremus_state *s) > RPRINTF("registering ctl fifo\n"); > > /* register ctl fd */ > - s->ctl_fd.cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s); > + s->ctl_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s); > > - if (s->ctl_fd.cid < 0) { > + if (s->ctl_fd.id < 0) { > RPRINTF("error registering ctrl FIFO %s: %d\n", > - s->ctl_path, s->ctl_fd.cid); > + s->ctl_path, s->ctl_fd.id); > return -1; > } > > @@ -1859,7 +1458,7 @@ static void ctl_unregister(struct tdremus_state *s) > { > RPRINTF("unregistering ctl fifo\n"); > > - UNREGISTER_EVENT(s->ctl_fd.cid); > + UNREGISTER_EVENT(s->ctl_fd.id); > } > > /* interface */ > @@ -1867,6 +1466,7 @@ static void 
ctl_unregister(struct tdremus_state *s) > static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid) > { > struct tdremus_state *s = (struct tdremus_state *)driver->data; > + td_replication_connect_t *t = &s->t; > int rc; > const char *name = image->name; > td_flag_t flags = image->flags; > @@ -1877,7 +1477,6 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid) > remus_image = image; > > memset(s, 0, sizeof(*s)); > - s->server_fd.fd = -1; > s->stream_fd.fd = -1; > s->ctl_fd.fd = -1; > s->msg_fd.fd = -1; > @@ -1886,8 +1485,12 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid) > * the driver stack from the stream_fd event handler */ > s->tdremus_driver = driver; > > + t->log_prefix = "remus"; > + t->retry_timeout_s = REMUS_CONNRETRY_TIMEOUT; > + t->max_connections = 10; > + t->callback = remus_server_established; > /* parse name to get info etc */ > - if ((rc = get_args(driver, name))) > + if ((rc = td_replication_connect_init(t, name))) > return rc; > > if ((rc = ctl_open(driver, name))) { > @@ -1901,7 +1504,7 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid) > return rc; > } > > - if (!(rc = remus_bind(s))) > + if (!(rc = td_replication_server_start(t))) > rc = switch_mode(driver, mode_backup); > else if (rc == -2) > rc = switch_mode(driver, mode_primary); > @@ -1932,8 +1535,7 @@ static int tdremus_close(td_driver_t *driver) > if (s->ramdisk.inprogress) > hashtable_destroy(s->ramdisk.inprogress, 0); > > - close_server_fd(s); > - close_stream_fd(s); > + td_replication_connect_kill(&s->t); > ctl_unregister(s); > ctl_close(s); > > diff --git a/tools/blktap2/drivers/block-replication.c b/tools/blktap2/drivers/block-replication.c > new file mode 100644 > index 0000000..e4b2679 > --- /dev/null > +++ b/tools/blktap2/drivers/block-replication.c > @@ -0,0 +1,468 @@ > +/* > + * Copyright (C) 2014 FUJITSU LIMITED > + * Author: Wen Congyang > + * > + * This 
program is free software; you can redistribute it and/or modify > + * it under the terms of the GNU Lesser General Public License as published > + * by the Free Software Foundation; version 2.1 only. with the special > + * exception on linking described in file LICENSE. > + * > + * This program is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > + * GNU Lesser General Public License for more details. > + */ > + > +#include "tapdisk-server.h" > +#include "block-replication.h" > + > +#include > +#include > +#include > +#include > +#include > +#include > +#include > +#include > + > +#undef DPRINTF > +#undef EPRINTF > +#define DPRINTF(_f, _a...) syslog (LOG_DEBUG, "%s: " _f, log_prefix, ## _a) > +#define EPRINTF(_f, _a...) syslog (LOG_ERR, "%s: " _f, log_prefix, ## _a) > + > +/* connection status */ > +enum { > + connection_none, > + connection_in_progress, > + connection_established, > + connection_closed, > +}; > + > +/* common functions */ > +/* args should be host:port */ > +static int get_args(td_replication_connect_t *t, const char* name) > +{ > + char* host; > + const char* port; > + int gai_status; > + int valid_addr; > + struct addrinfo gai_hints; > + struct addrinfo *servinfo, *servinfo_itr; > + const char *log_prefix = t->log_prefix; > + > + memset(&gai_hints, 0, sizeof gai_hints); > + gai_hints.ai_family = AF_UNSPEC; > + gai_hints.ai_socktype = SOCK_STREAM; > + > + port = strchr(name, ':'); > + if (!port) { > + EPRINTF("missing host in %s\n", name); > + return -ENOENT; > + } > + if (!(host = strndup(name, port - name))) { > + EPRINTF("unable to allocate host\n"); > + return -ENOMEM; > + } > + port++; > + if ((gai_status = getaddrinfo(host, port, > + &gai_hints, &servinfo)) != 0) { > + EPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status)); > + free(host); > + return -ENOENT; > + } > + free(host); > + > + /* TODO: do 
something smarter here */ > + valid_addr = 0; > + for (servinfo_itr = servinfo; servinfo_itr != NULL; > + servinfo_itr = servinfo_itr->ai_next) { > + if (servinfo_itr->ai_family == AF_INET) { > + valid_addr = 1; > + memset(&t->sa, 0, sizeof(t->sa)); > + t->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr; > + break; > + } > + } > + freeaddrinfo(servinfo); > + > + if (!valid_addr) > + return -ENOENT; > + > + DPRINTF("host: %s, port: %d\n", inet_ntoa(t->sa.sin_addr), > + ntohs(t->sa.sin_port)); > + > + return 0; > +} > + > +int td_replication_connect_init(td_replication_connect_t *t, const char *name) > +{ > + int rc; > + > + rc = get_args(t, name); > + if (rc) > + return rc; > + > + t->listen_fd = -1; > + t->id = -1; > + t->status = connection_none; > + return 0; > +} > + > +int td_replication_connect_status(td_replication_connect_t *t) > +{ > + const char *log_prefix = t->log_prefix; > + > + switch (t->status) { > + case connection_none: > + case connection_closed: > + return -1; > + case connection_in_progress: > + return 0; > + case connection_established: > + return 1; > + default: > + EPRINTF("td_replication_connect is corruptted\n"); > + return -2; > + } > +} > + > +void td_replication_connect_kill(td_replication_connect_t *t) > +{ > + if (t->status != connection_in_progress && > + t->status != connection_established) > + return; > + > + UNREGISTER_EVENT(t->id); > + CLOSE_FD(t->fd); > + CLOSE_FD(t->listen_fd); > + t->status = connection_closed; > +} > + > +/* server */ > +static void td_replication_server_accept(event_id_t id, char mode, > + void *private); > + > +int td_replication_server_start(td_replication_connect_t *t) > +{ > + int opt; > + int rc = -1; > + event_id_t id; > + int fd; > + const char *log_prefix = t->log_prefix; > + > + if (t->status == connection_in_progress || > + t->status == connection_established) > + return rc; > + > + fd = socket(AF_INET, SOCK_STREAM, 0); > + if (fd < 0) { > + EPRINTF("could not create server socket: %d\n", errno); 
> + return rc; > + } > + > + opt = 1; > + if (setsockopt(fd, SOL_SOCKET, > + SO_REUSEADDR, &opt, sizeof(opt)) < 0) > + DPRINTF("Error setting REUSEADDR on %d: %d\n", fd, errno); > + > + if (bind(fd, (struct sockaddr *)&t->sa, sizeof(t->sa)) < 0) { > + DPRINTF("could not bind server socket %d to %s:%d: %d %s\n", > + fd, inet_ntoa(t->sa.sin_addr), > + ntohs(t->sa.sin_port), errno, strerror(errno)); > + if (errno == EADDRNOTAVAIL) > + rc = -2; > + goto err; > + } > + > + if (listen(fd, t->max_connections)) { > + EPRINTF("could not listen on socket: %d\n", errno); > + goto err; > + } > + > + /* > + * The socket is now bound to the address and listening so we > + * may now register the fd with tapdisk > + */ > + id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, > + fd, 0, > + td_replication_server_accept, t); > + if (id < 0) { > + EPRINTF("error registering server connection event handler: %s", > + strerror(id)); > + goto err; > + } > + t->listen_fd = fd; > + t->id = id; > + t->status = connection_in_progress; > + > + return 0; > + > +err: > + close(fd); > + return rc; > +} > + > +static void td_replication_server_accept(event_id_t id, char mode, > + void *private) > +{ > + td_replication_connect_t *t = private; > + int fd; > + const char *log_prefix = t->log_prefix; > + > + /* XXX: add address-based black/white list */ > + fd = accept(t->listen_fd, NULL, NULL); > + if (fd < 0) { > + EPRINTF("error accepting connection: %d\n", errno); > + return; > + } > + > + if (t->status == connection_established) { > + EPRINTF("connection is already established\n"); > + close(fd); > + return; > + } > + > + DPRINTF("server accepted connection\n"); > + t->fd = fd; > + t->status = connection_established; > + t->callback(t, 0); > +} > + > +int td_replication_server_restart(td_replication_connect_t *t) > +{ > + switch (t->status) { > + case connection_in_progress: > + return 0; > + case connection_established: > + CLOSE_FD(t->fd); > + t->status = connection_in_progress; > + 
return 0; > + case connection_none: > + case connection_closed: > + return td_replication_server_start(t); > + default: > + /* not reached */ > + return -1; > + } > +} > + > +/* client */ > +static void td_replication_retry_connect_event(event_id_t id, char mode, > + void *private); > +static void td_replication_connect_event(event_id_t id, char mode, > + void *private); > +int td_replication_client_start(td_replication_connect_t *t) > +{ > + event_id_t id; > + int fd; > + int rc; > + int flags; > + const char *log_prefix = t->log_prefix; > + > + if (t->status == connection_in_progress || > + t->status == connection_established) > + return ERROR_INTERNAL; > + > + DPRINTF("client connecting to %s:%d...\n", > + inet_ntoa(t->sa.sin_addr), ntohs(t->sa.sin_port)); > + > + if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) { > + EPRINTF("could not create client socket: %d\n", errno); > + return ERROR_INTERNAL; > + } > + > + /* make socket nonblocking */ > + if ((flags = fcntl(fd, F_GETFL, 0)) == -1) > + flags = 0; > + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { > + EPRINTF("error setting fd %d to non block mode\n", fd); > + goto err; > + } > + > + /* > + * once we have created the socket and populated the address, > + * we can now start our non-blocking connect. 
rather than > + * duplicating code we trigger a timeout on the socket fd, > + * which calls out nonblocking connect code > + */ > + id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0, > + td_replication_retry_connect_event, > + t); > + if(id < 0) { > + EPRINTF("error registering timeout client connection event handler: %s\n", > + strerror(id)); > + goto err; > + } > + > + t->fd = fd; > + t->id = id; > + t->status = connection_in_progress; > + return 0; > + > +err: > + close(fd); > + return ERROR_INTERNAL; > +} > + > +static void td_replication_client_failed(td_replication_connect_t *t, int rc) > +{ > + td_replication_connect_kill(t); > + t->callback(t, rc); > +} > + > +static void td_replication_client_done(td_replication_connect_t *t) > +{ > + UNREGISTER_EVENT(t->id); > + t->status = connection_established; > + t->callback(t, 0); > +} > + > +static int td_replication_retry_connect(td_replication_connect_t *t) > +{ > + event_id_t id; > + const char *log_prefix = t->log_prefix; > + > + UNREGISTER_EVENT(t->id); > + > + DPRINTF("connect to server 1 second later"); > + id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, > + t->fd, t->retry_timeout_s, > + td_replication_retry_connect_event, > + t); > + if (id < 0) { > + EPRINTF("error registering timeout client connection event handler: %s\n", > + strerror(id)); > + return ERROR_INTERNAL; > + } > + > + t->id = id; > + return 0; > +} > + > +static int td_replication_wait_connect_done(td_replication_connect_t *t) > +{ > + event_id_t id; > + const char *log_prefix = t->log_prefix; > + > + UNREGISTER_EVENT(t->id); > + > + id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD, > + t->fd, 0, > + td_replication_connect_event, t); > + if (id < 0) { > + EPRINTF("error registering client connection event handler: %s\n", > + strerror(id)); > + return ERROR_INTERNAL; > + } > + t->id = id; > + > + return 0; > +} > + > +/* return 1 if we need to reconnect to backup server */ > +static int 
check_connect_errno(int err) > +{ > + /* > + * The fd is non-block, so we will not get ETIMEDOUT > + * after calling connect(). We only can get this errno > + * by getsockopt(). > + */ > + if (err == ECONNREFUSED || err == ENETUNREACH || > + err == EAGAIN || err == ECONNABORTED || > + err == ETIMEDOUT) > + return 1; > + > + return 0; > +} > + > +static void td_replication_retry_connect_event(event_id_t id, char mode, > + void *private) > +{ > + td_replication_connect_t *t = private; > + int rc, ret; > + const char *log_prefix = t->log_prefix; > + > + /* do a non-blocking connect */ > + ret = connect(t->fd, (struct sockaddr *)&t->sa, sizeof(t->sa)); > + if (ret) { > + if (errno == EINPROGRESS) { > + /* > + * the connect returned EINPROGRESS (nonblocking > + * connect) we must wait for the fd to be writeable > + * to determine if the connect worked > + */ > + rc = td_replication_wait_connect_done(t); > + if (rc) > + goto fail; > + return; > + } > + > + if (check_connect_errno(errno)) { > + rc = td_replication_retry_connect(t); > + if (rc) > + goto fail; > + return; > + } > + > + /* not recoverable */ > + EPRINTF("error connection to server %s\n", strerror(errno)); > + rc = ERROR_CONNECTION; > + goto fail; > + } > + > + /* The connection is established unexpectedly */ > + td_replication_client_done(t); > + > + return; > + > +fail: > + td_replication_client_failed(t, rc); > +} > + > +/* callback when nonblocking connect() is finished */ > +static void td_replication_connect_event(event_id_t id, char mode, > + void *private) > +{ > + int socket_errno; > + socklen_t socket_errno_size; > + td_replication_connect_t *t = private; > + int rc; > + const char *log_prefix = t->log_prefix; > + > + /* check to see if the connect succeeded */ > + socket_errno_size = sizeof(socket_errno); > + if (getsockopt(t->fd, SOL_SOCKET, SO_ERROR, > + &socket_errno, &socket_errno_size)) { > + EPRINTF("error getting socket errno\n"); > + return; > + } > + > + DPRINTF("socket connect returned 
%d\n", socket_errno); > + > + if (socket_errno) { > + /* the connect did not succeed */ > + if (check_connect_errno(socket_errno)) { > + /* > + * we can probably assume that the backup is down. > + * just try again later > + */ > + rc = td_replication_retry_connect(t); > + if (rc) > + goto fail; > + > + return; > + } else { > + EPRINTF("socket connect returned %d, giving up\n", > + socket_errno); > + rc = ERROR_CONNECTION; > + goto fail; > + } > + } > + > + td_replication_client_done(t); > + > + return; > + > +fail: > + td_replication_client_failed(t, rc); > +} > diff --git a/tools/blktap2/drivers/block-replication.h b/tools/blktap2/drivers/block-replication.h > new file mode 100644 > index 0000000..0bd6e71 > --- /dev/null > +++ b/tools/blktap2/drivers/block-replication.h > @@ -0,0 +1,113 @@ > +/* > + * Copyright (C) 2014 FUJITSU LIMITED > + * Author: Wen Congyang > + * > + * This program is free software; you can redistribute it and/or modify > + * it under the terms of the GNU Lesser General Public License as published > + * by the Free Software Foundation; version 2.1 only. with the special > + * exception on linking described in file LICENSE. > + * > + * This program is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > + * GNU Lesser General Public License for more details. 
> + */ > + > +#ifndef BLOCK_REPLICATION_H > +#define BLOCK_REPLICATION_H > + > +#include "scheduler.h" > +#include > +#include > + > +#define CONTAINER_OF(inner_ptr, outer, member_name) \ > + ({ \ > + typeof(outer) *container_of_; \ > + container_of_ = (void*)((char*)(inner_ptr) - \ > + offsetof(typeof(outer), member_name)); \ > + (void)(&container_of_->member_name == \ > + (typeof(inner_ptr))0) /* type check */; \ > + container_of_; \ > + }) > + > +#define UNREGISTER_EVENT(id) \ > + do { \ > + if (id >= 0) { \ > + tapdisk_server_unregister_event(id); \ > + id = -1; \ > + } \ > + } while (0) > +#define CLOSE_FD(fd) \ > + do { \ > + if (fd >= 0) { \ > + close(fd); \ > + fd = -1; \ > + } \ > + } while (0) > + > +enum { > + ERROR_INTERNAL = -1, > + ERROR_IO = -2, > + ERROR_CONNECTION = -3, > + ERROR_CLOSE = -4, > +}; > + > +typedef struct td_replication_connect td_replication_connect_t; > +typedef void td_replication_callback(td_replication_connect_t *r, int rc); > + > +struct td_replication_connect { > + /* > + * caller must fill these in before calling > + * td_replication_connect_init() > + */ > + const char *log_prefix; > + td_replication_callback *callback; > + int retry_timeout_s; > + int max_connections; > + /* > + * The caller uses this fd to read/write after > + * the connection is established > + */ > + int fd; > + > + /* private */ > + struct sockaddr_in sa; > + int listen_fd; > + event_id_t id; > + > + int status; > +}; > + > +/* return -errno if failure happened, otherwise return 0 */ > +int td_replication_connect_init(td_replication_connect_t *t, const char *name); > +/* > + * Return value: > + * -1: connection is closed or not connected > + * 0: connection is in progress > + * 1: connection is established > + */ > +int td_replication_connect_status(td_replication_connect_t *t); > +void td_replication_connect_kill(td_replication_connect_t *t); > + > +/* > + * Return value: > + * -2: this caller should be client > + * -1: error > + * 0: connection is in 
progress > + */ > +int td_replication_server_start(td_replication_connect_t *t); > +/* > + * Return value: > + * -2: this caller should be client > + * -1: error > + * 0: connection is in progress > + */ > +int td_replication_server_restart(td_replication_connect_t *t); > +/* > + * Return value: > + * -1: error > + * 0: connection is in progress > + */ > +int td_replication_client_start(td_replication_connect_t *t); > + > +#endif > -- > 1.9.3 > Acked-by: Shriram Rajagopalan