All of lore.kernel.org
 help / color / mirror / Atom feed
From: Wen Congyang <wency@cn.fujitsu.com>
To: xen devel <xen-devel@lists.xen.org>
Cc: Ian Campbell <Ian.Campbell@citrix.com>,
	Wen Congyang <wency@cn.fujitsu.com>,
	Ian Jackson <Ian.Jackson@eu.citrix.com>,
	Jiang Yunhong <yunhong.jiang@intel.com>,
	Dong Eddie <eddie.dong@intel.com>,
	Shriram Rajagopalan <rshriram@cs.ubc.ca>,
	Yang Hongyang <yanghy@cn.fujitsu.com>,
	Lai Jiangshan <laijs@cn.fujitsu.com>
Subject: [RFC Patch v3 15/22] blktap2: move async connect related code to block-replication.c
Date: Fri, 5 Sep 2014 17:25:50 +0800	[thread overview]
Message-ID: <1409909158-19243-16-git-send-email-wency@cn.fujitsu.com> (raw)
In-Reply-To: <1409909158-19243-1-git-send-email-wency@cn.fujitsu.com>

Move the asynchronous connect code from block-remus.c into the new block-replication.c so that COLO can reuse it.

Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
Cc: Shriram Rajagopalan <rshriram@cs.ubc.ca>
---
 tools/blktap2/drivers/Makefile            |   2 +-
 tools/blktap2/drivers/block-remus.c       | 494 +++---------------------------
 tools/blktap2/drivers/block-replication.c | 468 ++++++++++++++++++++++++++++
 tools/blktap2/drivers/block-replication.h | 113 +++++++
 4 files changed, 630 insertions(+), 447 deletions(-)
 create mode 100644 tools/blktap2/drivers/block-replication.c
 create mode 100644 tools/blktap2/drivers/block-replication.h

diff --git a/tools/blktap2/drivers/Makefile b/tools/blktap2/drivers/Makefile
index 37c3485..3d8ed8a 100644
--- a/tools/blktap2/drivers/Makefile
+++ b/tools/blktap2/drivers/Makefile
@@ -23,7 +23,7 @@ endif
 
 VHDLIBS    := -L$(LIBVHDDIR) -lvhd
 
-REMUS-OBJS  := block-remus.o
+REMUS-OBJS  := block-remus.o block-replication.o
 REMUS-OBJS  += hashtable.o
 REMUS-OBJS  += hashtable_itr.o
 REMUS-OBJS  += hashtable_utility.o
diff --git a/tools/blktap2/drivers/block-remus.c b/tools/blktap2/drivers/block-remus.c
index 5d27d41..8b6f157 100644
--- a/tools/blktap2/drivers/block-remus.c
+++ b/tools/blktap2/drivers/block-remus.c
@@ -40,6 +40,7 @@
 #include "hashtable.h"
 #include "hashtable_itr.h"
 #include "hashtable_utility.h"
+#include "block-replication.h"
 
 #include <errno.h>
 #include <inttypes.h>
@@ -49,10 +50,7 @@
 #include <string.h>
 #include <sys/time.h>
 #include <sys/types.h>
-#include <sys/socket.h>
-#include <netdb.h>
 #include <netinet/in.h>
-#include <arpa/inet.h>
 #include <sys/param.h>
 #include <sys/sysctl.h>
 #include <unistd.h>
@@ -67,22 +65,6 @@
 
 #define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: " _f, ## _a)
 
-#define UNREGISTER_EVENT(id)					\
-	do {							\
-		if (id >= 0) {					\
-			tapdisk_server_unregister_event(id);	\
-			id = -1;				\
-		}						\
-	} while (0)
-
-#define CLOSE_FD(fd)			\
-	do {				\
-		if (fd >= 0) {		\
-			close(fd);	\
-			fd = -1;	\
-		}			\
-	} while (0)
-
 #define MAX_REMUS_REQUEST       TAPDISK_DATA_REQUESTS
 
 enum tdremus_mode {
@@ -92,13 +74,6 @@ enum tdremus_mode {
 	mode_backup
 };
 
-enum {
-	ERROR_INTERNAL = -1,
-	ERROR_IO = -2,
-	ERROR_CONNECTION = -3,
-	ERROR_CLOSE = -4,
-};
-
 struct tdremus_req {
 	td_request_t treq;
 };
@@ -167,21 +142,9 @@ struct ramdisk_write_cbdata {
 
 typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq);
 
-/*
- * If cid, rid and wid are -1, fd must be -1. It means that
- * we are in unpritected mode or we don't start to connect
- * to backup.
- * If fd is an valid fd:
- *  cid is valid, rid and wid must be invalid. It means that
- *      the connection is in progress.
- *  cid is invalid. rid or wid must be valid. It means that
- *      the connection is established.
- */
 typedef struct poll_fd {
 	int        fd;
-	event_id_t cid;
-	event_id_t rid;
-	event_id_t wid;
+	event_id_t id;
 } poll_fd_t;
 
 struct tdremus_state {
@@ -195,9 +158,7 @@ struct tdremus_state {
 	char*     msg_path; /* output completion message here */
 	poll_fd_t msg_fd;
 
-  /* replication host */
-	struct sockaddr_in sa;
-	poll_fd_t server_fd;    /* server listen port */
+	td_replication_connect_t t;
 	poll_fd_t stream_fd;     /* replication channel */
 
 	/*
@@ -777,28 +738,8 @@ static int mwrite(int fd, void* buf, size_t len)
 	select(fd + 1, NULL, &wfds, NULL, &tv);
 }
 
-
-static void inline close_stream_fd(struct tdremus_state *s)
-{
-
-	UNREGISTER_EVENT(s->stream_fd.cid);
-	UNREGISTER_EVENT(s->stream_fd.rid);
-	UNREGISTER_EVENT(s->stream_fd.wid);
-
-	/* close the connection */
-	CLOSE_FD(s->stream_fd.fd);
-}
-
-static void close_server_fd(struct tdremus_state *s)
-{
-	UNREGISTER_EVENT(s->server_fd.cid);
-	CLOSE_FD(s->server_fd.fd);
-}
-
 /* primary functions */
-static void remus_client_event(event_id_t, char mode, void *private);
-static void remus_connect_event(event_id_t id, char mode, void *private);
-static void remus_retry_connect_event(event_id_t id, char mode, void *private);
+static void remus_client_event(event_id_t id, char mode, void *private);
 static int primary_forward_request(struct tdremus_state *s,
 				   const td_request_t *treq);
 
@@ -808,56 +749,15 @@ static int primary_forward_request(struct tdremus_state *s,
  */
 static void primary_failed(struct tdremus_state *s, int rc)
 {
-	close_stream_fd(s);
+	/*
+	 * Drop the READ event that remus_client_established() installed on
+	 * the replication stream before the connection helper closes the
+	 * socket, and forget our copy of the soon-to-be-closed descriptor.
+	 */
+	UNREGISTER_EVENT(s->stream_fd.id);
+	s->stream_fd.fd = -1;
+	td_replication_connect_kill(&s->t);
 	if (rc == ERROR_INTERNAL)
 		RPRINTF("switch to unprotected mode due to internal error");
 	if (rc == ERROR_CLOSE)
 		RPRINTF("switch to unprotected mode before closing");
 	switch_mode(s->tdremus_driver, mode_unprotected);
 }
 
-static int primary_do_connect(struct tdremus_state *state)
-{
-	event_id_t id;
-	int fd;
-	int rc;
-	int flags;
-
-	RPRINTF("client connecting to %s:%d...\n",
-		inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
-
-	if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
-		RPRINTF("could not create client socket: %d\n", errno);
-		return ERROR_INTERNAL;
-	}
-	state->stream_fd.fd = fd;
-
-	/* make socket nonblocking */
-	if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
-		flags = 0;
-	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
-		RPRINTF("error setting fd %d to non block mode\n", fd);
-		return ERROR_INTERNAL;
-	}
-
-	/*
-	 * once we have created the socket and populated the address,
-	 * we can now start our non-blocking connect. rather than
-	 * duplicating code we trigger a timeout on the socket fd,
-	 * which calls out nonblocking connect code
-	 */
-	if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0,
-					       remus_retry_connect_event,
-					       state)) < 0) {
-		RPRINTF("error registering timeout client connection event handler: %s\n",
-			strerror(id));
-		return ERROR_INTERNAL;
-	}
-
-	state->stream_fd.cid = id;
-	return 0;
-}
-
 static int remus_handle_queued_io(struct tdremus_state *s)
 {
 	struct req_ring *queued_io = &s->queued_io;
@@ -882,184 +782,35 @@ static int remus_handle_queued_io(struct tdremus_state *s)
 	return 0;
 }
 
-static int remus_connection_done(struct tdremus_state *s)
+static void remus_client_established(td_replication_connect_t *t, int rc) /* s->t.callback on the primary: rc is 0 once connected, else the error of the failed connect attempt */
 {
+	struct tdremus_state *s = CONTAINER_OF(t, *s, t);
 	event_id_t id;
 
-	/* the connect succeeded */
-	/* unregister this function and register a new event handler */
-	tapdisk_server_unregister_event(s->stream_fd.cid);
-	s->stream_fd.cid = -1;
+	if (rc) {
+		primary_failed(s, rc);
+		return;
+	}
 
-	id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->stream_fd.fd,
+	/* the connect succeeded */
+	id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd,
 					   0, remus_client_event, s);
 	if(id < 0) {
 		RPRINTF("error registering client event handler: %s\n",
 			strerror(id));
-		return ERROR_INTERNAL;
-	}
-	s->stream_fd.rid = id;
-
-	/* handle the queued requests */
-	return remus_handle_queued_io(s);
-}
-
-static int remus_retry_connect(struct tdremus_state *s)
-{
-	event_id_t id;
-
-	tapdisk_server_unregister_event(s->stream_fd.cid);
-	s->stream_fd.cid = -1;
-
-	RPRINTF("connect to backup 1 second later");
-	id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
-					   s->stream_fd.fd,
-					   REMUS_CONNRETRY_TIMEOUT,
-					   remus_retry_connect_event, s);
-	if (id < 0) {
-		RPRINTF("error registering timeout client connection event handler: %s\n",
-			strerror(id));
-		return ERROR_INTERNAL;
-	}
-
-	s->stream_fd.cid = id;
-	return 0;
-}
-
-static int remus_wait_connect_done(struct tdremus_state *s)
-{
-	event_id_t id;
-
-	tapdisk_server_unregister_event(s->stream_fd.cid);
-	s->stream_fd.cid = -1;
-
-	id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
-					   s->stream_fd.fd, 0,
-					   remus_connect_event, s);
-	if (id < 0) {
-		RPRINTF("error registering client connection event handler: %s\n",
-			strerror(id));
-		return ERROR_INTERNAL;
-	}
-	s->stream_fd.cid = id;
-
-	return 0;
-}
-
-/* return 1 if we need to reconnect to backup */
-static int check_connect_errno(int err)
-{
-	/*
-	 * The fd is non-block, so we will not get ETIMEDOUT
-	 * after calling connect(). We only can get this errno
-	 * by getsockopt().
-	 */
-	if (err == ECONNREFUSED || err == ENETUNREACH ||
-	    err == EAGAIN || err == ECONNABORTED ||
-	    err == ETIMEDOUT)
-	    return 1;
-
-	return 0;
-}
-
-static void remus_retry_connect_event(event_id_t id, char mode, void *private)
-{
-	struct tdremus_state *s = (struct tdremus_state *)private;
-	int rc, ret;
-
-	/* do a non-blocking connect */
-	ret = connect(s->stream_fd.fd,
-		      (struct sockaddr *)&s->sa,
-		      sizeof(s->sa));
-	if (ret) {
-		if (errno == EINPROGRESS) {
-			/*
-			 * the connect returned EINPROGRESS (nonblocking
-			 * connect) we must wait for the fd to be writeable
-			 * to determine if the connect worked
-			 */
-			rc = remus_wait_connect_done(s);
-			if (rc)
-				goto fail;
-			return;
-		}
-
-		if (check_connect_errno(errno)) {
-			rc = remus_retry_connect(s);
-			if (rc)
-				goto fail;
-			return;
-		}
-
-		/* not recoverable */
-		RPRINTF("error connection to server %s\n", strerror(errno));
-		rc = ERROR_CONNECTION;
-		goto fail;
-	}
-
-	/* The connection is established unexpectedly */
-	rc = remus_connection_done(s);
-	if (rc)
-		goto fail;
-
-	return;
-
-fail:
-	primary_failed(s, rc);
-	return;
-}
-
-/* callback when nonblocking connect() is finished */
-static void remus_connect_event(event_id_t id, char mode, void *private)
-{
-	int socket_errno;
-	socklen_t socket_errno_size;
-	struct tdremus_state *s = (struct tdremus_state *)private;
-	int rc;
-
-	/* check to see if the connect succeeded */
-	socket_errno_size = sizeof(socket_errno);
-	if (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR,
-		       &socket_errno, &socket_errno_size)) {
-		RPRINTF("error getting socket errno\n");
+		primary_failed(s, ERROR_INTERNAL);
 		return;
 	}
 
-	RPRINTF("socket connect returned %d\n", socket_errno);
+	s->stream_fd.fd = t->fd;	/* copy only: s->t still owns the socket and closes it in td_replication_connect_kill() */
+	s->stream_fd.id = id;
 
-	if (socket_errno) {
-		/* the connect did not succeed */
-		if (check_connect_errno(socket_errno)) {
-			/*
-			 * we can probably assume that the backup is down.
-			 * just try again later
-			 */
-			rc = remus_retry_connect(s);
-			if (rc)
-				goto fail;
-
-			return;
-		} else {
-			RPRINTF("socket connect returned %d, giving up\n",
-				socket_errno);
-			rc = ERROR_CONNECTION;
-			goto fail;
-		}
-
-		return;
-	}
-
-	rc = remus_connection_done(s);
+	/* flush the requests primary_queue_write() queued while connecting */
+	rc = remus_handle_queued_io(s);
 	if (rc)
-		goto fail;
-
-	return;
-
-fail:
-	primary_failed(s, rc);
+		primary_failed(s, rc);
 }
 
-
 /*
  * we install this event handler on the primary once we have
  * connected to the backup.
@@ -1142,19 +893,21 @@ static int primary_forward_request(struct tdremus_state *s,
 static void primary_queue_write(td_driver_t *driver, td_request_t treq)
 {
 	struct tdremus_state *s = (struct tdremus_state *)driver->data;
-	int rc;
+	int rc, ret;
 
 	// RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
 
-	if(s->stream_fd.fd < 0) {
+	ret = td_replication_connect_status(&s->t);
+	if(ret == -1) {
 		RPRINTF("connecting to backup...\n");
-		rc = primary_do_connect(s);
+		s->t.callback = remus_client_established;
+		rc = td_replication_client_start(&s->t);
 		if (rc)
 			goto fail;
 	}
 
 	/* The connection is not established, just queue the request */
-	if (s->stream_fd.cid >= 0) {
+	if (ret != 1) {
 		ring_add_request(&s->queued_io, &treq);
 		return;
 	}
@@ -1227,9 +980,7 @@ static int primary_start(td_driver_t *driver)
 	s->queue_flush = primary_flush;
 
 	s->stream_fd.fd = -1;
-	s->stream_fd.cid = -1;
-	s->stream_fd.rid = -1;
-	s->stream_fd.wid = -1;
+	s->stream_fd.id = -1;
 
 	return 0;
 }
@@ -1240,100 +991,32 @@ static void remus_server_event(event_id_t id, char mode, void *private);
 /* It is called when we find some I/O error */
 static void backup_failed(struct tdremus_state *s, int rc)
 {
-	close_stream_fd(s);
-	close_server_fd(s);
+	/*
+	 * Unregister the READ event that remus_server_established() put on
+	 * the replication stream before the socket is closed.  The code
+	 * before this patch did this through close_stream_fd(), and
+	 * primary_failed() does the same.  Without it the event would stay
+	 * registered on a closed descriptor.
+	 */
+	UNREGISTER_EVENT(s->stream_fd.id);
+	s->stream_fd.fd = -1;
+	td_replication_connect_kill(&s->t);
 	/* We will switch to unprotected mode in backup_queue_write() */
 }
 
 /* returns the socket that receives write requests */
-static void remus_server_accept(event_id_t id, char mode, void* private)
+static void remus_server_established(td_replication_connect_t *t, int rc) /* s->t.callback on the backup; called once a client has been accepted */
 {
-	struct tdremus_state* s = (struct tdremus_state *) private;
-
-	int stream_fd;
-
-	/* XXX: add address-based black/white list */
-	if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) {
-		RPRINTF("error accepting connection: %d\n", errno);
-		return;
-	}
+	struct tdremus_state *s = CONTAINER_OF(t, *s, t);
+	event_id_t id;
 
-	/*
-	 * TODO: check to see if we are already replicating.
-	 * if so just close the connection (or do something
-	 * smarter)
-	 */
-	RPRINTF("server accepted connection\n");
+	/* rc is always 0: the accept handler only reports successful accepts */
 
 	/* add tapdisk event for replication stream */
-	id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0,
+	id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd, 0,
 					   remus_server_event, s);
 
 	if (id < 0) {
 		RPRINTF("error registering connection event handler: %s\n",
 			strerror(errno));
-		close(stream_fd);
+		td_replication_server_restart(t);	/* close this client, keep listening */
 		return;
 	}
 
 	/* store replication file descriptor */
-	s->stream_fd.fd = stream_fd;
-	s->stream_fd.rid = id;
-}
-
-/* returns -2 if EADDRNOTAVAIL */
-static int remus_bind(struct tdremus_state* s)
-{
-	int opt;
-	int rc = -1;
-	event_id_t id;
-
-	if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-		RPRINTF("could not create server socket: %d\n", errno);
-		return rc;
-	}
-
-	opt = 1;
-	if (setsockopt(s->server_fd.fd, SOL_SOCKET,
-		       SO_REUSEADDR, &opt, sizeof(opt)) < 0)
-		RPRINTF("Error setting REUSEADDR on %d: %d\n",
-			s->server_fd.fd, errno);
-
-	if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa,
-		 sizeof(s->sa)) < 0) {
-		RPRINTF("could not bind server socket %d to %s:%d: %d %s\n",
-			s->server_fd.fd, inet_ntoa(s->sa.sin_addr),
-			ntohs(s->sa.sin_port), errno, strerror(errno));
-		if (errno == EADDRNOTAVAIL)
-			rc = -2;
-		goto err_sfd;
-	}
-
-	if (listen(s->server_fd.fd, 10)) {
-		RPRINTF("could not listen on socket: %d\n", errno);
-		goto err_sfd;
-	}
-
-	/*
-	 * The socket s now bound to the address and listening so we
-	 * may now register the fd with tapdisk
-	 */
-	id =  tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
-					    s->server_fd.fd, 0,
-					    remus_server_accept, s);
-	if (id < 0) {
-		RPRINTF("error registering server connection event handler: %s",
-			strerror(id));
-		goto err_sfd;
-	}
-	s->server_fd.cid = id;
-
-	return 0;
-
-err_sfd:
-	CLOSE_FD(s->server_fd.fd);
-
-	return rc;
+	s->stream_fd.fd = t->fd;	/* copy only: s->t still owns the socket */
+	s->stream_fd.id = id;
 }
 
 /* wait for latest checkpoint to be applied */
@@ -1566,90 +1249,6 @@ static int unprotected_start(td_driver_t *driver)
 
 
 /* control */
-
-static inline int resolve_address(const char* addr, struct in_addr* ia)
-{
-	struct hostent* he;
-	uint32_t ip;
-
-	if (!(he = gethostbyname(addr))) {
-		RPRINTF("error resolving %s: %d\n", addr, h_errno);
-		return -1;
-	}
-
-	if (!he->h_addr_list[0]) {
-		RPRINTF("no address found for %s\n", addr);
-		return -1;
-	}
-
-	/* network byte order */
-	ip = *((uint32_t**)he->h_addr_list)[0];
-	ia->s_addr = ip;
-
-	return 0;
-}
-
-static int get_args(td_driver_t *driver, const char* name)
-{
-	struct tdremus_state *state = (struct tdremus_state *)driver->data;
-	char* host;
-	char* port;
-//  char* driver_str;
-//  char* parent;
-//  int type;
-//  char* path;
-//  unsigned long ulport;
-//  int i;
-//  struct sockaddr_in server_addr_in;
-
-	int gai_status;
-	int valid_addr;
-	struct addrinfo gai_hints;
-	struct addrinfo *servinfo, *servinfo_itr;
-
-	memset(&gai_hints, 0, sizeof gai_hints);
-	gai_hints.ai_family = AF_UNSPEC;
-	gai_hints.ai_socktype = SOCK_STREAM;
-
-	port = strchr(name, ':');
-	if (!port) {
-		RPRINTF("missing host in %s\n", name);
-		return -ENOENT;
-	}
-	if (!(host = strndup(name, port - name))) {
-		RPRINTF("unable to allocate host\n");
-		return -ENOMEM;
-	}
-	port++;
-
-	if ((gai_status = getaddrinfo(host, port, &gai_hints, &servinfo)) != 0) {
-		RPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status));
-		return -ENOENT;
-	}
-
-	/* TODO: do something smarter here */
-	valid_addr = 0;
-	for(servinfo_itr = servinfo; servinfo_itr != NULL; servinfo_itr = servinfo_itr->ai_next) {
-		void *addr;
-		char *ipver;
-
-		if (servinfo_itr->ai_family == AF_INET) {
-			valid_addr = 1;
-			memset(&state->sa, 0, sizeof(state->sa));
-			state->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr;
-			break;
-		}
-	}
-	freeaddrinfo(servinfo);
-
-	if (!valid_addr)
-		return -ENOENT;
-
-	RPRINTF("host: %s, port: %d\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
-
-	return 0;
-}
-
 static int switch_mode(td_driver_t *driver, enum tdremus_mode mode)
 {
 	struct tdremus_state *s = (struct tdremus_state *)driver->data;
@@ -1844,11 +1443,11 @@ static int ctl_register(struct tdremus_state *s)
 	RPRINTF("registering ctl fifo\n");
 
 	/* register ctl fd */
-	s->ctl_fd.cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s);
+	s->ctl_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s);
 
-	if (s->ctl_fd.cid < 0) {
+	if (s->ctl_fd.id < 0) {
 		RPRINTF("error registering ctrl FIFO %s: %d\n",
-			s->ctl_path, s->ctl_fd.cid);
+			s->ctl_path, s->ctl_fd.id);
 		return -1;
 	}
 
@@ -1859,7 +1458,7 @@ static void ctl_unregister(struct tdremus_state *s)
 {
 	RPRINTF("unregistering ctl fifo\n");
 
-	UNREGISTER_EVENT(s->ctl_fd.cid);
+	UNREGISTER_EVENT(s->ctl_fd.id);
 }
 
 /* interface */
@@ -1867,6 +1466,7 @@ static void ctl_unregister(struct tdremus_state *s)
 static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
 {
 	struct tdremus_state *s = (struct tdremus_state *)driver->data;
+	td_replication_connect_t *t = &s->t;
 	int rc;
 	const char *name = image->name;
 	td_flag_t flags = image->flags;
@@ -1877,7 +1477,6 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
 	remus_image = image;
 
 	memset(s, 0, sizeof(*s));
-	s->server_fd.fd = -1;
 	s->stream_fd.fd = -1;
 	s->ctl_fd.fd = -1;
 	s->msg_fd.fd = -1;
@@ -1886,8 +1485,12 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
 	 * the driver stack from the stream_fd event handler */
 	s->tdremus_driver = driver;
 
+	t->log_prefix = "remus";
+	t->retry_timeout_s = REMUS_CONNRETRY_TIMEOUT;
+	t->max_connections = 10;
+	t->callback = remus_server_established;
 	/* parse name to get info etc */
-	if ((rc = get_args(driver, name)))
+	if ((rc = td_replication_connect_init(t, name)))
 		return rc;
 
 	if ((rc = ctl_open(driver, name))) {
@@ -1901,7 +1504,7 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
 		return rc;
 	}
 
-	if (!(rc = remus_bind(s)))
+	if (!(rc = td_replication_server_start(t)))
 		rc = switch_mode(driver, mode_backup);
 	else if (rc == -2)
 		rc = switch_mode(driver, mode_primary);
@@ -1932,8 +1535,7 @@ static int tdremus_close(td_driver_t *driver)
 	if (s->ramdisk.inprogress)
 		hashtable_destroy(s->ramdisk.inprogress, 0);
 
-	close_server_fd(s);
-	close_stream_fd(s);
+	td_replication_connect_kill(&s->t);
 	ctl_unregister(s);
 	ctl_close(s);
 
diff --git a/tools/blktap2/drivers/block-replication.c b/tools/blktap2/drivers/block-replication.c
new file mode 100644
index 0000000..e4b2679
--- /dev/null
+++ b/tools/blktap2/drivers/block-replication.c
@@ -0,0 +1,468 @@
+/*
+ * Copyright (C) 2014 FUJITSU LIMITED
+ * Author: Wen Congyang <wency@cn.fujitsu.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; version 2.1 only. with the special
+ * exception on linking described in file LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Lesser General Public License for more details.
+ */
+
+#include "tapdisk-server.h"
+#include "block-replication.h"
+
+#include <string.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <syslog.h>
+#include <stdlib.h>
+#include <arpa/inet.h>
+
+#undef DPRINTF	/* in case an included header already defines these */
+#undef EPRINTF
+#define DPRINTF(_f, _a...) syslog (LOG_DEBUG, "%s: " _f, log_prefix, ## _a)	/* needs a local `log_prefix` in scope */
+#define EPRINTF(_f, _a...) syslog (LOG_ERR, "%s: " _f, log_prefix, ## _a)	/* needs a local `log_prefix` in scope */
+
+/* connection status, stored in t->status */
+enum {
+	connection_none,	/* set by td_replication_connect_init() */
+	connection_in_progress,	/* client: connect() pending/retrying; server: listening */
+	connection_established,	/* t->fd is a connected socket */
+	connection_closed,	/* set by td_replication_connect_kill() */
+};
+
+/* common functions */
+/*
+ * Parse "host:port" in @name and store the first IPv4 address it resolves
+ * to in t->sa.  Returns 0 on success, -ENOMEM if the host part cannot be
+ * copied, or -ENOENT if the separator is missing, the lookup fails, or no
+ * IPv4 address was returned.
+ */
+static int get_args(td_replication_connect_t *t, const char* name)
+{
+	const char *log_prefix = t->log_prefix;
+	const char *sep;
+	char *hostname;
+	struct addrinfo hints;
+	struct addrinfo *result, *ai;
+	int err;
+	int found = 0;
+
+	sep = strchr(name, ':');
+	if (sep == NULL) {
+		EPRINTF("missing host in %s\n", name);
+		return -ENOENT;
+	}
+
+	hostname = strndup(name, sep - name);
+	if (hostname == NULL) {
+		EPRINTF("unable to allocate host\n");
+		return -ENOMEM;
+	}
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+
+	/* the port is everything after the ':' */
+	err = getaddrinfo(hostname, sep + 1, &hints, &result);
+	free(hostname);
+	if (err != 0) {
+		EPRINTF("getaddrinfo error: %s\n", gai_strerror(err));
+		return -ENOENT;
+	}
+
+	/* TODO: do something smarter here -- only the first IPv4 entry is used */
+	for (ai = result; ai != NULL && !found; ai = ai->ai_next) {
+		if (ai->ai_family != AF_INET)
+			continue;
+		memset(&t->sa, 0, sizeof(t->sa));
+		t->sa = *(struct sockaddr_in *)ai->ai_addr;
+		found = 1;
+	}
+	freeaddrinfo(result);
+
+	if (!found)
+		return -ENOENT;
+
+	DPRINTF("host: %s, port: %d\n", inet_ntoa(t->sa.sin_addr),
+		ntohs(t->sa.sin_port));
+
+	return 0;
+}
+
+/*
+ * Initialise @t from "host:port" in @name.  The caller fills in
+ * log_prefix, callback, retry_timeout_s and max_connections beforehand.
+ * Returns 0, or the negative error from get_args() (fields untouched).
+ */
+int td_replication_connect_init(td_replication_connect_t *t, const char *name)
+{
+	int rc;
+
+	rc = get_args(t, name);
+	if (rc)
+		return rc;
+
+	/*
+	 * Mark every descriptor/event invalid.  t->fd must be included:
+	 * the server path only assigns it when a client is accepted, yet
+	 * td_replication_connect_kill() runs CLOSE_FD(t->fd) whenever a
+	 * connection is in progress.  With a zeroed structure (tdremus_open
+	 * memsets its state) that would close descriptor 0.
+	 */
+	t->fd = -1;
+	t->listen_fd = -1;
+	t->id = -1;
+	t->status = connection_none;
+	return 0;
+}
+
+/*
+ * Report the connection state: 1 once connected, 0 while a connect or
+ * listen is pending, -1 when nothing is running (never started or
+ * killed), and -2 if t->status holds an unknown value.
+ */
+int td_replication_connect_status(td_replication_connect_t *t)
+{
+	const char *log_prefix = t->log_prefix;
+
+	if (t->status == connection_established)
+		return 1;
+	if (t->status == connection_in_progress)
+		return 0;
+	if (t->status == connection_none || t->status == connection_closed)
+		return -1;
+
+	EPRINTF("td_replication_connect is corruptted\n");
+	return -2;
+}
+
+/*
+ * Tear down whatever is running: unregister the pending event and close
+ * the connection and listening sockets.  Does nothing unless a
+ * connection is in progress or established; afterwards the status is
+ * connection_closed.
+ */
+void td_replication_connect_kill(td_replication_connect_t *t)
+{
+	int active = t->status == connection_in_progress ||
+		     t->status == connection_established;
+
+	if (!active)
+		return;
+
+	UNREGISTER_EVENT(t->id);
+	CLOSE_FD(t->fd);
+	CLOSE_FD(t->listen_fd);
+	t->status = connection_closed;
+}
+
+/* server */
+static void td_replication_server_accept(event_id_t id, char mode,
+					 void *private);
+
+/*
+ * Listen on t->sa and register td_replication_server_accept() for
+ * incoming clients.
+ *
+ * Returns 0 on success, -2 when bind() fails with EADDRNOTAVAIL (the
+ * address is not local to this host), and -1 on any other error or when
+ * a connection is already in progress/established.
+ */
+int td_replication_server_start(td_replication_connect_t *t)
+{
+	int opt;
+	int rc = -1;
+	int saved_errno;
+	event_id_t id;
+	int fd;
+	const char *log_prefix = t->log_prefix;
+
+	if (t->status == connection_in_progress ||
+	    t->status == connection_established)
+		return rc;
+
+	fd = socket(AF_INET, SOCK_STREAM, 0);
+	if (fd < 0) {
+		EPRINTF("could not create server socket: %d\n", errno);
+		return rc;
+	}
+
+	opt = 1;
+	if (setsockopt(fd, SOL_SOCKET,
+		       SO_REUSEADDR, &opt, sizeof(opt)) < 0)
+		DPRINTF("Error setting REUSEADDR on %d: %d\n", fd, errno);
+
+	if (bind(fd, (struct sockaddr *)&t->sa, sizeof(t->sa)) < 0) {
+		/*
+		 * Capture errno before logging: syslog() may change it, and
+		 * the -2 return value depends on the EADDRNOTAVAIL test.
+		 */
+		saved_errno = errno;
+		DPRINTF("could not bind server socket %d to %s:%d: %d %s\n",
+			fd, inet_ntoa(t->sa.sin_addr),
+			ntohs(t->sa.sin_port), saved_errno,
+			strerror(saved_errno));
+		if (saved_errno == EADDRNOTAVAIL)
+			rc = -2;
+		goto err;
+	}
+
+	if (listen(fd, t->max_connections)) {
+		EPRINTF("could not listen on socket: %d\n", errno);
+		goto err;
+	}
+
+	/*
+	 * The socket is now bound to the address and listening so we
+	 * may now register the fd with tapdisk
+	 */
+	id =  tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+					    fd, 0,
+					    td_replication_server_accept, t);
+	if (id < 0) {
+		EPRINTF("error registering server connection event handler: %s",
+			strerror(id));
+		goto err;
+	}
+	t->listen_fd = fd;
+	/* no client yet: keep t->fd invalid so a later kill closes nothing else */
+	t->fd = -1;
+	t->id = id;
+	t->status = connection_in_progress;
+
+	return 0;
+
+err:
+	close(fd);
+	return rc;
+}
+
+/*
+ * READ handler on the listening socket: accept the client, publish it in
+ * t->fd and report success through t->callback.  While a client is
+ * already connected, further clients are accepted and closed at once.
+ */
+static void td_replication_server_accept(event_id_t id, char mode,
+					 void *private)
+{
+	td_replication_connect_t *t = private;
+	const char *log_prefix = t->log_prefix;
+	int client;
+
+	/* XXX: add address-based black/white list */
+	client = accept(t->listen_fd, NULL, NULL);
+	if (client < 0) {
+		EPRINTF("error accepting connection: %d\n", errno);
+		return;
+	}
+
+	if (t->status != connection_established) {
+		DPRINTF("server accepted connection\n");
+		t->fd = client;
+		t->status = connection_established;
+		t->callback(t, 0);
+		return;
+	}
+
+	EPRINTF("connection is already established\n");
+	close(client);
+}
+
+/*
+ * Go back to waiting for a client.  An established client socket is
+ * closed while the listening socket is kept.  If nothing is running, the
+ * server is started from scratch.  Returns 0, the result of
+ * td_replication_server_start(), or -1 for an unknown status.
+ */
+int td_replication_server_restart(td_replication_connect_t *t)
+{
+	if (t->status == connection_in_progress)
+		return 0;
+
+	if (t->status == connection_established) {
+		CLOSE_FD(t->fd);
+		t->status = connection_in_progress;
+		return 0;
+	}
+
+	if (t->status == connection_none || t->status == connection_closed)
+		return td_replication_server_start(t);
+
+	/* not reached */
+	return -1;
+}
+
+/* client */
+static void td_replication_retry_connect_event(event_id_t id, char mode,
+					       void *private);
+static void td_replication_connect_event(event_id_t id, char mode,
+					 void *private);
+
+/*
+ * Start an asynchronous connection to t->sa.  A non-blocking socket is
+ * created and a SCHEDULER_POLL_TIMEOUT event with a 0 timeout is
+ * registered, so td_replication_retry_connect_event() issues connect()
+ * from the scheduler.  The outcome is reported later via t->callback.
+ *
+ * Returns 0, or ERROR_INTERNAL if a connection is already in progress or
+ * established, or the socket/event could not be set up (the socket is
+ * closed in that case).
+ */
+int td_replication_client_start(td_replication_connect_t *t)
+{
+	event_id_t id;
+	int fd;
+	int flags;
+	const char *log_prefix = t->log_prefix;
+
+	if (t->status == connection_in_progress ||
+	    t->status == connection_established)
+		return ERROR_INTERNAL;
+
+	DPRINTF("client connecting to %s:%d...\n",
+		inet_ntoa(t->sa.sin_addr), ntohs(t->sa.sin_port));
+
+	if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
+		EPRINTF("could not create client socket: %d\n", errno);
+		return ERROR_INTERNAL;
+	}
+
+	/* make socket nonblocking */
+	if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
+		flags = 0;
+	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
+		EPRINTF("error setting fd %d to non block mode\n", fd);
+		goto err;
+	}
+
+	/*
+	 * once we have created the socket and populated the address,
+	 * we can now start our non-blocking connect. rather than
+	 * duplicating code we trigger a timeout on the socket fd,
+	 * which calls out nonblocking connect code
+	 */
+	id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0,
+					   td_replication_retry_connect_event,
+					   t);
+	if (id < 0) {
+		EPRINTF("error registering timeout client connection event handler: %s\n",
+			strerror(id));
+		goto err;
+	}
+
+	t->fd = fd;
+	t->id = id;
+	t->status = connection_in_progress;
+	return 0;
+
+err:
+	close(fd);
+	return ERROR_INTERNAL;
+}
+
+/* The connect attempt failed for good: tear everything down, then report @rc. */
+static void td_replication_client_failed(td_replication_connect_t *t, int rc)
+{
+	td_replication_connect_kill(t);
+	(*t->callback)(t, rc);
+}
+
+/* connect() completed: drop the connect-phase event and report success. */
+static void td_replication_client_done(td_replication_connect_t *t)
+{
+	UNREGISTER_EVENT(t->id);
+	t->status = connection_established;
+	(*t->callback)(t, 0);
+}
+
+/*
+ * Replace the current event with a SCHEDULER_POLL_TIMEOUT event that
+ * runs td_replication_retry_connect_event() again after
+ * t->retry_timeout_s.  Returns 0, or ERROR_INTERNAL if the timer could
+ * not be registered (t->id is then -1).
+ *
+ * NOTE(review): the same socket is reused for the next connect().  POSIX
+ * leaves the socket state unspecified after a failed connect() and
+ * advises opening a new socket; presumably fine on Linux TCP -- confirm.
+ */
+static int td_replication_retry_connect(td_replication_connect_t *t)
+{
+	event_id_t id;
+	const char *log_prefix = t->log_prefix;
+
+	UNREGISTER_EVENT(t->id);
+
+	/* log the configured delay instead of a hard-coded "1 second" */
+	DPRINTF("connect to server %d second(s) later\n", t->retry_timeout_s);
+	id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
+					   t->fd, t->retry_timeout_s,
+					   td_replication_retry_connect_event,
+					   t);
+	if (id < 0) {
+		EPRINTF("error registering timeout client connection event handler: %s\n",
+			strerror(id));
+		return ERROR_INTERNAL;
+	}
+
+	t->id = id;
+	return 0;
+}
+
+/*
+ * connect() returned EINPROGRESS: swap the current event for a WRITE
+ * event on t->fd handled by td_replication_connect_event().  Returns 0,
+ * or ERROR_INTERNAL if registration fails (t->id is then -1).
+ */
+static int td_replication_wait_connect_done(td_replication_connect_t *t)
+{
+	const char *log_prefix = t->log_prefix;
+	event_id_t wid;
+
+	UNREGISTER_EVENT(t->id);
+
+	wid = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
+					    t->fd, 0,
+					    td_replication_connect_event, t);
+	if (wid >= 0) {
+		t->id = wid;
+		return 0;
+	}
+
+	EPRINTF("error registering client connection event handler: %s\n",
+		strerror(wid));
+	return ERROR_INTERNAL;
+}
+
+/*
+ * Classify a connect() failure.  Returns 1 when the error is transient
+ * (the server is probably just not up yet) and another attempt should be
+ * scheduled, 0 when the error is fatal.
+ *
+ * ETIMEDOUT is listed because with a non-blocking socket it is only
+ * reported through getsockopt(SO_ERROR), never by connect() itself.
+ */
+static int check_connect_errno(int err)
+{
+	switch (err) {
+	case ECONNREFUSED:
+	case ENETUNREACH:
+	case EAGAIN:
+	case ECONNABORTED:
+	case ETIMEDOUT:
+		return 1;
+	default:
+		return 0;
+	}
+}
+
+/*
+ * Timeout handler that issues the non-blocking connect().  EINPROGRESS
+ * switches to waiting for writability, a transient error (see
+ * check_connect_errno()) schedules another attempt, an immediate success
+ * completes the connection, and anything else aborts it via
+ * td_replication_client_failed().
+ */
+static void td_replication_retry_connect_event(event_id_t id, char mode,
+					       void *private)
+{
+	td_replication_connect_t *t = private;
+	const char *log_prefix = t->log_prefix;
+	int rc;
+
+	/* do a non-blocking connect */
+	if (connect(t->fd, (struct sockaddr *)&t->sa, sizeof(t->sa)) == 0) {
+		/* The connection is established unexpectedly */
+		td_replication_client_done(t);
+		return;
+	}
+
+	if (errno == EINPROGRESS) {
+		/*
+		 * the connect is under way: wait for the fd to become
+		 * writeable to learn whether it worked
+		 */
+		rc = td_replication_wait_connect_done(t);
+	} else if (check_connect_errno(errno)) {
+		rc = td_replication_retry_connect(t);
+	} else {
+		/* not recoverable */
+		EPRINTF("error connection to server %s\n", strerror(errno));
+		rc = ERROR_CONNECTION;
+	}
+
+	if (rc)
+		td_replication_client_failed(t, rc);
+}
+
+/*
+ * WRITE handler for a pending non-blocking connect(): read the result
+ * with getsockopt(SO_ERROR).  Success completes the connection, a
+ * transient error (check_connect_errno()) schedules another attempt, and
+ * any other error -- including failing to read SO_ERROR -- tears the
+ * connection down and reports the error through t->callback.
+ */
+static void td_replication_connect_event(event_id_t id, char mode,
+					 void *private)
+{
+	int socket_errno;
+	socklen_t socket_errno_size;
+	td_replication_connect_t *t = private;
+	int rc;
+	const char *log_prefix = t->log_prefix;
+
+	/* check to see if the connect succeeded */
+	socket_errno_size = sizeof(socket_errno);
+	if (getsockopt(t->fd, SOL_SOCKET, SO_ERROR,
+		       &socket_errno, &socket_errno_size)) {
+		EPRINTF("error getting socket errno\n");
+		/*
+		 * Just returning would leave this WRITE event registered on
+		 * a descriptor whose connect attempt has finished, so it
+		 * would presumably keep firing; give up instead.
+		 */
+		rc = ERROR_INTERNAL;
+		goto fail;
+	}
+
+	DPRINTF("socket connect returned %d\n", socket_errno);
+
+	if (socket_errno) {
+		/* the connect did not succeed */
+		if (check_connect_errno(socket_errno)) {
+			/*
+			 * we can probably assume that the backup is down.
+			 * just try again later
+			 */
+			rc = td_replication_retry_connect(t);
+			if (rc)
+				goto fail;
+
+			return;
+		}
+
+		EPRINTF("socket connect returned %d, giving up\n",
+			socket_errno);
+		rc = ERROR_CONNECTION;
+		goto fail;
+	}
+
+	td_replication_client_done(t);
+	return;
+
+fail:
+	td_replication_client_failed(t, rc);
+}
diff --git a/tools/blktap2/drivers/block-replication.h b/tools/blktap2/drivers/block-replication.h
new file mode 100644
index 0000000..0bd6e71
--- /dev/null
+++ b/tools/blktap2/drivers/block-replication.h
@@ -0,0 +1,113 @@
+/*
+ * Copyright (C) 2014 FUJITSU LIMITED
+ * Author: Wen Congyang <wency@cn.fujitsu.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; version 2.1 only. with the special
+ * exception on linking described in file LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Lesser General Public License for more details.
+ */
+
+#ifndef BLOCK_REPLICATION_H
+#define BLOCK_REPLICATION_H
+
+#include <netdb.h>
+#include <netinet/in.h>
+#include <stddef.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include "scheduler.h"
+
+/*
+ * CONTAINER_OF - recover a pointer to the enclosing object from a pointer
+ * to one of its members.
+ * @inner_ptr:   pointer to the @member_name field inside some object
+ * @outer:       expression or type naming the enclosing object's type
+ *               (only ever used inside typeof, so never evaluated)
+ * @member_name: name of the field that @inner_ptr points at
+ *
+ * Relies on GCC extensions (statement expressions and typeof). The
+ * discarded pointer comparison makes the compiler diagnose an @inner_ptr
+ * whose type does not match the member's type.
+ */
+#define CONTAINER_OF(inner_ptr, outer, member_name)			\
+	({								\
+		typeof(outer) *container_of_ = (void *)			\
+			((char *)(inner_ptr) -				\
+			 offsetof(typeof(outer), member_name));		\
+		/* compile-time type check of inner_ptr */		\
+		(void)(&container_of_->member_name ==			\
+		       (typeof(inner_ptr))0);				\
+		container_of_;						\
+	})
+
+/*
+ * UNREGISTER_EVENT - unregister a scheduler event if one is registered.
+ * @id: modifiable lvalue holding the event id (e.g. the id member of
+ *      td_replication_connect_t); a negative value means "none".
+ *
+ * After the call @id is -1, so invoking the macro again is a no-op.
+ * @id is evaluated more than once and must not have side effects.
+ */
+#define UNREGISTER_EVENT(id)					\
+	do {							\
+		if ((id) >= 0) {				\
+			tapdisk_server_unregister_event(id);	\
+			(id) = -1;				\
+		}						\
+	} while (0)
+/*
+ * CLOSE_FD - close a file descriptor if it is open.
+ * @fd: modifiable lvalue holding the descriptor; a negative value means
+ *      "not open".
+ *
+ * After the call @fd is -1, so invoking the macro again is a no-op.
+ * The return value of close() is deliberately ignored. @fd is evaluated
+ * more than once and must not have side effects.
+ */
+#define CLOSE_FD(fd)			\
+	do {				\
+		if ((fd) >= 0) {	\
+			close(fd);	\
+			(fd) = -1;	\
+		}			\
+	} while (0)
+
+/*
+ * Negative error codes used by the replication connection code, e.g. as
+ * the rc handed to td_replication_client_failed(). NOTE(review): these
+ * presumably also reach the caller's callback; confirm in
+ * block-replication.c.
+ */
+enum {
+	ERROR_INTERNAL		= -1,
+	ERROR_IO		= -2,
+	ERROR_CONNECTION	= -3,
+	ERROR_CLOSE		= -4,
+};
+
+/* State of one replication connection; see the field groups below. */
+typedef struct td_replication_connect td_replication_connect_t;
+
+/*
+ * Type of the notification hook stored in td_replication_connect.callback.
+ * NOTE(review): @rc presumably carries 0 or one of the negative ERROR_*
+ * codes; confirm against block-replication.c.
+ */
+typedef void td_replication_callback(td_replication_connect_t *t, int rc);
+
+struct td_replication_connect {
+	/*
+	 * Set by the caller before td_replication_connect_init() is
+	 * called.
+	 */
+	const char *log_prefix;			/* prefix for log output */
+	td_replication_callback *callback;	/* notification hook */
+	int retry_timeout_s;			/* retry delay, seconds (per _s) */
+	int max_connections;			/* NOTE(review): likely listen backlog */
+
+	/* Socket the caller reads/writes once the connection is up. */
+	int fd;
+
+	/* Private state: callers must not touch these fields. */
+	struct sockaddr_in sa;
+	int listen_fd;
+	event_id_t id;
+	int status;	/* NOTE(review): likely backs td_replication_connect_status() */
+};
+
+/*
+ * Initialise @t for the peer described by @name. The caller-filled
+ * fields of @t must already be set.
+ * NOTE(review): the expected format of @name is defined by
+ * block-replication.c.
+ * Returns 0 on success or -errno on failure.
+ */
+int td_replication_connect_init(td_replication_connect_t *t, const char *name);
+
+/*
+ * Report the connection state of @t.
+ * Returns 1 once the connection is established, 0 while it is still in
+ * progress, and -1 if it is closed or was never made.
+ */
+int td_replication_connect_status(td_replication_connect_t *t);
+
+/* Shut down @t. NOTE(review): exact cleanup is in block-replication.c. */
+void td_replication_connect_kill(td_replication_connect_t *t);
+
+/*
+ * Start the server (accepting) side of @t.
+ * Returns 0 while the connection is in progress, -1 on error, or -2 if
+ * this caller should act as the client instead.
+ */
+int td_replication_server_start(td_replication_connect_t *t);
+
+/*
+ * Restart the server side of @t.
+ * Returns 0 while the connection is in progress, -1 on error, or -2 if
+ * this caller should act as the client instead.
+ */
+int td_replication_server_restart(td_replication_connect_t *t);
+
+/*
+ * Start the client (connecting) side of @t.
+ * Returns 0 while the connection is in progress, or -1 on error.
+ */
+int td_replication_client_start(td_replication_connect_t *t);
+
+#endif
-- 
1.9.3

  parent reply	other threads:[~2014-09-05  9:25 UTC|newest]

Thread overview: 37+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2014-09-05  9:25 [RFC Patch v3 00/22] COarse-grain LOck-stepping Virtual Machines for Non-stop Service Wen Congyang
2014-09-05  9:25 ` [RFC Patch v3 01/22] move remus related codes to libxl_remus.c Wen Congyang
2014-09-05  9:25 ` [RFC Patch v3 02/22] rename remus device to checkpoint device Wen Congyang
2014-09-05  9:25 ` [RFC Patch v3 03/22] adjust the indentation Wen Congyang
2014-09-05  9:25 ` [RFC Patch v3 04/22] don't touch remus in checkpoint_device Wen Congyang
2014-09-05  9:25 ` [RFC Patch v3 05/22] Update libxl_save_msgs_gen.pl to support return data from xl to xc Wen Congyang
2014-09-05  9:25 ` [RFC Patch v3 06/22] Allow slave sends data to master Wen Congyang
2014-09-05  9:25 ` [RFC Patch v3 07/22] secondary vm suspend/resume/checkpoint code Wen Congyang
2014-09-05  9:25 ` [RFC Patch v3 08/22] primary vm suspend/get_dirty_pfn/resume/checkpoint code Wen Congyang
2014-09-05  9:25 ` [RFC Patch v3 09/22] xc_domain_save: flush cache before calling callbacks->postcopy() in colo mode Wen Congyang
2014-09-05  9:25 ` [RFC Patch v3 10/22] COLO: xc related codes Wen Congyang
2014-09-05  9:25 ` [RFC Patch v3 11/22] send store mfn and console mfn to xl before resuming secondary vm Wen Congyang
2014-09-05  9:25 ` [RFC Patch v3 12/22] implement the cmdline for COLO Wen Congyang
2014-09-05  9:25 ` [RFC Patch v3 13/22] blktap2: connect to backup asynchronously Wen Congyang
2014-09-24 19:11   ` Shriram Rajagopalan
2014-09-25  5:40     ` Wen Congyang
2014-09-05  9:25 ` [RFC Patch v3 14/22] switch to unprotected mode before closing Wen Congyang
2014-09-05  9:25 ` Wen Congyang [this message]
2014-09-24 18:48   ` [RFC Patch v3 15/22] blktap2: move async connect related codes to block-replication.c Shriram Rajagopalan
2014-09-05  9:25 ` [RFC Patch v3 16/22] blktap2: move ramdisk " Wen Congyang
2014-09-24 18:44   ` Shriram Rajagopalan
2014-09-26  5:18     ` Wen Congyang
2014-09-05  9:25 ` [RFC Patch v3 17/22] block-colo: implement colo disk replication Wen Congyang
2014-09-05  9:25 ` [RFC Patch v3 18/22] support blktap COLO in xl: Wen Congyang
2014-09-05  9:25 ` [RFC Patch v3 19/22] libxl/colo: setup and control disk replication for blktap2 backends Wen Congyang
2014-09-05  9:25 ` [RFC Patch v3 20/22] setup and control colo-agent for primary vm Wen Congyang
2014-09-05  9:25 ` [RFC Patch v3 21/22] setup and control colo-agent for secondary vm Wen Congyang
2014-09-05  9:25 ` [RFC Patch v3 22/22] colo: cmdline switches and config vars to control colo-agent Wen Congyang
2014-09-05  9:25 ` [RFC Patch v3 23/22] Introduce "xen-load-devices-state" Wen Congyang
2014-09-05 21:57   ` Stefano Stabellini
2014-09-05 21:57   ` [Qemu-devel] [Xen-devel] " Stefano Stabellini
2014-09-09  2:47     ` Wen Congyang
2014-09-09  2:47     ` [Qemu-devel] [Xen-devel] " Wen Congyang
2014-09-10 19:15       ` Stefano Stabellini
2014-09-10 19:15       ` [Qemu-devel] [Xen-devel] " Stefano Stabellini
2014-09-11  5:03         ` Wen Congyang
2014-09-11  5:03         ` Wen Congyang

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1409909158-19243-16-git-send-email-wency@cn.fujitsu.com \
    --to=wency@cn.fujitsu.com \
    --cc=Ian.Campbell@citrix.com \
    --cc=Ian.Jackson@eu.citrix.com \
    --cc=eddie.dong@intel.com \
    --cc=laijs@cn.fujitsu.com \
    --cc=rshriram@cs.ubc.ca \
    --cc=xen-devel@lists.xen.org \
    --cc=yanghy@cn.fujitsu.com \
    --cc=yunhong.jiang@intel.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.