* [GIT PULL nf-next] IPVS
From: Simon Horman @ 2012-03-21  8:56 UTC
  To: Pablo Neira Ayuso
  Cc: lvs-devel, netdev, netfilter-devel, Wensong Zhang, Julian Anastasov

Please consider pulling
git://git.kernel.org/pub/scm/linux/kernel/git/horms/ipvs.git master
to get the following enhancements to IPVS connection synchronisation
from Julian and a minor clean-up from me.

The following changes since commit ace30d73ef09fd5f95b24c5c1c5aa11963981494:

  netfilter: xt_LOG: add __printf() to sb_add() (2012-03-07 17:41:52 +0100)

are available in the git repository at:
  git://git.kernel.org/pub/scm/linux/kernel/git/horms/ipvs-next.git master

Julian Anastasov (8):
      ipvs: ignore IP_VS_CONN_F_NOOUTPUT in backup server
      ipvs: remove check for IP_VS_CONN_F_SYNC from ip_vs_bind_dest
      ipvs: fix ip_vs_try_bind_dest to rebind app and transmitter
      ipvs: always update some of the flags bits in backup
      ipvs: use adaptive pause in master thread
      ipvs: reduce sync rate with time thresholds
      ipvs: add support for sync threads
      ipvs: optimize the use of flags in ip_vs_bind_dest

Simon Horman (1):
      ipvs: Provide a generic ip_vs_bind_xmit()

 include/linux/ip_vs.h           |    5 +
 include/net/ip_vs.h             |   59 ++++-
 net/netfilter/ipvs/ip_vs_conn.c |   83 ++++--
 net/netfilter/ipvs/ip_vs_core.c |   30 +--
 net/netfilter/ipvs/ip_vs_ctl.c  |   54 ++++-
 net/netfilter/ipvs/ip_vs_sync.c |  585 +++++++++++++++++++++++++--------------
 6 files changed, 547 insertions(+), 269 deletions(-)


* [PATCH 1/9] ipvs: ignore IP_VS_CONN_F_NOOUTPUT in backup server
From: Simon Horman @ 2012-03-21  8:56 UTC
  To: Pablo Neira Ayuso
  Cc: lvs-devel, netdev, netfilter-devel, Wensong Zhang,
	Julian Anastasov, Simon Horman

From: Julian Anastasov <ja@ssi.bg>

	As IP_VS_CONN_F_NOOUTPUT is derived from the
forwarding method, we should take it from conn_flags just
as we do for the IP_VS_CONN_F_FWD_MASK bits when binding
to the real server.
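
For illustration, a minimal user-space model of the masking this
patch applies (flag values copied from include/linux/ip_vs.h;
bind_flags is a hypothetical stand-in for the logic inside
ip_vs_bind_dest, not kernel code):

#include <stdio.h>

#define IP_VS_CONN_F_FWD_MASK	0x0007	/* mask for the fwd methods */
#define IP_VS_CONN_F_SYNC	0x0020	/* entry created by sync */
#define IP_VS_CONN_F_NOOUTPUT	0x0080	/* no output packets */

/* For synced connections, drop both the forwarding-method bits and
 * NOOUTPUT inherited from the master before OR-ing in the real
 * server's conn_flags. */
static unsigned int bind_flags(unsigned int cp_flags,
			       unsigned int dest_conn_flags)
{
	if (cp_flags & IP_VS_CONN_F_SYNC)
		cp_flags &= ~(IP_VS_CONN_F_FWD_MASK | IP_VS_CONN_F_NOOUTPUT);
	return cp_flags | dest_conn_flags;
}

int main(void)
{
	/* master used DR (NOOUTPUT set); this backup dest uses NAT */
	unsigned int cp = IP_VS_CONN_F_SYNC | IP_VS_CONN_F_NOOUTPUT | 0x3;

	printf("bound flags: 0x%x\n", bind_flags(cp, 0x0));	/* 0x20 */
	return 0;
}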

Signed-off-by: Julian Anastasov <ja@ssi.bg>
Signed-off-by: Simon Horman <horms@verge.net.au>
---
 net/netfilter/ipvs/ip_vs_conn.c |    2 +-
 1 files changed, 1 insertions(+), 1 deletions(-)

diff --git a/net/netfilter/ipvs/ip_vs_conn.c b/net/netfilter/ipvs/ip_vs_conn.c
index 29fa5ba..6413d58 100644
--- a/net/netfilter/ipvs/ip_vs_conn.c
+++ b/net/netfilter/ipvs/ip_vs_conn.c
@@ -567,7 +567,7 @@ ip_vs_bind_dest(struct ip_vs_conn *cp, struct ip_vs_dest *dest)
 		if (!(cp->flags & IP_VS_CONN_F_TEMPLATE))
 			conn_flags &= ~IP_VS_CONN_F_INACTIVE;
 		/* connections inherit forwarding method from dest */
-		cp->flags &= ~IP_VS_CONN_F_FWD_MASK;
+		cp->flags &= ~(IP_VS_CONN_F_FWD_MASK | IP_VS_CONN_F_NOOUTPUT);
 	}
 	cp->flags |= conn_flags;
 	cp->dest = dest;
-- 
1.7.6.3



* [PATCH 2/9] ipvs: remove check for IP_VS_CONN_F_SYNC from ip_vs_bind_dest
From: Simon Horman @ 2012-03-21  8:56 UTC
  To: Pablo Neira Ayuso
  Cc: lvs-devel, netdev, netfilter-devel, Wensong Zhang,
	Julian Anastasov, Simon Horman

From: Julian Anastasov <ja@ssi.bg>

	As the IP_VS_CONN_F_INACTIVE bit is properly set
in cp->flags for all kinds of connections, we do not need
special checks for synced connections when updating the
activeconns/inactconns counters for the first time. The
logic now matches that of ip_vs_unbind_dest.

Signed-off-by: Julian Anastasov <ja@ssi.bg>
Signed-off-by: Simon Horman <horms@verge.net.au>
---
 net/netfilter/ipvs/ip_vs_conn.c |    9 ++++-----
 1 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/net/netfilter/ipvs/ip_vs_conn.c b/net/netfilter/ipvs/ip_vs_conn.c
index 6413d58..18f58a08 100644
--- a/net/netfilter/ipvs/ip_vs_conn.c
+++ b/net/netfilter/ipvs/ip_vs_conn.c
@@ -585,11 +585,10 @@ ip_vs_bind_dest(struct ip_vs_conn *cp, struct ip_vs_dest *dest)
 
 	/* Update the connection counters */
 	if (!(cp->flags & IP_VS_CONN_F_TEMPLATE)) {
-		/* It is a normal connection, so increase the inactive
-		   connection counter because it is in TCP SYNRECV
-		   state (inactive) or other protocol inacive state */
-		if ((cp->flags & IP_VS_CONN_F_SYNC) &&
-		    (!(cp->flags & IP_VS_CONN_F_INACTIVE)))
+		/* It is a normal connection, so modify the counters
+		 * according to the flags, later the protocol can
+		 * update them on state change */
+		if (!(cp->flags & IP_VS_CONN_F_INACTIVE))
 			atomic_inc(&dest->activeconns);
 		else
 			atomic_inc(&dest->inactconns);
-- 
1.7.6.3


* [PATCH 3/9] ipvs: fix ip_vs_try_bind_dest to rebind app and transmitter
From: Simon Horman @ 2012-03-21  8:56 UTC
  To: Pablo Neira Ayuso
  Cc: lvs-devel, netdev, netfilter-devel, Wensong Zhang,
	Julian Anastasov, Simon Horman

From: Julian Anastasov <ja@ssi.bg>

	Initially, when the synced connection is created we
use the forwarding method provided by the master, but once
we bind to the destination it can change. As a result, we
must update the application and the transmitter.

	As ip_vs_try_bind_dest is always called for connections
that require dest binding, there is no need to validate the
cp and dest pointers.
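
As a reading aid, the required ordering can be sketched with printf
stubs in place of the kernel helpers named in the diff below (this is
an illustrative flow, not IPVS code):

#include <stdio.h>

static void ip_vs_unbind_app(void) { puts("1. unbind old app"); }
static void ip_vs_bind_dest(void)  { puts("2. bind dest (fwd method may change)"); }
static void ip_vs_bind_xmit(void)  { puts("3. rebind transmitter for new method"); }
static void ip_vs_bind_app(void)   { puts("4. rebind app if the protocol has one"); }

int main(void)
{
	/* The transmitter and the application depend on the forwarding
	 * method, which is only final after binding to the dest. */
	ip_vs_unbind_app();
	ip_vs_bind_dest();
	ip_vs_bind_xmit();
	ip_vs_bind_app();
	return 0;
}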

Signed-off-by: Julian Anastasov <ja@ssi.bg>
Signed-off-by: Simon Horman <horms@verge.net.au>
---
 net/netfilter/ipvs/ip_vs_conn.c |   33 ++++++++++++++++++++++++++-------
 1 files changed, 26 insertions(+), 7 deletions(-)

diff --git a/net/netfilter/ipvs/ip_vs_conn.c b/net/netfilter/ipvs/ip_vs_conn.c
index 18f58a08..a7c6bc8 100644
--- a/net/netfilter/ipvs/ip_vs_conn.c
+++ b/net/netfilter/ipvs/ip_vs_conn.c
@@ -612,14 +612,33 @@ struct ip_vs_dest *ip_vs_try_bind_dest(struct ip_vs_conn *cp)
 {
 	struct ip_vs_dest *dest;
 
-	if ((cp) && (!cp->dest)) {
-		dest = ip_vs_find_dest(ip_vs_conn_net(cp), cp->af, &cp->daddr,
-				       cp->dport, &cp->vaddr, cp->vport,
-				       cp->protocol, cp->fwmark, cp->flags);
+	dest = ip_vs_find_dest(ip_vs_conn_net(cp), cp->af, &cp->daddr,
+			       cp->dport, &cp->vaddr, cp->vport,
+			       cp->protocol, cp->fwmark, cp->flags);
+	if (dest) {
+		struct ip_vs_proto_data *pd;
+
+		/* Applications work depending on the forwarding method
+		 * but better to reassign them always when binding dest */
+		if (cp->app)
+			ip_vs_unbind_app(cp);
+
 		ip_vs_bind_dest(cp, dest);
-		return dest;
-	} else
-		return NULL;
+
+		/* Update its packet transmitter */
+		cp->packet_xmit = NULL;
+#ifdef CONFIG_IP_VS_IPV6
+		if (cp->af == AF_INET6)
+			ip_vs_bind_xmit_v6(cp);
+		else
+#endif
+			ip_vs_bind_xmit(cp);
+
+		pd = ip_vs_proto_data_get(ip_vs_conn_net(cp), cp->protocol);
+		if (pd && atomic_read(&pd->appcnt))
+			ip_vs_bind_app(cp, pd->pp);
+	}
+	return dest;
 }
 
 
-- 
1.7.6.3



* [PATCH 4/9] ipvs: always update some of the flags bits in backup
From: Simon Horman @ 2012-03-21  8:56 UTC
  To: Pablo Neira Ayuso
  Cc: lvs-devel, netdev, netfilter-devel, Wensong Zhang,
	Julian Anastasov, Simon Horman

From: Julian Anastasov <ja@ssi.bg>

	As the goal is to mirror the inactconns/activeconns
counters in the backup server, make sure cp->flags is
updated even if cp is not yet bound to a dest. If cp->flags
is not updated, ip_vs_bind_dest will rely only on the initial
flags when updating the counters. To avoid mistakes and
complicated checks for protocol state, rely only on the
IP_VS_CONN_F_INACTIVE bit when updating the counters.
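
As a rough user-space model of the backup's counter update (the
IP_VS_CONN_F_INACTIVE value is copied from include/linux/ip_vs.h;
update_counters is a hypothetical distillation of the code in the
diff below):

#include <stdio.h>

#define IP_VS_CONN_F_INACTIVE	0x0100	/* not established */

/* XOR-ing the old and the newly received flags tells us whether the
 * INACTIVE bit changed; only then do the counters move. */
static void update_counters(unsigned int old_flags, unsigned int new_flags,
			    int *activeconns, int *inactconns)
{
	if (!((old_flags ^ new_flags) & IP_VS_CONN_F_INACTIVE))
		return;		/* no transition, counters stay */
	if (new_flags & IP_VS_CONN_F_INACTIVE) {
		--*activeconns;	/* active -> inactive */
		++*inactconns;
	} else {
		++*activeconns;	/* inactive -> active */
		--*inactconns;
	}
}

int main(void)
{
	int ac = 1, ic = 0;

	update_counters(0, IP_VS_CONN_F_INACTIVE, &ac, &ic);
	printf("active=%d inactive=%d\n", ac, ic);	/* active=0 inactive=1 */
	return 0;
}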

Signed-off-by: Julian Anastasov <ja@ssi.bg>
Tested-by: Aleksey Chudov <aleksey.chudov@gmail.com>
Signed-off-by: Simon Horman <horms@verge.net.au>
---
 include/linux/ip_vs.h           |    5 +++
 net/netfilter/ipvs/ip_vs_sync.c |   65 ++++++++++++++-------------------------
 2 files changed, 28 insertions(+), 42 deletions(-)

diff --git a/include/linux/ip_vs.h b/include/linux/ip_vs.h
index 4deb383..ac31ef2 100644
--- a/include/linux/ip_vs.h
+++ b/include/linux/ip_vs.h
@@ -89,6 +89,7 @@
 #define IP_VS_CONN_F_TEMPLATE	0x1000		/* template, not connection */
 #define IP_VS_CONN_F_ONE_PACKET	0x2000		/* forward only one packet */
 
+/* Initial bits allowed in backup server */
 #define IP_VS_CONN_F_BACKUP_MASK (IP_VS_CONN_F_FWD_MASK | \
 				  IP_VS_CONN_F_NOOUTPUT | \
 				  IP_VS_CONN_F_INACTIVE | \
@@ -97,6 +98,10 @@
 				  IP_VS_CONN_F_TEMPLATE \
 				 )
 
+/* Bits allowed to update in backup server */
+#define IP_VS_CONN_F_BACKUP_UPD_MASK (IP_VS_CONN_F_INACTIVE | \
+				      IP_VS_CONN_F_SEQ_MASK)
+
 /* Flags that are not sent to backup server start from bit 16 */
 #define IP_VS_CONN_F_NFCT	(1 << 16)	/* use netfilter conntrack */
 
diff --git a/net/netfilter/ipvs/ip_vs_sync.c b/net/netfilter/ipvs/ip_vs_sync.c
index 8a0d6d6..0e36679 100644
--- a/net/netfilter/ipvs/ip_vs_sync.c
+++ b/net/netfilter/ipvs/ip_vs_sync.c
@@ -731,9 +731,30 @@ static void ip_vs_proc_conn(struct net *net, struct ip_vs_conn_param *param,
 	else
 		cp = ip_vs_ct_in_get(param);
 
-	if (cp && param->pe_data) 	/* Free pe_data */
+	if (cp) {
+		/* Free pe_data */
 		kfree(param->pe_data);
-	if (!cp) {
+
+		dest = cp->dest;
+		if ((cp->flags ^ flags) & IP_VS_CONN_F_INACTIVE &&
+		    !(flags & IP_VS_CONN_F_TEMPLATE) && dest) {
+			if (flags & IP_VS_CONN_F_INACTIVE) {
+				atomic_dec(&dest->activeconns);
+				atomic_inc(&dest->inactconns);
+			} else {
+				atomic_inc(&dest->activeconns);
+				atomic_dec(&dest->inactconns);
+			}
+		}
+		flags &= IP_VS_CONN_F_BACKUP_UPD_MASK;
+		flags |= cp->flags & ~IP_VS_CONN_F_BACKUP_UPD_MASK;
+		cp->flags = flags;
+		if (!dest) {
+			dest = ip_vs_try_bind_dest(cp);
+			if (dest)
+				atomic_dec(&dest->refcnt);
+		}
+	} else {
 		/*
 		 * Find the appropriate destination for the connection.
 		 * If it is not found the connection will remain unbound
@@ -742,18 +763,6 @@ static void ip_vs_proc_conn(struct net *net, struct ip_vs_conn_param *param,
 		dest = ip_vs_find_dest(net, type, daddr, dport, param->vaddr,
 				       param->vport, protocol, fwmark, flags);
 
-		/*  Set the approprite ativity flag */
-		if (protocol == IPPROTO_TCP) {
-			if (state != IP_VS_TCP_S_ESTABLISHED)
-				flags |= IP_VS_CONN_F_INACTIVE;
-			else
-				flags &= ~IP_VS_CONN_F_INACTIVE;
-		} else if (protocol == IPPROTO_SCTP) {
-			if (state != IP_VS_SCTP_S_ESTABLISHED)
-				flags |= IP_VS_CONN_F_INACTIVE;
-			else
-				flags &= ~IP_VS_CONN_F_INACTIVE;
-		}
 		cp = ip_vs_conn_new(param, daddr, dport, flags, dest, fwmark);
 		if (dest)
 			atomic_dec(&dest->refcnt);
@@ -763,34 +772,6 @@ static void ip_vs_proc_conn(struct net *net, struct ip_vs_conn_param *param,
 			IP_VS_DBG(2, "BACKUP, add new conn. failed\n");
 			return;
 		}
-	} else if (!cp->dest) {
-		dest = ip_vs_try_bind_dest(cp);
-		if (dest)
-			atomic_dec(&dest->refcnt);
-	} else if ((cp->dest) && (cp->protocol == IPPROTO_TCP) &&
-		(cp->state != state)) {
-		/* update active/inactive flag for the connection */
-		dest = cp->dest;
-		if (!(cp->flags & IP_VS_CONN_F_INACTIVE) &&
-			(state != IP_VS_TCP_S_ESTABLISHED)) {
-			atomic_dec(&dest->activeconns);
-			atomic_inc(&dest->inactconns);
-			cp->flags |= IP_VS_CONN_F_INACTIVE;
-		} else if ((cp->flags & IP_VS_CONN_F_INACTIVE) &&
-			(state == IP_VS_TCP_S_ESTABLISHED)) {
-			atomic_inc(&dest->activeconns);
-			atomic_dec(&dest->inactconns);
-			cp->flags &= ~IP_VS_CONN_F_INACTIVE;
-		}
-	} else if ((cp->dest) && (cp->protocol == IPPROTO_SCTP) &&
-		(cp->state != state)) {
-		dest = cp->dest;
-		if (!(cp->flags & IP_VS_CONN_F_INACTIVE) &&
-		(state != IP_VS_SCTP_S_ESTABLISHED)) {
-			atomic_dec(&dest->activeconns);
-			atomic_inc(&dest->inactconns);
-			cp->flags &= ~IP_VS_CONN_F_INACTIVE;
-		}
 	}
 
 	if (opt)
-- 
1.7.6.3



* [PATCH 5/9] ipvs: use adaptive pause in master thread
From: Simon Horman @ 2012-03-21  8:56 UTC
  To: Pablo Neira Ayuso
  Cc: lvs-devel, netdev, netfilter-devel, Wensong Zhang,
	Julian Anastasov, Simon Horman

From: Julian Anastasov <ja@ssi.bg>

	A high rate of sync messages in the master can
overflow the socket buffer and cause messages to be dropped.
Instead of using a fixed pause, try to adapt to the sync
rate, so that we do not wake up too often while at the same
time staying below the socket buffer limit.
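
A user-space sketch of the resulting pause policy (HZ and the 200/20
per-wakeup thresholds follow the diff below; adapt_pause is a
hypothetical distillation, not the kernel function):

#include <stdio.h>

#define HZ 100

static int adapt_pause(int pause, int sent_count, int send_failed)
{
	if (send_failed)
		return pause > 1 ? pause >> 1 : 1;	/* retry sooner */
	if (sent_count > 200)		/* queue is hot: sleep less */
		pause -= HZ / 20;
	else if (sent_count < 20)	/* queue is cold: sleep more */
		pause += HZ / 20;
	if (pause < 1)
		pause = 1;
	if (pause > HZ / 4)
		pause = HZ / 4;
	return pause;
}

int main(void)
{
	int pause = HZ / 10;	/* initial value, as in the diff */

	pause = adapt_pause(pause, 300, 0);
	printf("pause after a busy wakeup: %d ticks\n", pause);	/* 5 */
	return 0;
}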

Signed-off-by: Julian Anastasov <ja@ssi.bg>
Tested-by: Aleksey Chudov <aleksey.chudov@gmail.com>
Signed-off-by: Simon Horman <horms@verge.net.au>
---
 net/netfilter/ipvs/ip_vs_sync.c |   60 +++++++++++++++++++++++++++++---------
 1 files changed, 46 insertions(+), 14 deletions(-)

diff --git a/net/netfilter/ipvs/ip_vs_sync.c b/net/netfilter/ipvs/ip_vs_sync.c
index 0e36679..9201c43 100644
--- a/net/netfilter/ipvs/ip_vs_sync.c
+++ b/net/netfilter/ipvs/ip_vs_sync.c
@@ -1392,18 +1392,22 @@ ip_vs_send_async(struct socket *sock, const char *buffer, const size_t length)
 	return len;
 }
 
-static void
+static int
 ip_vs_send_sync_msg(struct socket *sock, struct ip_vs_sync_mesg *msg)
 {
 	int msize;
+	int ret;
 
 	msize = msg->size;
 
 	/* Put size in network byte order */
 	msg->size = htons(msg->size);
 
-	if (ip_vs_send_async(sock, (char *)msg, msize) != msize)
-		pr_err("ip_vs_send_async error\n");
+	ret = ip_vs_send_async(sock, (char *)msg, msize);
+	if (ret >= 0 || ret == -EAGAIN)
+		return ret;
+	pr_err("ip_vs_send_async error %d\n", ret);
+	return 0;
 }
 
 static int
@@ -1428,33 +1432,61 @@ ip_vs_receive(struct socket *sock, char *buffer, const size_t buflen)
 	return len;
 }
 
+/* Get next buffer to send */
+static inline struct ip_vs_sync_buff *
+next_sync_buff(struct netns_ipvs *ipvs)
+{
+	struct ip_vs_sync_buff *sb;
+
+	sb = sb_dequeue(ipvs);
+	if (sb)
+		return sb;
+	/* Do not delay entries in buffer for more than 2 seconds */
+	return get_curr_sync_buff(ipvs, 2 * HZ);
+}
 
 static int sync_thread_master(void *data)
 {
 	struct ip_vs_sync_thread_data *tinfo = data;
 	struct netns_ipvs *ipvs = net_ipvs(tinfo->net);
-	struct ip_vs_sync_buff *sb;
+	struct ip_vs_sync_buff *sb = NULL;
+	int pause = HZ / 10;
 
 	pr_info("sync thread started: state = MASTER, mcast_ifn = %s, "
 		"syncid = %d\n",
 		ipvs->master_mcast_ifn, ipvs->master_syncid);
 
 	while (!kthread_should_stop()) {
-		while ((sb = sb_dequeue(ipvs))) {
-			ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
-			ip_vs_sync_buff_release(sb);
-		}
+		int count = 0;
 
-		/* check if entries stay in ipvs->sync_buff for 2 seconds */
-		sb = get_curr_sync_buff(ipvs, 2 * HZ);
-		if (sb) {
-			ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
-			ip_vs_sync_buff_release(sb);
+		for (;;) {
+			if (!sb) {
+				sb = next_sync_buff(ipvs);
+				if (!sb)
+					break;
+			}
+			if (ip_vs_send_sync_msg(tinfo->sock, sb->mesg) >= 0 ||
+			    pause == 1) {
+				ip_vs_sync_buff_release(sb);
+				sb = NULL;
+				count++;
+			} else {
+				pause = max(1, (pause >> 1));
+				break;
+			}
 		}
 
-		schedule_timeout_interruptible(HZ);
+		/* Max packets to send at once */
+		if (count > 200)
+			pause = max(1, (pause - HZ / 20));
+		else if (count < 20)
+			pause = min(HZ / 4, (pause + HZ / 20));
+		schedule_timeout_interruptible(sb ? 1 : pause);
 	}
 
+	if (sb)
+		ip_vs_sync_buff_release(sb);
+
 	/* clean up the sync_buff queue */
 	while ((sb = sb_dequeue(ipvs)))
 		ip_vs_sync_buff_release(sb);
-- 
1.7.6.3


* [PATCH 6/9] ipvs: reduce sync rate with time thresholds
From: Simon Horman @ 2012-03-21  8:56 UTC
  To: Pablo Neira Ayuso
  Cc: lvs-devel, netdev, netfilter-devel, Wensong Zhang,
	Julian Anastasov, Simon Horman

From: Julian Anastasov <ja@ssi.bg>

	Add two new sysctl vars to control the sync rate. The
main idea is to reduce the rate for connection templates, which
currently depends on the packet rate of the controlled connections.
This mechanism should also be useful for normal connections
with high traffic.

sync_refresh_period: in seconds, the difference in the reported
	connection timer that triggers a new sync message. It can
	be used to avoid sync messages for the specified period
	(or half of the connection timeout, if lower) as long as
	the connection state has not changed since the last sync.

sync_retries: integer, 0..3, defines sync retries with a period of
	sync_refresh_period/8. Useful to protect against loss of
	sync messages.

	Allow sysctl_sync_threshold to be used with
sysctl_sync_period=0, so that only a single sync message is sent
if sync_refresh_period is also 0.

	Add a new field, "sync_endtime", to the connection structure
to hold the reported time when the connection expires. The two
lowest bits represent the retry count.
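
A small sketch of that encoding (pack is a hypothetical helper name;
the kernel updates cp->sync_endtime with cmpxchg):

#include <stdio.h>

/* sync_endtime is (jiffies + timeout) rounded down to a multiple of
 * 4, with the two low bits reused as a retry counter (0..3). The
 * rounding loses at most 3 jiffies, which is harmless here. */
static unsigned long pack(unsigned long endtime, unsigned int retries)
{
	return (endtime & ~3UL) | (retries & 3);
}

int main(void)
{
	unsigned long now = 100000, timeout = 15 * 100;	/* 15s at HZ=100 */
	unsigned long v = pack(now + timeout, 2);

	printf("endtime=%lu retries=%lu\n", v & ~3UL, v & 3UL);
	return 0;
}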

	As sysctl_sync_period can now be 0, use ACCESS_ONCE to
avoid division by zero.

Special thanks to Aleksey Chudov for being patient with me,
for his extensive reports and for helping with all the tests.

Signed-off-by: Julian Anastasov <ja@ssi.bg>
Tested-by: Aleksey Chudov <aleksey.chudov@gmail.com>
Signed-off-by: Simon Horman <horms@verge.net.au>
---
 include/net/ip_vs.h             |   36 ++++++++++--
 net/netfilter/ipvs/ip_vs_conn.c |    7 ++-
 net/netfilter/ipvs/ip_vs_core.c |   30 +---------
 net/netfilter/ipvs/ip_vs_ctl.c  |   25 ++++++++-
 net/netfilter/ipvs/ip_vs_sync.c |  121 +++++++++++++++++++++++++++++++++-----
 5 files changed, 168 insertions(+), 51 deletions(-)

diff --git a/include/net/ip_vs.h b/include/net/ip_vs.h
index ebe517f..8bee5d0 100644
--- a/include/net/ip_vs.h
+++ b/include/net/ip_vs.h
@@ -504,6 +504,7 @@ struct ip_vs_conn {
 						 * state transition triggerd
 						 * synchronization
 						 */
+	unsigned long		sync_endtime;	/* jiffies + sent_retries */
 
 	/* Control members */
 	struct ip_vs_conn       *control;       /* Master control connection */
@@ -873,6 +874,8 @@ struct netns_ipvs {
 	int			sysctl_expire_nodest_conn;
 	int			sysctl_expire_quiescent_template;
 	int			sysctl_sync_threshold[2];
+	unsigned int		sysctl_sync_refresh_period;
+	int			sysctl_sync_retries;
 	int			sysctl_nat_icmp_send;
 
 	/* ip_vs_lblc */
@@ -908,9 +911,12 @@ struct netns_ipvs {
 	struct net		*net;            /* Needed by timer routines */
 };
 
-#define DEFAULT_SYNC_THRESHOLD	3
-#define DEFAULT_SYNC_PERIOD	50
-#define DEFAULT_SYNC_VER	1
+#define DEFAULT_SYNC_THRESHOLD		3
+#define DEFAULT_SYNC_PERIOD		50
+#define DEFAULT_SYNC_REFRESH_PERIOD	(0U * HZ)
+#define DEFAULT_SYNC_RETRIES		0
+#define DEFAULT_SYNC_VER		1
+#define IPVS_SYNC_FLUSH_TIME		(HZ * 2)
 
 #ifdef CONFIG_SYSCTL
 
@@ -921,7 +927,17 @@ static inline int sysctl_sync_threshold(struct netns_ipvs *ipvs)
 
 static inline int sysctl_sync_period(struct netns_ipvs *ipvs)
 {
-	return ipvs->sysctl_sync_threshold[1];
+	return ACCESS_ONCE(ipvs->sysctl_sync_threshold[1]);
+}
+
+static inline unsigned int sysctl_sync_refresh_period(struct netns_ipvs *ipvs)
+{
+	return ACCESS_ONCE(ipvs->sysctl_sync_refresh_period);
+}
+
+static inline int sysctl_sync_retries(struct netns_ipvs *ipvs)
+{
+	return ipvs->sysctl_sync_retries;
 }
 
 static inline int sysctl_sync_ver(struct netns_ipvs *ipvs)
@@ -941,6 +957,16 @@ static inline int sysctl_sync_period(struct netns_ipvs *ipvs)
 	return DEFAULT_SYNC_PERIOD;
 }
 
+static inline unsigned int sysctl_sync_refresh_period(struct netns_ipvs *ipvs)
+{
+	return DEFAULT_SYNC_REFRESH_PERIOD;
+}
+
+static inline int sysctl_sync_retries(struct netns_ipvs *ipvs)
+{
+	return DEFAULT_SYNC_RETRIES & 3;
+}
+
 static inline int sysctl_sync_ver(struct netns_ipvs *ipvs)
 {
 	return DEFAULT_SYNC_VER;
@@ -1218,7 +1244,7 @@ extern struct ip_vs_dest *ip_vs_try_bind_dest(struct ip_vs_conn *cp);
 extern int start_sync_thread(struct net *net, int state, char *mcast_ifn,
 			     __u8 syncid);
 extern int stop_sync_thread(struct net *net, int state);
-extern void ip_vs_sync_conn(struct net *net, struct ip_vs_conn *cp);
+extern void ip_vs_sync_conn(struct net *net, struct ip_vs_conn *cp, int pkts);
 
 
 /*
diff --git a/net/netfilter/ipvs/ip_vs_conn.c b/net/netfilter/ipvs/ip_vs_conn.c
index a7c6bc8..126971c 100644
--- a/net/netfilter/ipvs/ip_vs_conn.c
+++ b/net/netfilter/ipvs/ip_vs_conn.c
@@ -761,7 +761,8 @@ int ip_vs_check_template(struct ip_vs_conn *ct)
 static void ip_vs_conn_expire(unsigned long data)
 {
 	struct ip_vs_conn *cp = (struct ip_vs_conn *)data;
-	struct netns_ipvs *ipvs = net_ipvs(ip_vs_conn_net(cp));
+	struct net *net = ip_vs_conn_net(cp);
+	struct netns_ipvs *ipvs = net_ipvs(net);
 
 	cp->timeout = 60*HZ;
 
@@ -826,6 +827,9 @@ static void ip_vs_conn_expire(unsigned long data)
 		  atomic_read(&cp->refcnt)-1,
 		  atomic_read(&cp->n_control));
 
+	if (ipvs->sync_state & IP_VS_STATE_MASTER)
+		ip_vs_sync_conn(net, cp, sysctl_sync_threshold(ipvs));
+
 	ip_vs_conn_put(cp);
 }
 
@@ -899,6 +903,7 @@ ip_vs_conn_new(const struct ip_vs_conn_param *p,
 	/* Set its state and timeout */
 	cp->state = 0;
 	cp->timeout = 3*HZ;
+	cp->sync_endtime = jiffies & ~3UL;
 
 	/* Bind its packet transmitter */
 #ifdef CONFIG_IP_VS_IPV6
diff --git a/net/netfilter/ipvs/ip_vs_core.c b/net/netfilter/ipvs/ip_vs_core.c
index 2555816..b3686d8 100644
--- a/net/netfilter/ipvs/ip_vs_core.c
+++ b/net/netfilter/ipvs/ip_vs_core.c
@@ -1613,34 +1613,8 @@ ip_vs_in(unsigned int hooknum, struct sk_buff *skb, int af)
 	else
 		pkts = atomic_add_return(1, &cp->in_pkts);
 
-	if ((ipvs->sync_state & IP_VS_STATE_MASTER) &&
-	    cp->protocol == IPPROTO_SCTP) {
-		if ((cp->state == IP_VS_SCTP_S_ESTABLISHED &&
-			(pkts % sysctl_sync_period(ipvs)
-			 == sysctl_sync_threshold(ipvs))) ||
-				(cp->old_state != cp->state &&
-				 ((cp->state == IP_VS_SCTP_S_CLOSED) ||
-				  (cp->state == IP_VS_SCTP_S_SHUT_ACK_CLI) ||
-				  (cp->state == IP_VS_SCTP_S_SHUT_ACK_SER)))) {
-			ip_vs_sync_conn(net, cp);
-			goto out;
-		}
-	}
-
-	/* Keep this block last: TCP and others with pp->num_states <= 1 */
-	else if ((ipvs->sync_state & IP_VS_STATE_MASTER) &&
-	    (((cp->protocol != IPPROTO_TCP ||
-	       cp->state == IP_VS_TCP_S_ESTABLISHED) &&
-	      (pkts % sysctl_sync_period(ipvs)
-	       == sysctl_sync_threshold(ipvs))) ||
-	     ((cp->protocol == IPPROTO_TCP) && (cp->old_state != cp->state) &&
-	      ((cp->state == IP_VS_TCP_S_FIN_WAIT) ||
-	       (cp->state == IP_VS_TCP_S_CLOSE) ||
-	       (cp->state == IP_VS_TCP_S_CLOSE_WAIT) ||
-	       (cp->state == IP_VS_TCP_S_TIME_WAIT)))))
-		ip_vs_sync_conn(net, cp);
-out:
-	cp->old_state = cp->state;
+	if (ipvs->sync_state & IP_VS_STATE_MASTER)
+		ip_vs_sync_conn(net, cp, pkts);
 
 	ip_vs_conn_put(cp);
 	return ret;
diff --git a/net/netfilter/ipvs/ip_vs_ctl.c b/net/netfilter/ipvs/ip_vs_ctl.c
index b3afe18..da922c7 100644
--- a/net/netfilter/ipvs/ip_vs_ctl.c
+++ b/net/netfilter/ipvs/ip_vs_ctl.c
@@ -1599,6 +1599,10 @@ static int ip_vs_zero_all(struct net *net)
 }
 
 #ifdef CONFIG_SYSCTL
+
+static int zero;
+static int three = 3;
+
 static int
 proc_do_defense_mode(ctl_table *table, int write,
 		     void __user *buffer, size_t *lenp, loff_t *ppos)
@@ -1632,7 +1636,8 @@ proc_do_sync_threshold(ctl_table *table, int write,
 	memcpy(val, valp, sizeof(val));
 
 	rc = proc_dointvec(table, write, buffer, lenp, ppos);
-	if (write && (valp[0] < 0 || valp[1] < 0 || valp[0] >= valp[1])) {
+	if (write && (valp[0] < 0 || valp[1] < 0 ||
+	    (valp[0] >= valp[1] && valp[1]))) {
 		/* Restore the correct value */
 		memcpy(valp, val, sizeof(val));
 	}
@@ -1743,6 +1748,20 @@ static struct ctl_table vs_vars[] = {
 		.proc_handler	= proc_do_sync_threshold,
 	},
 	{
+		.procname	= "sync_refresh_period",
+		.maxlen		= sizeof(int),
+		.mode		= 0644,
+		.proc_handler	= proc_dointvec_jiffies,
+	},
+	{
+		.procname	= "sync_retries",
+		.maxlen		= sizeof(int),
+		.mode		= 0644,
+		.proc_handler	= proc_dointvec_minmax,
+		.extra1		= &zero,
+		.extra2		= &three,
+	},
+	{
 		.procname	= "nat_icmp_send",
 		.maxlen		= sizeof(int),
 		.mode		= 0644,
@@ -3661,6 +3680,10 @@ int __net_init ip_vs_control_net_init_sysctl(struct net *net)
 	ipvs->sysctl_sync_threshold[1] = DEFAULT_SYNC_PERIOD;
 	tbl[idx].data = &ipvs->sysctl_sync_threshold;
 	tbl[idx++].maxlen = sizeof(ipvs->sysctl_sync_threshold);
+	ipvs->sysctl_sync_refresh_period = DEFAULT_SYNC_REFRESH_PERIOD;
+	tbl[idx++].data = &ipvs->sysctl_sync_refresh_period;
+	ipvs->sysctl_sync_retries = clamp_t(int, DEFAULT_SYNC_RETRIES, 0, 3);
+	tbl[idx++].data = &ipvs->sysctl_sync_retries;
 	tbl[idx++].data = &ipvs->sysctl_nat_icmp_send;
 
 
diff --git a/net/netfilter/ipvs/ip_vs_sync.c b/net/netfilter/ipvs/ip_vs_sync.c
index 9201c43..8ee8a57 100644
--- a/net/netfilter/ipvs/ip_vs_sync.c
+++ b/net/netfilter/ipvs/ip_vs_sync.c
@@ -442,11 +442,94 @@ ip_vs_sync_buff_create_v0(struct netns_ipvs *ipvs)
 	return sb;
 }
 
+/* Check if conn should be synced.
+ * pkts: conn packets, use sysctl_sync_threshold to avoid packet check
+ * - (1) sync_refresh_period: reduce sync rate. Additionally, retry
+ *	sync_retries times with period of sync_refresh_period/8
+ * - (2) if both sync_refresh_period and sync_period are 0 send sync only
+ *	for state changes or only once when pkts matches sync_threshold
+ * - (3) templates: rate can be reduced only with sync_refresh_period or
+ *	with (2)
+ */
+static int ip_vs_sync_conn_needed(struct netns_ipvs *ipvs,
+				  struct ip_vs_conn *cp, int pkts)
+{
+	unsigned long orig = ACCESS_ONCE(cp->sync_endtime);
+	unsigned long now = jiffies;
+	unsigned long n = (now + cp->timeout) & ~3UL;
+	unsigned int sync_refresh_period;
+	int sync_period;
+	int force;
+
+	/* Check if we sync in current state */
+	if (unlikely(cp->flags & IP_VS_CONN_F_TEMPLATE))
+		force = 0;
+	else if (likely(cp->protocol == IPPROTO_TCP)) {
+		if (!((1 << cp->state) &
+		      ((1 << IP_VS_TCP_S_ESTABLISHED) |
+		       (1 << IP_VS_TCP_S_FIN_WAIT) |
+		       (1 << IP_VS_TCP_S_CLOSE) |
+		       (1 << IP_VS_TCP_S_CLOSE_WAIT) |
+		       (1 << IP_VS_TCP_S_TIME_WAIT))))
+			return 0;
+		force = cp->state != cp->old_state;
+		if (force && cp->state != IP_VS_TCP_S_ESTABLISHED)
+			goto set;
+	} else if (unlikely(cp->protocol == IPPROTO_SCTP)) {
+		if (!((1 << cp->state) &
+		      ((1 << IP_VS_SCTP_S_ESTABLISHED) |
+		       (1 << IP_VS_SCTP_S_CLOSED) |
+		       (1 << IP_VS_SCTP_S_SHUT_ACK_CLI) |
+		       (1 << IP_VS_SCTP_S_SHUT_ACK_SER))))
+			return 0;
+		force = cp->state != cp->old_state;
+		if (force && cp->state != IP_VS_SCTP_S_ESTABLISHED)
+			goto set;
+	} else {
+		/* UDP or another protocol with single state */
+		force = 0;
+	}
+
+	sync_refresh_period = sysctl_sync_refresh_period(ipvs);
+	if (sync_refresh_period > 0) {
+		long diff = n - orig;
+		long min_diff = max(cp->timeout >> 1, 10UL * HZ);
+
+		/* Avoid sync if difference is below sync_refresh_period
+		 * and below the half timeout.
+		 */
+		if (abs(diff) < min_t(long, sync_refresh_period, min_diff)) {
+			int retries = orig & 3;
+
+			if (retries >= sysctl_sync_retries(ipvs))
+				return 0;
+			if (time_before(now, orig - cp->timeout +
+					(sync_refresh_period >> 3)))
+				return 0;
+			n |= retries + 1;
+		}
+	}
+	sync_period = sysctl_sync_period(ipvs);
+	if (sync_period > 0) {
+		if (!(cp->flags & IP_VS_CONN_F_TEMPLATE) &&
+		    pkts % sync_period != sysctl_sync_threshold(ipvs))
+			return 0;
+	} else if (sync_refresh_period <= 0 &&
+		   pkts != sysctl_sync_threshold(ipvs))
+		return 0;
+
+set:
+	cp->old_state = cp->state;
+	n = cmpxchg(&cp->sync_endtime, orig, n);
+	return n == orig || force;
+}
+
 /*
  *      Version 0 , could be switched in by sys_ctl.
  *      Add an ip_vs_conn information into the current sync_buff.
  */
-void ip_vs_sync_conn_v0(struct net *net, struct ip_vs_conn *cp)
+static void ip_vs_sync_conn_v0(struct net *net, struct ip_vs_conn *cp,
+			       int pkts)
 {
 	struct netns_ipvs *ipvs = net_ipvs(net);
 	struct ip_vs_sync_mesg_v0 *m;
@@ -459,6 +542,9 @@ void ip_vs_sync_conn_v0(struct net *net, struct ip_vs_conn *cp)
 	if (cp->flags & IP_VS_CONN_F_ONE_PACKET)
 		return;
 
+	if (!ip_vs_sync_conn_needed(ipvs, cp, pkts))
+		return;
+
 	spin_lock(&ipvs->sync_buff_lock);
 	if (!ipvs->sync_buff) {
 		ipvs->sync_buff =
@@ -504,8 +590,14 @@ void ip_vs_sync_conn_v0(struct net *net, struct ip_vs_conn *cp)
 	spin_unlock(&ipvs->sync_buff_lock);
 
 	/* synchronize its controller if it has */
-	if (cp->control)
-		ip_vs_sync_conn(net, cp->control);
+	cp = cp->control;
+	if (cp) {
+		if (cp->flags & IP_VS_CONN_F_TEMPLATE)
+			pkts = atomic_add_return(1, &cp->in_pkts);
+		else
+			pkts = sysctl_sync_threshold(ipvs);
+		ip_vs_sync_conn(net, cp->control, pkts);
+	}
 }
 
 /*
@@ -513,7 +605,7 @@ void ip_vs_sync_conn_v0(struct net *net, struct ip_vs_conn *cp)
  *      Called by ip_vs_in.
  *      Sending Version 1 messages
  */
-void ip_vs_sync_conn(struct net *net, struct ip_vs_conn *cp)
+void ip_vs_sync_conn(struct net *net, struct ip_vs_conn *cp, int pkts)
 {
 	struct netns_ipvs *ipvs = net_ipvs(net);
 	struct ip_vs_sync_mesg *m;
@@ -523,13 +615,16 @@ void ip_vs_sync_conn(struct net *net, struct ip_vs_conn *cp)
 
 	/* Handle old version of the protocol */
 	if (sysctl_sync_ver(ipvs) == 0) {
-		ip_vs_sync_conn_v0(net, cp);
+		ip_vs_sync_conn_v0(net, cp, pkts);
 		return;
 	}
 	/* Do not sync ONE PACKET */
 	if (cp->flags & IP_VS_CONN_F_ONE_PACKET)
 		goto control;
 sloop:
+	if (!ip_vs_sync_conn_needed(ipvs, cp, pkts))
+		goto control;
+
 	/* Sanity checks */
 	pe_name_len = 0;
 	if (cp->pe_data_len) {
@@ -644,16 +739,10 @@ control:
 	cp = cp->control;
 	if (!cp)
 		return;
-	/*
-	 * Reduce sync rate for templates
-	 * i.e only increment in_pkts for Templates.
-	 */
-	if (cp->flags & IP_VS_CONN_F_TEMPLATE) {
-		int pkts = atomic_add_return(1, &cp->in_pkts);
-
-		if (pkts % sysctl_sync_period(ipvs) != 1)
-			return;
-	}
+	if (cp->flags & IP_VS_CONN_F_TEMPLATE)
+		pkts = atomic_add_return(1, &cp->in_pkts);
+	else
+		pkts = sysctl_sync_threshold(ipvs);
 	goto sloop;
 }
 
@@ -1442,7 +1531,7 @@ next_sync_buff(struct netns_ipvs *ipvs)
 	if (sb)
 		return sb;
 	/* Do not delay entries in buffer for more than 2 seconds */
-	return get_curr_sync_buff(ipvs, 2 * HZ);
+	return get_curr_sync_buff(ipvs, IPVS_SYNC_FLUSH_TIME);
 }
 
 static int sync_thread_master(void *data)
-- 
1.7.6.3


* [PATCH 7/9] ipvs: add support for sync threads
From: Simon Horman @ 2012-03-21  8:56 UTC
  To: Pablo Neira Ayuso
  Cc: lvs-devel, netdev, netfilter-devel, Wensong Zhang,
	Julian Anastasov, Simon Horman

From: Julian Anastasov <ja@ssi.bg>

	Allow master and backup servers to use many threads
for sync traffic. Add the sysctl var "sync_ports" to define the
number of threads. Every thread will use a single UDP port:
thread 0 will use the default port 8848 while the last thread
will use port 8848+sync_ports-1.

	The sync traffic for connections is distributed across the
master threads based on the cp address, but one connection is
always assigned to the same thread to avoid reordering of the
sync messages.
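
A user-space sketch of the selection (select_thread models
select_master_thread_id from the diff below; struct conn is a
stand-in for struct ip_vs_conn):

#include <stdio.h>

struct conn { char pad[256]; };		/* pretend sizeof == 256 */

/* Shift away the low address bits, which carry little entropy
 * between slab objects, then mask with threads_mask
 * (sync_ports - 1, a power of two). */
static int select_thread(const struct conn *cp, int threads_mask)
{
	/* 9 == 1 + log2(sizeof(*cp)) */
	return (int)(((unsigned long)cp >> 9) & threads_mask);
}

int main(void)
{
	struct conn c;

	printf("thread id: %d\n", select_thread(&c, 8 - 1));	/* sync_ports=8 */
	return 0;
}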

	Remove ip_vs_sync_switch_mode because its check for
sync mode changes is still risky. Instead, check for mode
changes under sync_buff_lock.

	Make sure the backup socks do not block on reading.
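
A user-space analogue of the non-blocking read (try_receive is
hypothetical; in the kernel, ip_vs_receive now passes MSG_DONTWAIT to
kernel_recvmsg and the thread sleeps in wait_event_interruptible):

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>

static int try_receive(int fd, char *buf, size_t len)
{
	ssize_t n = recv(fd, buf, len, MSG_DONTWAIT);

	if (n >= 0)
		return (int)n;		/* got a datagram */
	if (errno == EAGAIN || errno == EWOULDBLOCK)
		return -EAGAIN;		/* nothing queued, not an error */
	fprintf(stderr, "receiving message error: %s\n", strerror(errno));
	return -1;
}

int main(void)
{
	char buf[64];
	int fd = socket(AF_INET, SOCK_DGRAM, 0);

	if (fd < 0)
		return 1;
	/* no datagram queued, so this returns -EAGAIN instead of blocking */
	printf("try_receive: %d\n", try_receive(fd, buf, sizeof(buf)));
	return 0;
}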

Special thanks to Aleksey Chudov for helping with all the tests.

Signed-off-by: Julian Anastasov <ja@ssi.bg>
Tested-by: Aleksey Chudov <aleksey.chudov@gmail.com>
Signed-off-by: Simon Horman <horms@verge.net.au>
---
 include/net/ip_vs.h             |   23 ++-
 net/netfilter/ipvs/ip_vs_conn.c |    7 +
 net/netfilter/ipvs/ip_vs_ctl.c  |   29 +++-
 net/netfilter/ipvs/ip_vs_sync.c |  349 ++++++++++++++++++++++++---------------
 4 files changed, 265 insertions(+), 143 deletions(-)

diff --git a/include/net/ip_vs.h b/include/net/ip_vs.h
index 8bee5d0..082e12d 100644
--- a/include/net/ip_vs.h
+++ b/include/net/ip_vs.h
@@ -870,6 +870,7 @@ struct netns_ipvs {
 #endif
 	int			sysctl_snat_reroute;
 	int			sysctl_sync_ver;
+	int			sysctl_sync_ports;
 	int			sysctl_cache_bypass;
 	int			sysctl_expire_nodest_conn;
 	int			sysctl_expire_quiescent_template;
@@ -891,13 +892,13 @@ struct netns_ipvs {
 	spinlock_t		est_lock;
 	struct timer_list	est_timer;	/* Estimation timer */
 	/* ip_vs_sync */
-	struct list_head	sync_queue;
+	struct list_head	*sync_queues;
 	spinlock_t		sync_lock;
-	struct ip_vs_sync_buff  *sync_buff;
+	struct ip_vs_sync_buff  **sync_buffs;
 	spinlock_t		sync_buff_lock;
-	struct sockaddr_in	sync_mcast_addr;
-	struct task_struct	*master_thread;
-	struct task_struct	*backup_thread;
+	struct task_struct	**master_threads;
+	struct task_struct	**backup_threads;
+	int			threads_mask;
 	int			send_mesg_maxlen;
 	int			recv_mesg_maxlen;
 	volatile int		sync_state;
@@ -917,6 +918,7 @@ struct netns_ipvs {
 #define DEFAULT_SYNC_RETRIES		0
 #define DEFAULT_SYNC_VER		1
 #define IPVS_SYNC_FLUSH_TIME		(HZ * 2)
+#define IPVS_SYNC_PORTS_MAX		(1 << 6)
 
 #ifdef CONFIG_SYSCTL
 
@@ -945,6 +947,11 @@ static inline int sysctl_sync_ver(struct netns_ipvs *ipvs)
 	return ipvs->sysctl_sync_ver;
 }
 
+static inline int sysctl_sync_ports(struct netns_ipvs *ipvs)
+{
+	return ACCESS_ONCE(ipvs->sysctl_sync_ports);
+}
+
 #else
 
 static inline int sysctl_sync_threshold(struct netns_ipvs *ipvs)
@@ -972,6 +979,11 @@ static inline int sysctl_sync_ver(struct netns_ipvs *ipvs)
 	return DEFAULT_SYNC_VER;
 }
 
+static inline int sysctl_sync_ports(struct netns_ipvs *ipvs)
+{
+	return 1;
+}
+
 #endif
 
 /*
@@ -1212,7 +1224,6 @@ extern struct ip_vs_stats ip_vs_stats;
 extern const struct ctl_path net_vs_ctl_path[];
 extern int sysctl_ip_vs_sync_ver;
 
-extern void ip_vs_sync_switch_mode(struct net *net, int mode);
 extern struct ip_vs_service *
 ip_vs_service_get(struct net *net, int af, __u32 fwmark, __u16 protocol,
 		  const union nf_inet_addr *vaddr, __be16 vport);
diff --git a/net/netfilter/ipvs/ip_vs_conn.c b/net/netfilter/ipvs/ip_vs_conn.c
index 126971c..7478b66 100644
--- a/net/netfilter/ipvs/ip_vs_conn.c
+++ b/net/netfilter/ipvs/ip_vs_conn.c
@@ -618,12 +618,19 @@ struct ip_vs_dest *ip_vs_try_bind_dest(struct ip_vs_conn *cp)
 	if (dest) {
 		struct ip_vs_proto_data *pd;
 
+		spin_lock(&cp->lock);
+		if (cp->dest) {
+			spin_unlock(&cp->lock);
+			return dest;
+		}
+
 		/* Applications work depending on the forwarding method
 		 * but better to reassign them always when binding dest */
 		if (cp->app)
 			ip_vs_unbind_app(cp);
 
 		ip_vs_bind_dest(cp, dest);
+		spin_unlock(&cp->lock);
 
 		/* Update its packet transmitter */
 		cp->packet_xmit = NULL;
diff --git a/net/netfilter/ipvs/ip_vs_ctl.c b/net/netfilter/ipvs/ip_vs_ctl.c
index da922c7..6c532d6 100644
--- a/net/netfilter/ipvs/ip_vs_ctl.c
+++ b/net/netfilter/ipvs/ip_vs_ctl.c
@@ -1657,9 +1657,24 @@ proc_do_sync_mode(ctl_table *table, int write,
 		if ((*valp < 0) || (*valp > 1)) {
 			/* Restore the correct value */
 			*valp = val;
-		} else {
-			struct net *net = current->nsproxy->net_ns;
-			ip_vs_sync_switch_mode(net, val);
+		}
+	}
+	return rc;
+}
+
+static int
+proc_do_sync_ports(ctl_table *table, int write,
+		   void __user *buffer, size_t *lenp, loff_t *ppos)
+{
+	int *valp = table->data;
+	int val = *valp;
+	int rc;
+
+	rc = proc_dointvec(table, write, buffer, lenp, ppos);
+	if (write && (*valp != val)) {
+		if (*valp < 1 || !is_power_of_2(*valp)) {
+			/* Restore the correct value */
+			*valp = val;
 		}
 	}
 	return rc;
@@ -1723,6 +1738,12 @@ static struct ctl_table vs_vars[] = {
 		.proc_handler	= &proc_do_sync_mode,
 	},
 	{
+		.procname	= "sync_ports",
+		.maxlen		= sizeof(int),
+		.mode		= 0644,
+		.proc_handler	= &proc_do_sync_ports,
+	},
+	{
 		.procname	= "cache_bypass",
 		.maxlen		= sizeof(int),
 		.mode		= 0644,
@@ -3673,6 +3694,8 @@ int __net_init ip_vs_control_net_init_sysctl(struct net *net)
 	tbl[idx++].data = &ipvs->sysctl_snat_reroute;
 	ipvs->sysctl_sync_ver = 1;
 	tbl[idx++].data = &ipvs->sysctl_sync_ver;
+	ipvs->sysctl_sync_ports = 1;
+	tbl[idx++].data = &ipvs->sysctl_sync_ports;
 	tbl[idx++].data = &ipvs->sysctl_cache_bypass;
 	tbl[idx++].data = &ipvs->sysctl_expire_nodest_conn;
 	tbl[idx++].data = &ipvs->sysctl_expire_quiescent_template;
diff --git a/net/netfilter/ipvs/ip_vs_sync.c b/net/netfilter/ipvs/ip_vs_sync.c
index 8ee8a57..33f4cf4 100644
--- a/net/netfilter/ipvs/ip_vs_sync.c
+++ b/net/netfilter/ipvs/ip_vs_sync.c
@@ -196,6 +196,7 @@ struct ip_vs_sync_thread_data {
 	struct net *net;
 	struct socket *sock;
 	char *buf;
+	int id;
 };
 
 /* Version 0 definition of packet sizes */
@@ -271,13 +272,6 @@ struct ip_vs_sync_buff {
 	unsigned char           *end;
 };
 
-/* multicast addr */
-static struct sockaddr_in mcast_addr = {
-	.sin_family		= AF_INET,
-	.sin_port		= cpu_to_be16(IP_VS_SYNC_PORT),
-	.sin_addr.s_addr	= cpu_to_be32(IP_VS_SYNC_GROUP),
-};
-
 /*
  * Copy of struct ip_vs_seq
  * From unaligned network order to aligned host order
@@ -300,15 +294,16 @@ static void hton_seq(struct ip_vs_seq *ho, struct ip_vs_seq *no)
 	put_unaligned_be32(ho->previous_delta, &no->previous_delta);
 }
 
-static inline struct ip_vs_sync_buff *sb_dequeue(struct netns_ipvs *ipvs)
+static inline struct ip_vs_sync_buff *sb_dequeue(struct netns_ipvs *ipvs,
+						 int id)
 {
 	struct ip_vs_sync_buff *sb;
 
 	spin_lock_bh(&ipvs->sync_lock);
-	if (list_empty(&ipvs->sync_queue)) {
+	if (list_empty(&ipvs->sync_queues[id])) {
 		sb = NULL;
 	} else {
-		sb = list_entry(ipvs->sync_queue.next,
+		sb = list_entry(ipvs->sync_queues[id].next,
 				struct ip_vs_sync_buff,
 				list);
 		list_del(&sb->list);
@@ -334,7 +329,7 @@ ip_vs_sync_buff_create(struct netns_ipvs *ipvs)
 		kfree(sb);
 		return NULL;
 	}
-	sb->mesg->reserved = 0;  /* old nr_conns i.e. must be zeo now */
+	sb->mesg->reserved = 0;  /* old nr_conns i.e. must be zero now */
 	sb->mesg->version = SYNC_PROTO_VER;
 	sb->mesg->syncid = ipvs->master_syncid;
 	sb->mesg->size = sizeof(struct ip_vs_sync_mesg);
@@ -353,13 +348,13 @@ static inline void ip_vs_sync_buff_release(struct ip_vs_sync_buff *sb)
 	kfree(sb);
 }
 
-static inline void sb_queue_tail(struct netns_ipvs *ipvs)
+static inline void sb_queue_tail(struct netns_ipvs *ipvs, int id)
 {
-	struct ip_vs_sync_buff *sb = ipvs->sync_buff;
+	struct ip_vs_sync_buff *sb = ipvs->sync_buffs[id];
 
 	spin_lock(&ipvs->sync_lock);
 	if (ipvs->sync_state & IP_VS_STATE_MASTER)
-		list_add_tail(&sb->list, &ipvs->sync_queue);
+		list_add_tail(&sb->list, &ipvs->sync_queues[id]);
 	else
 		ip_vs_sync_buff_release(sb);
 	spin_unlock(&ipvs->sync_lock);
@@ -370,49 +365,25 @@ static inline void sb_queue_tail(struct netns_ipvs *ipvs)
  *	than the specified time or the specified time is zero.
  */
 static inline struct ip_vs_sync_buff *
-get_curr_sync_buff(struct netns_ipvs *ipvs, unsigned long time)
+get_curr_sync_buff(struct netns_ipvs *ipvs, int id, unsigned long time)
 {
 	struct ip_vs_sync_buff *sb;
 
 	spin_lock_bh(&ipvs->sync_buff_lock);
-	if (ipvs->sync_buff &&
-	    time_after_eq(jiffies - ipvs->sync_buff->firstuse, time)) {
-		sb = ipvs->sync_buff;
-		ipvs->sync_buff = NULL;
+	if (ipvs->sync_buffs[id] &&
+	    time_after_eq(jiffies - ipvs->sync_buffs[id]->firstuse, time)) {
+		sb = ipvs->sync_buffs[id];
+		ipvs->sync_buffs[id] = NULL;
 	} else
 		sb = NULL;
 	spin_unlock_bh(&ipvs->sync_buff_lock);
 	return sb;
 }
 
-/*
- * Switch mode from sending version 0 or 1
- *  - must handle sync_buf
- */
-void ip_vs_sync_switch_mode(struct net *net, int mode)
+static inline int
+select_master_thread_id(struct netns_ipvs *ipvs, struct ip_vs_conn *cp)
 {
-	struct netns_ipvs *ipvs = net_ipvs(net);
-
-	if (!(ipvs->sync_state & IP_VS_STATE_MASTER))
-		return;
-	if (mode == sysctl_sync_ver(ipvs) || !ipvs->sync_buff)
-		return;
-
-	spin_lock_bh(&ipvs->sync_buff_lock);
-	/* Buffer empty ? then let buf_create do the job  */
-	if (ipvs->sync_buff->mesg->size <=  sizeof(struct ip_vs_sync_mesg)) {
-		kfree(ipvs->sync_buff);
-		ipvs->sync_buff = NULL;
-	} else {
-		spin_lock_bh(&ipvs->sync_lock);
-		if (ipvs->sync_state & IP_VS_STATE_MASTER)
-			list_add_tail(&ipvs->sync_buff->list,
-				      &ipvs->sync_queue);
-		else
-			ip_vs_sync_buff_release(ipvs->sync_buff);
-		spin_unlock_bh(&ipvs->sync_lock);
-	}
-	spin_unlock_bh(&ipvs->sync_buff_lock);
+	return ((int) cp >> (1 + ilog2(sizeof(*cp)))) & ipvs->threads_mask;
 }
 
 /*
@@ -532,8 +503,10 @@ static void ip_vs_sync_conn_v0(struct net *net, struct ip_vs_conn *cp,
 			       int pkts)
 {
 	struct netns_ipvs *ipvs = net_ipvs(net);
+	struct ip_vs_sync_buff *buff;
 	struct ip_vs_sync_mesg_v0 *m;
 	struct ip_vs_sync_conn_v0 *s;
+	int id;
 	int len;
 
 	if (unlikely(cp->af != AF_INET))
@@ -546,20 +519,36 @@ static void ip_vs_sync_conn_v0(struct net *net, struct ip_vs_conn *cp,
 		return;
 
 	spin_lock(&ipvs->sync_buff_lock);
-	if (!ipvs->sync_buff) {
-		ipvs->sync_buff =
-			ip_vs_sync_buff_create_v0(ipvs);
-		if (!ipvs->sync_buff) {
+	if (!(ipvs->sync_state & IP_VS_STATE_MASTER)) {
+		spin_unlock(&ipvs->sync_buff_lock);
+		return;
+	}
+
+	id = select_master_thread_id(ipvs, cp);
+	buff = ipvs->sync_buffs[id];
+	if (buff) {
+		m = (struct ip_vs_sync_mesg_v0 *) buff->mesg;
+		/* Send buffer if it is for v1 */
+		if (!m->nr_conns) {
+			sb_queue_tail(ipvs, id);
+			ipvs->sync_buffs[id] = NULL;
+			buff = NULL;
+		}
+	}
+	if (!buff) {
+		buff = ip_vs_sync_buff_create_v0(ipvs);
+		if (!buff) {
 			spin_unlock(&ipvs->sync_buff_lock);
 			pr_err("ip_vs_sync_buff_create failed.\n");
 			return;
 		}
+		ipvs->sync_buffs[id] = buff;
 	}
 
 	len = (cp->flags & IP_VS_CONN_F_SEQ_MASK) ? FULL_CONN_SIZE :
 		SIMPLE_CONN_SIZE;
-	m = (struct ip_vs_sync_mesg_v0 *)ipvs->sync_buff->mesg;
-	s = (struct ip_vs_sync_conn_v0 *)ipvs->sync_buff->head;
+	m = (struct ip_vs_sync_mesg_v0 *) buff->mesg;
+	s = (struct ip_vs_sync_conn_v0 *) buff->head;
 
 	/* copy members */
 	s->reserved = 0;
@@ -580,12 +569,12 @@ static void ip_vs_sync_conn_v0(struct net *net, struct ip_vs_conn *cp,
 
 	m->nr_conns++;
 	m->size += len;
-	ipvs->sync_buff->head += len;
+	buff->head += len;
 
 	/* check if there is a space for next one */
-	if (ipvs->sync_buff->head + FULL_CONN_SIZE > ipvs->sync_buff->end) {
-		sb_queue_tail(ipvs);
-		ipvs->sync_buff = NULL;
+	if (buff->head + FULL_CONN_SIZE > buff->end) {
+		sb_queue_tail(ipvs, id);
+		ipvs->sync_buffs[id] = NULL;
 	}
 	spin_unlock(&ipvs->sync_buff_lock);
 
@@ -608,10 +597,12 @@ static void ip_vs_sync_conn_v0(struct net *net, struct ip_vs_conn *cp,
 void ip_vs_sync_conn(struct net *net, struct ip_vs_conn *cp, int pkts)
 {
 	struct netns_ipvs *ipvs = net_ipvs(net);
+	struct ip_vs_sync_buff *buff;
 	struct ip_vs_sync_mesg *m;
 	union ip_vs_sync_conn *s;
 	__u8 *p;
 	unsigned int len, pe_name_len, pad;
+	int id;
 
 	/* Handle old version of the protocol */
 	if (sysctl_sync_ver(ipvs) == 0) {
@@ -636,6 +627,12 @@ sloop:
 	}
 
 	spin_lock(&ipvs->sync_buff_lock);
+	if (!(ipvs->sync_state & IP_VS_STATE_MASTER)) {
+		spin_unlock(&ipvs->sync_buff_lock);
+		return;
+	}
+
+	id = select_master_thread_id(ipvs, cp);
 
 #ifdef CONFIG_IP_VS_IPV6
 	if (cp->af == AF_INET6)
@@ -654,27 +651,32 @@ sloop:
 
 	/* check if there is a space for this one  */
 	pad = 0;
-	if (ipvs->sync_buff) {
-		pad = (4 - (size_t)ipvs->sync_buff->head) & 3;
-		if (ipvs->sync_buff->head + len + pad > ipvs->sync_buff->end) {
-			sb_queue_tail(ipvs);
-			ipvs->sync_buff = NULL;
+	buff = ipvs->sync_buffs[id];
+	if (buff) {
+		m = buff->mesg;
+		pad = (4 - (size_t) buff->head) & 3;
+		/* Send buffer if it is for v0 */
+		if (buff->head + len + pad > buff->end || m->reserved) {
+			sb_queue_tail(ipvs, id);
+			ipvs->sync_buffs[id] = NULL;
+			buff = NULL;
 			pad = 0;
 		}
 	}
 
-	if (!ipvs->sync_buff) {
-		ipvs->sync_buff = ip_vs_sync_buff_create(ipvs);
-		if (!ipvs->sync_buff) {
+	if (!buff) {
+		buff = ip_vs_sync_buff_create(ipvs);
+		if (!buff) {
 			spin_unlock(&ipvs->sync_buff_lock);
 			pr_err("ip_vs_sync_buff_create failed.\n");
 			return;
 		}
+		ipvs->sync_buffs[id] = buff;
+		m = buff->mesg;
 	}
 
-	m = ipvs->sync_buff->mesg;
-	p = ipvs->sync_buff->head;
-	ipvs->sync_buff->head += pad + len;
+	p = buff->head;
+	buff->head += pad + len;
 	m->size += pad + len;
 	/* Add ev. padding from prev. sync_conn */
 	while (pad--)
@@ -825,6 +827,7 @@ static void ip_vs_proc_conn(struct net *net, struct ip_vs_conn_param *param,
 		kfree(param->pe_data);
 
 		dest = cp->dest;
+		spin_lock(&cp->lock);
 		if ((cp->flags ^ flags) & IP_VS_CONN_F_INACTIVE &&
 		    !(flags & IP_VS_CONN_F_TEMPLATE) && dest) {
 			if (flags & IP_VS_CONN_F_INACTIVE) {
@@ -838,6 +841,7 @@ static void ip_vs_proc_conn(struct net *net, struct ip_vs_conn_param *param,
 		flags &= IP_VS_CONN_F_BACKUP_UPD_MASK;
 		flags |= cp->flags & ~IP_VS_CONN_F_BACKUP_UPD_MASK;
 		cp->flags = flags;
+		spin_unlock(&cp->lock);
 		if (!dest) {
 			dest = ip_vs_try_bind_dest(cp);
 			if (dest)
@@ -1368,9 +1372,15 @@ static int bind_mcastif_addr(struct socket *sock, char *ifname)
 /*
  *      Set up sending multicast socket over UDP
  */
-static struct socket *make_send_sock(struct net *net)
+static struct socket *make_send_sock(struct net *net, int id)
 {
 	struct netns_ipvs *ipvs = net_ipvs(net);
+	/* multicast addr */
+	struct sockaddr_in mcast_addr = {
+		.sin_family		= AF_INET,
+		.sin_port		= cpu_to_be16(IP_VS_SYNC_PORT + id),
+		.sin_addr.s_addr	= cpu_to_be32(IP_VS_SYNC_GROUP),
+	};
 	struct socket *sock;
 	int result;
 
@@ -1419,9 +1429,15 @@ error:
 /*
  *      Set up receiving multicast socket over UDP
  */
-static struct socket *make_receive_sock(struct net *net)
+static struct socket *make_receive_sock(struct net *net, int id)
 {
 	struct netns_ipvs *ipvs = net_ipvs(net);
+	/* multicast addr */
+	struct sockaddr_in mcast_addr = {
+		.sin_family		= AF_INET,
+		.sin_port		= cpu_to_be16(IP_VS_SYNC_PORT + id),
+		.sin_addr.s_addr	= cpu_to_be32(IP_VS_SYNC_GROUP),
+	};
 	struct socket *sock;
 	int result;
 
@@ -1512,10 +1528,10 @@ ip_vs_receive(struct socket *sock, char *buffer, const size_t buflen)
 	iov.iov_base     = buffer;
 	iov.iov_len      = (size_t)buflen;
 
-	len = kernel_recvmsg(sock, &msg, &iov, 1, buflen, 0);
+	len = kernel_recvmsg(sock, &msg, &iov, 1, buflen, MSG_DONTWAIT);
 
 	if (len < 0)
-		return -1;
+		return len;
 
 	LeaveFunction(7);
 	return len;
@@ -1523,15 +1539,15 @@ ip_vs_receive(struct socket *sock, char *buffer, const size_t buflen)
 
 /* Get next buffer to send */
 static inline struct ip_vs_sync_buff *
-next_sync_buff(struct netns_ipvs *ipvs)
+next_sync_buff(struct netns_ipvs *ipvs, int id)
 {
 	struct ip_vs_sync_buff *sb;
 
-	sb = sb_dequeue(ipvs);
+	sb = sb_dequeue(ipvs, id);
 	if (sb)
 		return sb;
 	/* Do not delay entries in buffer for more than 2 seconds */
-	return get_curr_sync_buff(ipvs, IPVS_SYNC_FLUSH_TIME);
+	return get_curr_sync_buff(ipvs, id, IPVS_SYNC_FLUSH_TIME);
 }
 
 static int sync_thread_master(void *data)
@@ -1542,15 +1558,15 @@ static int sync_thread_master(void *data)
 	int pause = HZ / 10;
 
 	pr_info("sync thread started: state = MASTER, mcast_ifn = %s, "
-		"syncid = %d\n",
-		ipvs->master_mcast_ifn, ipvs->master_syncid);
+		"syncid = %d, id = %d\n",
+		ipvs->master_mcast_ifn, ipvs->master_syncid, tinfo->id);
 
 	while (!kthread_should_stop()) {
 		int count = 0;
 
 		for (;;) {
 			if (!sb) {
-				sb = next_sync_buff(ipvs);
+				sb = next_sync_buff(ipvs, tinfo->id);
 				if (!sb)
 					break;
 			}
@@ -1577,11 +1593,11 @@ static int sync_thread_master(void *data)
 		ip_vs_sync_buff_release(sb);
 
 	/* clean up the sync_buff queue */
-	while ((sb = sb_dequeue(ipvs)))
+	while ((sb = sb_dequeue(ipvs, tinfo->id)))
 		ip_vs_sync_buff_release(sb);
 
 	/* clean up the current sync_buff */
-	sb = get_curr_sync_buff(ipvs, 0);
+	sb = get_curr_sync_buff(ipvs, tinfo->id, 0);
 	if (sb)
 		ip_vs_sync_buff_release(sb);
 
@@ -1600,8 +1616,8 @@ static int sync_thread_backup(void *data)
 	int len;
 
 	pr_info("sync thread started: state = BACKUP, mcast_ifn = %s, "
-		"syncid = %d\n",
-		ipvs->backup_mcast_ifn, ipvs->backup_syncid);
+		"syncid = %d, id = %d\n",
+		ipvs->backup_mcast_ifn, ipvs->backup_syncid, tinfo->id);
 
 	while (!kthread_should_stop()) {
 		wait_event_interruptible(*sk_sleep(tinfo->sock->sk),
@@ -1613,7 +1629,8 @@ static int sync_thread_backup(void *data)
 			len = ip_vs_receive(tinfo->sock, tinfo->buf,
 					ipvs->recv_mesg_maxlen);
 			if (len <= 0) {
-				pr_err("receiving message error\n");
+				if (len != -EAGAIN)
+					pr_err("receiving message error\n");
 				break;
 			}
 
@@ -1637,86 +1654,135 @@ static int sync_thread_backup(void *data)
 int start_sync_thread(struct net *net, int state, char *mcast_ifn, __u8 syncid)
 {
 	struct ip_vs_sync_thread_data *tinfo;
-	struct task_struct **realtask, *task;
+	struct task_struct **array = NULL, *task;
 	struct socket *sock;
 	struct netns_ipvs *ipvs = net_ipvs(net);
-	char *name, *buf = NULL;
+	char *name;
 	int (*threadfn)(void *data);
+	int id, count;
 	int result = -ENOMEM;
 
 	IP_VS_DBG(7, "%s(): pid %d\n", __func__, task_pid_nr(current));
 	IP_VS_DBG(7, "Each ip_vs_sync_conn entry needs %Zd bytes\n",
 		  sizeof(struct ip_vs_sync_conn_v0));
 
+	if (!ipvs->sync_state) {
+		count = clamp(sysctl_sync_ports(ipvs), 1, IPVS_SYNC_PORTS_MAX);
+		ipvs->threads_mask = count - 1;
+	} else
+		count = ipvs->threads_mask + 1;
 
 	if (state == IP_VS_STATE_MASTER) {
-		if (ipvs->master_thread)
+		if (ipvs->master_threads)
 			return -EEXIST;
 
 		strlcpy(ipvs->master_mcast_ifn, mcast_ifn,
 			sizeof(ipvs->master_mcast_ifn));
 		ipvs->master_syncid = syncid;
-		realtask = &ipvs->master_thread;
-		name = "ipvs_master:%d";
+		name = "ipvs-m:%d:%d";
 		threadfn = sync_thread_master;
-		sock = make_send_sock(net);
 	} else if (state == IP_VS_STATE_BACKUP) {
-		if (ipvs->backup_thread)
+		if (ipvs->backup_threads)
 			return -EEXIST;
 
 		strlcpy(ipvs->backup_mcast_ifn, mcast_ifn,
 			sizeof(ipvs->backup_mcast_ifn));
 		ipvs->backup_syncid = syncid;
-		realtask = &ipvs->backup_thread;
-		name = "ipvs_backup:%d";
+		name = "ipvs-b:%d:%d";
 		threadfn = sync_thread_backup;
-		sock = make_receive_sock(net);
 	} else {
 		return -EINVAL;
 	}
 
-	if (IS_ERR(sock)) {
-		result = PTR_ERR(sock);
-		goto out;
+	if (state == IP_VS_STATE_MASTER) {
+		ipvs->sync_queues = kmalloc(count * sizeof(struct list_head),
+					    GFP_KERNEL);
+		if (!ipvs->sync_queues)
+			goto out;
+		for (id = 0; id < count; id++)
+			INIT_LIST_HEAD(&ipvs->sync_queues[id]);
+		ipvs->sync_buffs = kzalloc(count *
+					   sizeof(struct ip_vs_sync_buff *),
+					   GFP_KERNEL);
+		if (!ipvs->sync_buffs)
+			goto out;
 	}
 
+	array = kzalloc(count * sizeof(struct task_struct *), GFP_KERNEL);
+	if (!array)
+		goto out;
+
 	set_sync_mesg_maxlen(net, state);
-	if (state == IP_VS_STATE_BACKUP) {
-		buf = kmalloc(ipvs->recv_mesg_maxlen, GFP_KERNEL);
-		if (!buf)
-			goto outsocket;
-	}
 
-	tinfo = kmalloc(sizeof(*tinfo), GFP_KERNEL);
-	if (!tinfo)
-		goto outbuf;
+	tinfo = NULL;
+	for (id = 0; id < count; id++) {
+		if (state == IP_VS_STATE_MASTER)
+			sock = make_send_sock(net, id);
+		else
+			sock = make_receive_sock(net, id);
+		if (IS_ERR(sock)) {
+			result = PTR_ERR(sock);
+			goto outtinfo;
+		}
 
-	tinfo->net = net;
-	tinfo->sock = sock;
-	tinfo->buf = buf;
+		tinfo = kmalloc(sizeof(*tinfo), GFP_KERNEL);
+		if (!tinfo)
+			goto outsocket;
+		tinfo->net = net;
+		tinfo->sock = sock;
+		if (state == IP_VS_STATE_BACKUP) {
+			tinfo->buf = kmalloc(ipvs->recv_mesg_maxlen,
+					     GFP_KERNEL);
+			if (!tinfo->buf)
+				goto outtinfo;
+		}
+		tinfo->id = id;
 
-	task = kthread_run(threadfn, tinfo, name, ipvs->gen);
-	if (IS_ERR(task)) {
-		result = PTR_ERR(task);
-		goto outtinfo;
+		task = kthread_run(threadfn, tinfo, name, ipvs->gen, id);
+		if (IS_ERR(task)) {
+			result = PTR_ERR(task);
+			goto outtinfo;
+		}
+		tinfo = NULL;
+		array[id] = task;
 	}
 
 	/* mark as active */
-	*realtask = task;
+
+	if (state == IP_VS_STATE_MASTER)
+		ipvs->master_threads = array;
+	else
+		ipvs->backup_threads = array;
+	spin_lock_bh(&ipvs->sync_buff_lock);
 	ipvs->sync_state |= state;
+	spin_unlock_bh(&ipvs->sync_buff_lock);
 
 	/* increase the module use count */
 	ip_vs_use_count_inc();
 
 	return 0;
 
-outtinfo:
-	kfree(tinfo);
-outbuf:
-	kfree(buf);
 outsocket:
 	sk_release_kernel(sock->sk);
+
+outtinfo:
+	if (tinfo) {
+		sk_release_kernel(tinfo->sock->sk);
+		kfree(tinfo->buf);
+		kfree(tinfo);
+	}
+	count = id;
+	while (count-- > 0)
+		kthread_stop(array[count]);
+	kfree(array);
+
 out:
+	if (!(ipvs->sync_state & IP_VS_STATE_MASTER)) {
+		kfree(ipvs->sync_queues);
+		ipvs->sync_queues = NULL;
+		kfree(ipvs->sync_buffs);
+		ipvs->sync_buffs = NULL;
+	}
 	return result;
 }
 
@@ -1724,38 +1790,58 @@ out:
 int stop_sync_thread(struct net *net, int state)
 {
 	struct netns_ipvs *ipvs = net_ipvs(net);
+	struct task_struct **array;
+	int id;
 	int retc = -EINVAL;
 
 	IP_VS_DBG(7, "%s(): pid %d\n", __func__, task_pid_nr(current));
 
 	if (state == IP_VS_STATE_MASTER) {
-		if (!ipvs->master_thread)
+		if (!ipvs->master_threads)
 			return -ESRCH;
 
-		pr_info("stopping master sync thread %d ...\n",
-			task_pid_nr(ipvs->master_thread));
-
 		/*
 		 * The lock synchronizes with sb_queue_tail(), so that we don't
 		 * add sync buffers to the queue, when we are already in
 		 * progress of stopping the master sync daemon.
 		 */
 
-		spin_lock_bh(&ipvs->sync_lock);
+		spin_lock_bh(&ipvs->sync_buff_lock);
+		spin_lock(&ipvs->sync_lock);
 		ipvs->sync_state &= ~IP_VS_STATE_MASTER;
-		spin_unlock_bh(&ipvs->sync_lock);
-		retc = kthread_stop(ipvs->master_thread);
-		ipvs->master_thread = NULL;
+		spin_unlock(&ipvs->sync_lock);
+		spin_unlock_bh(&ipvs->sync_buff_lock);
+		array = ipvs->master_threads;
+		retc = 0;
+		for (id = ipvs->threads_mask; id >= 0; id--) {
+			int ret;
+
+			pr_info("stopping master sync thread %d ...\n",
+				task_pid_nr(array[id]));
+			ret = kthread_stop(array[id]);
+			if (retc >= 0)
+				retc = ret;
+		}
+		kfree(array);
+		ipvs->master_threads = NULL;
 	} else if (state == IP_VS_STATE_BACKUP) {
-		if (!ipvs->backup_thread)
+		if (!ipvs->backup_threads)
 			return -ESRCH;
 
-		pr_info("stopping backup sync thread %d ...\n",
-			task_pid_nr(ipvs->backup_thread));
-
 		ipvs->sync_state &= ~IP_VS_STATE_BACKUP;
-		retc = kthread_stop(ipvs->backup_thread);
-		ipvs->backup_thread = NULL;
+		array = ipvs->backup_threads;
+		retc = 0;
+		for (id = ipvs->threads_mask; id >= 0; id--) {
+			int ret;
+
+			pr_info("stopping backup sync thread %d ...\n",
+				task_pid_nr(array[id]));
+			ret = kthread_stop(array[id]);
+			if (retc >= 0)
+				retc = ret;
+		}
+		kfree(array);
+		ipvs->backup_threads = NULL;
 	}
 
 	/* decrease the module use count */
@@ -1772,13 +1858,8 @@ int __net_init ip_vs_sync_net_init(struct net *net)
 	struct netns_ipvs *ipvs = net_ipvs(net);
 
 	__mutex_init(&ipvs->sync_mutex, "ipvs->sync_mutex", &__ipvs_sync_key);
-	INIT_LIST_HEAD(&ipvs->sync_queue);
 	spin_lock_init(&ipvs->sync_lock);
 	spin_lock_init(&ipvs->sync_buff_lock);

^ permalink raw reply related	[flat|nested] 18+ messages in thread

* [PATCH 8/9] ipvs: optimize the use of flags in ip_vs_bind_dest
  2012-03-21  8:56 [GIT PULL nf-next] IPVS Simon Horman
                   ` (6 preceding siblings ...)
  2012-03-21  8:56 ` [PATCH 7/9] ipvs: add support for sync threads Simon Horman
@ 2012-03-21  8:56 ` Simon Horman
  2012-03-21  8:56 ` [PATCH 9/9] ipvs: Provide a generic ip_vs_bind_xmit() Simon Horman
  8 siblings, 0 replies; 18+ messages in thread
From: Simon Horman @ 2012-03-21  8:56 UTC (permalink / raw)
  To: Pablo Neira Ayuso
  Cc: lvs-devel, netdev, netfilter-devel, Wensong Zhang,
	Julian Anastasov, Simon Horman

From: Julian Anastasov <ja@ssi.bg>

	cp->flags is marked volatile, but ip_vs_bind_dest
can safely modify the flags, so save some CPU cycles by
using a temp variable.

Signed-off-by: Julian Anastasov <ja@ssi.bg>
Signed-off-by: Simon Horman <horms@verge.net.au>
---
 net/netfilter/ipvs/ip_vs_conn.c |   15 +++++++++------
 1 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/net/netfilter/ipvs/ip_vs_conn.c b/net/netfilter/ipvs/ip_vs_conn.c
index 7478b66..1eaec99 100644
--- a/net/netfilter/ipvs/ip_vs_conn.c
+++ b/net/netfilter/ipvs/ip_vs_conn.c
@@ -548,6 +548,7 @@ static inline void
 ip_vs_bind_dest(struct ip_vs_conn *cp, struct ip_vs_dest *dest)
 {
 	unsigned int conn_flags;
+	__u32 flags;
 
 	/* if dest is NULL, then return directly */
 	if (!dest)
@@ -559,17 +560,19 @@ ip_vs_bind_dest(struct ip_vs_conn *cp, struct ip_vs_dest *dest)
 	conn_flags = atomic_read(&dest->conn_flags);
 	if (cp->protocol != IPPROTO_UDP)
 		conn_flags &= ~IP_VS_CONN_F_ONE_PACKET;
+	flags = cp->flags;
 	/* Bind with the destination and its corresponding transmitter */
-	if (cp->flags & IP_VS_CONN_F_SYNC) {
+	if (flags & IP_VS_CONN_F_SYNC) {
 		/* if the connection is not template and is created
 		 * by sync, preserve the activity flag.
 		 */
-		if (!(cp->flags & IP_VS_CONN_F_TEMPLATE))
+		if (!(flags & IP_VS_CONN_F_TEMPLATE))
 			conn_flags &= ~IP_VS_CONN_F_INACTIVE;
 		/* connections inherit forwarding method from dest */
-		cp->flags &= ~(IP_VS_CONN_F_FWD_MASK | IP_VS_CONN_F_NOOUTPUT);
+		flags &= ~(IP_VS_CONN_F_FWD_MASK | IP_VS_CONN_F_NOOUTPUT);
 	}
-	cp->flags |= conn_flags;
+	flags |= conn_flags;
+	cp->flags = flags;
 	cp->dest = dest;
 
 	IP_VS_DBG_BUF(7, "Bind-dest %s c:%s:%d v:%s:%d "
@@ -584,11 +587,11 @@ ip_vs_bind_dest(struct ip_vs_conn *cp, struct ip_vs_dest *dest)
 		      atomic_read(&dest->refcnt));
 
 	/* Update the connection counters */
-	if (!(cp->flags & IP_VS_CONN_F_TEMPLATE)) {
+	if (!(flags & IP_VS_CONN_F_TEMPLATE)) {
 		/* It is a normal connection, so modify the counters
 		 * according to the flags, later the protocol can
 		 * update them on state change */
-		if (!(cp->flags & IP_VS_CONN_F_INACTIVE))
+		if (!(flags & IP_VS_CONN_F_INACTIVE))
 			atomic_inc(&dest->activeconns);
 		else
 			atomic_inc(&dest->inactconns);
-- 
1.7.6.3


^ permalink raw reply related	[flat|nested] 18+ messages in thread

* [PATCH 9/9] ipvs: Provide a generic ip_vs_bind_xmit()
  2012-03-21  8:56 [GIT PULL nf-next] IPVS Simon Horman
                   ` (7 preceding siblings ...)
  2012-03-21  8:56 ` [PATCH 8/9] ipvs: optimize the use of flags in ip_vs_bind_dest Simon Horman
@ 2012-03-21  8:56 ` Simon Horman
  8 siblings, 0 replies; 18+ messages in thread
From: Simon Horman @ 2012-03-21  8:56 UTC (permalink / raw)
  To: Pablo Neira Ayuso
  Cc: lvs-devel, netdev, netfilter-devel, Wensong Zhang,
	Julian Anastasov, Simon Horman

This logic is now used twice, so it seems worthwhile to avoid
the, albeit minor, code duplication.

Acked-by: Julian Anastasov <ja@ssi.bg>
Signed-off-by: Simon Horman <horms@verge.net.au>
---
 net/netfilter/ipvs/ip_vs_conn.c |   26 +++++++++++++-------------
 1 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/net/netfilter/ipvs/ip_vs_conn.c b/net/netfilter/ipvs/ip_vs_conn.c
index 1eaec99..2fc9e4f 100644
--- a/net/netfilter/ipvs/ip_vs_conn.c
+++ b/net/netfilter/ipvs/ip_vs_conn.c
@@ -481,7 +481,7 @@ void ip_vs_conn_fill_cport(struct ip_vs_conn *cp, __be16 cport)
  *	Bind a connection entry with the corresponding packet_xmit.
  *	Called by ip_vs_conn_new.
  */
-static inline void ip_vs_bind_xmit(struct ip_vs_conn *cp)
+static inline void ip_vs_bind_xmit_v4(struct ip_vs_conn *cp)
 {
 	switch (IP_VS_FWD_METHOD(cp)) {
 	case IP_VS_CONN_F_MASQ:
@@ -533,6 +533,16 @@ static inline void ip_vs_bind_xmit_v6(struct ip_vs_conn *cp)
 }
 #endif
 
+static inline void ip_vs_bind_xmit(struct ip_vs_conn *cp)
+{
+#ifdef CONFIG_IP_VS_IPV6
+	if (cp->af == AF_INET6)
+		ip_vs_bind_xmit_v6(cp);
+	else
+#endif
+		ip_vs_bind_xmit_v4(cp);
+}
+
 
 static inline int ip_vs_dest_totalconns(struct ip_vs_dest *dest)
 {
@@ -637,12 +647,7 @@ struct ip_vs_dest *ip_vs_try_bind_dest(struct ip_vs_conn *cp)
 
 		/* Update its packet transmitter */
 		cp->packet_xmit = NULL;
-#ifdef CONFIG_IP_VS_IPV6
-		if (cp->af == AF_INET6)
-			ip_vs_bind_xmit_v6(cp);
-		else
-#endif
-			ip_vs_bind_xmit(cp);
+		ip_vs_bind_xmit(cp);
 
 		pd = ip_vs_proto_data_get(ip_vs_conn_net(cp), cp->protocol);
 		if (pd && atomic_read(&pd->appcnt))
@@ -916,12 +921,7 @@ ip_vs_conn_new(const struct ip_vs_conn_param *p,
 	cp->sync_endtime = jiffies & ~3UL;
 
 	/* Bind its packet transmitter */
-#ifdef CONFIG_IP_VS_IPV6
-	if (p->af == AF_INET6)
-		ip_vs_bind_xmit_v6(cp);
-	else
-#endif
-		ip_vs_bind_xmit(cp);
+	ip_vs_bind_xmit(cp);
 
 	if (unlikely(pd && atomic_read(&pd->appcnt)))
 		ip_vs_bind_app(cp, pd->pp);
-- 
1.7.6.3


^ permalink raw reply related	[flat|nested] 18+ messages in thread

* Re: [PATCH 5/9] ipvs: use adaptive pause in master thread
  2012-03-21  8:56 ` [PATCH 5/9] ipvs: use adaptive pause in master thread Simon Horman
@ 2012-04-02 11:11   ` Pablo Neira Ayuso
  2012-04-03 21:16     ` Julian Anastasov
  0 siblings, 1 reply; 18+ messages in thread
From: Pablo Neira Ayuso @ 2012-04-02 11:11 UTC (permalink / raw)
  To: Simon Horman
  Cc: lvs-devel, netdev, netfilter-devel, Wensong Zhang, Julian Anastasov

Hi Simon et al.

On Wed, Mar 21, 2012 at 05:56:20PM +0900, Simon Horman wrote:
> From: Julian Anastasov <ja@ssi.bg>
> 
> 	High rate of sync messages in master can lead to
> overflowing the socket buffer and dropping the messages.
> Instead of using fixed pause try to adapt to the sync
> rate, so that we do not wakeup too often while at the same
> time staying below the socket buffer limit.

I see. You are avoiding congestion in the socket queue by putting
the pressure on your sync_buff queue (I don't find any limit in the
code for the amount of memory that it may consume, btw).

Please, correct me if I'm wrong, but from this I can see you're
assuming that there's always room in the sync_buff queue to store
messages. This is valid if the high peak of traffic lasts for a short
time. However, if it lasts long, the worker thread may not be able to
consume all the messages in time, and the sync buffer may keep
growing more and more.

Moreover, the backup may receive sync messages talking about old
states that are no longer useful for recovering the load balancing in
case of failover.

One more concern, please see below.

> Signed-off-by: Julian Anastasov <ja@ssi.bg>
> Tested-by: Aleksey Chudov <aleksey.chudov@gmail.com>
> Signed-off-by: Simon Horman <horms@verge.net.au>
> ---
>  net/netfilter/ipvs/ip_vs_sync.c |   60 +++++++++++++++++++++++++++++---------
>  1 files changed, 46 insertions(+), 14 deletions(-)
> 
> diff --git a/net/netfilter/ipvs/ip_vs_sync.c b/net/netfilter/ipvs/ip_vs_sync.c
> index 0e36679..9201c43 100644
> --- a/net/netfilter/ipvs/ip_vs_sync.c
> +++ b/net/netfilter/ipvs/ip_vs_sync.c
> @@ -1392,18 +1392,22 @@ ip_vs_send_async(struct socket *sock, const char *buffer, const size_t length)
>  	return len;
>  }
>  
> -static void
> +static int
>  ip_vs_send_sync_msg(struct socket *sock, struct ip_vs_sync_mesg *msg)
>  {
>  	int msize;
> +	int ret;
>  
>  	msize = msg->size;
>  
>  	/* Put size in network byte order */
>  	msg->size = htons(msg->size);
>  
> -	if (ip_vs_send_async(sock, (char *)msg, msize) != msize)
> -		pr_err("ip_vs_send_async error\n");
> +	ret = ip_vs_send_async(sock, (char *)msg, msize);
> +	if (ret >= 0 || ret == -EAGAIN)
> +		return ret;
> +	pr_err("ip_vs_send_async error %d\n", ret);
> +	return 0;
>  }
>  
>  static int
> @@ -1428,33 +1432,61 @@ ip_vs_receive(struct socket *sock, char *buffer, const size_t buflen)
>  	return len;
>  }
>  
> +/* Get next buffer to send */
> +static inline struct ip_vs_sync_buff *
> +next_sync_buff(struct netns_ipvs *ipvs)
> +{
> +	struct ip_vs_sync_buff *sb;
> +
> +	sb = sb_dequeue(ipvs);
> +	if (sb)
> +		return sb;
> +	/* Do not delay entries in buffer for more than 2 seconds */
> +	return get_curr_sync_buff(ipvs, 2 * HZ);
> +}
>  
>  static int sync_thread_master(void *data)
>  {
>  	struct ip_vs_sync_thread_data *tinfo = data;
>  	struct netns_ipvs *ipvs = net_ipvs(tinfo->net);
> -	struct ip_vs_sync_buff *sb;
> +	struct ip_vs_sync_buff *sb = NULL;
> +	int pause = HZ / 10;
>  
>  	pr_info("sync thread started: state = MASTER, mcast_ifn = %s, "
>  		"syncid = %d\n",
>  		ipvs->master_mcast_ifn, ipvs->master_syncid);
>  
>  	while (!kthread_should_stop()) {
> -		while ((sb = sb_dequeue(ipvs))) {
> -			ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
> -			ip_vs_sync_buff_release(sb);
> -		}
> +		int count = 0;
>  
> -		/* check if entries stay in ipvs->sync_buff for 2 seconds */
> -		sb = get_curr_sync_buff(ipvs, 2 * HZ);
> -		if (sb) {
> -			ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
> -			ip_vs_sync_buff_release(sb);
> +		for (;;) {
> +			if (!sb) {
> +				sb = next_sync_buff(ipvs);
> +				if (!sb)
> +					break;
> +			}
> +			if (ip_vs_send_sync_msg(tinfo->sock, sb->mesg) >= 0 ||
> +			    pause == 1) {
> +				ip_vs_sync_buff_release(sb);
> +				sb = NULL;
> +				count++;
> +			} else {
> +				pause = max(1, (pause >> 1));
> +				break;
> +			}
>  		}
>  
> -		schedule_timeout_interruptible(HZ);
> +		/* Max packets to send at once */
> +		if (count > 200)
> +			pause = max(1, (pause - HZ / 20));
> +		else if (count < 20)
> +			pause = min(HZ / 4, (pause + HZ / 20));
> +		schedule_timeout_interruptible(sb ? 1 : pause);

This rate-limits the number of times the worker is woken up.

Still, this seems to me like an ad-hoc congestion solution. There's no
justification for why those numbers have been chosen, e.g. why do we
assume that we're congested once we've sent 200 packets in a single loop?

To me, congestion control is a complicated thing (there is plenty of
literature on how TCP avoids it). I'm not sure how many patches will
follow after this one to try to improve your congestion control
solution.

So, my question is, are you sure you want to enter this domain?

IMO, it is better to stick to some simple solution, i.e. just drop
messages if the worker thread receives a high peak of work, than to
try to define some sort of rate-limit solution based on assumptions
that are not generic enough for every setup.
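
For illustration, a minimal sketch of that simple bounded-queue
approach (names modelled on the IPVS sync code; SYNC_QLEN_LIMIT and
the sync_queue_len counter are assumptions for the example, not
existing fields):

/* Bounded enqueue: drop (release) the buffer once the queue is full,
 * instead of letting it grow without bound. */
#define SYNC_QLEN_LIMIT	128

static void sb_queue_tail_bounded(struct netns_ipvs *ipvs,
				  struct ip_vs_sync_buff *sb)
{
	spin_lock(&ipvs->sync_lock);
	if ((ipvs->sync_state & IP_VS_STATE_MASTER) &&
	    ipvs->sync_queue_len < SYNC_QLEN_LIMIT) {
		list_add_tail(&sb->list, &ipvs->sync_queue);
		ipvs->sync_queue_len++;
	} else {
		/* queue full: drop under pressure */
		ip_vs_sync_buff_release(sb);
	}
	spin_unlock(&ipvs->sync_lock);
}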

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [PATCH 5/9] ipvs: use adaptive pause in master thread
  2012-04-02 11:11   ` Pablo Neira Ayuso
@ 2012-04-03 21:16     ` Julian Anastasov
  2012-04-05 15:05       ` Pablo Neira Ayuso
  0 siblings, 1 reply; 18+ messages in thread
From: Julian Anastasov @ 2012-04-03 21:16 UTC (permalink / raw)
  To: Pablo Neira Ayuso
  Cc: Simon Horman, lvs-devel, netdev, netfilter-devel, Wensong Zhang


	Hello,

On Mon, 2 Apr 2012, Pablo Neira Ayuso wrote:

> > From: Julian Anastasov <ja@ssi.bg>
> > 
> > 	High rate of sync messages in master can lead to
> > overflowing the socket buffer and dropping the messages.
> > Instead of using fixed pause try to adapt to the sync
> > rate, so that we do not wakeup too often while at the same
> > time staying below the socket buffer limit.
> 
> I see. You are avoiding the congestion in the socket queue by putting
> the pressure in your sync_buff queue  (I don't find any limit in
> the code for the amount of memory that it may consume btw).
> 
> Please, correct me if I'm wrong, but from this I can see you're
> assuming that there's always room in the sync_buff queue to store
> messages. This is valid if the high peak of traffic lasts for short
> time. However, if it lasts long, the worker thread may not be able to
> consume all the messages in time under high stress situation that
> lasts long and the sync_buffer may keep growing more and more.

	Agreed, packet processing does not check any
limits for the sync_queues lists. This can be addressed
additionally; maybe there can be some wakeup when a threshold
is reached, so that we wake up the master thread early.
But there is always the problem that we do not know how
deep/free the socket buffer is, or after how much time the
master thread will wake up... We have to select some arbitrary
number again.
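
	(For reference, a sketch of the kind of probe that could be
used - it mirrors what sock_writeable() does, treating the buffer as
writable while less than half of sk_sndbuf is allocated; only an
approximation of how deep/free it is:)

/* Rough probe of remaining sndbuf room, modelled on sock_writeable() */
static inline bool sync_sock_has_room(const struct sock *sk)
{
	return atomic_read(&sk->sk_wmem_alloc) < (sk->sk_sndbuf >> 1);
}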

> Moreover, the backup may receive sync messages talking about old
> states that are not useful anymore to recover the load-balancing in
> case of failover.

	Can you explain more? Are you referring to the
2-second buffer delay?

> One more concern, please see below.

> > -		schedule_timeout_interruptible(HZ);
> > +		/* Max packets to send at once */
> > +		if (count > 200)
> > +			pause = max(1, (pause - HZ / 20));
> > +		else if (count < 20)
> > +			pause = min(HZ / 4, (pause + HZ / 20));
> > +		schedule_timeout_interruptible(sb ? 1 : pause);
> 
> This rate-limits the amount of times the worker is woken up.
> 
> Still, this seems to me like an ad-hoc congestion solution. There's no
> justification for why those numbers have been chosen, e.g. why do we assume
> that we're congested if we've reached 200 packets in one single loop?

	200 is arbitrary, 20% of txqueuelen, a way to avoid
large bursts. It is the 'pause = max(1, (pause >> 1));' part
that also matters, because we try to increase the wakeup rate so
that the socket buffer stays below 50%. We assume the sync traffic
rate does not change much, but we are still prepared to avoid
drops while pause > 1 jiffie. If the sync rate is too high, maybe
we should wake up the master thread in sb_queue_tail.

	The above logic should be read like this (see the sketch
below the list):

- limit bursts to min(200, half of the guessed socket buffer size)
- limit pause to HZ/4 jiffies in idle periods
- if the socket buffer is full, try again after 1 jiffie and
double the wakeup rate, so that we do not hit the
socket limit again
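
	(The rules distilled into a standalone helper; the real patch
applies the halving inside the send loop, so this is a simplification
for readability, not the exact code:)

/* pause adaptation: halve on sndbuf overflow, shrink after large
 * bursts, grow when mostly idle */
static int adapt_pause(int pause, int sent_count, bool sndbuf_full)
{
	if (sndbuf_full)
		return max(1, pause >> 1);
	if (sent_count > 200)
		return max(1, pause - HZ / 20);
	if (sent_count < 20)
		return min(HZ / 4, pause + HZ / 20);
	return pause;
}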

> To me, congestion control is a complicated thing (there is plenty of
> literature on how TCP avoids it). I'm not sure how many patches will
> follow after this one to try to improve your congestion control
> solution.
> 
> So, my question is, are you sure you want to enter this domain?

	We should start with something. Better ideas are
always welcome.

> IMO, better to stick to some simple solution, ie. just drop messages
> if the worker thread receives a high peak of work, than trying to
> define some sort of rate-limit solution based on assumptions that are
> not generic enough for every setup.

	Before now we dropped messages due to the large fixed sleep time.
They were dropped when the socket buffer was full. Dropping messages
just because we decided to sleep longer than needed is not nice.
With the change we still drop messages, but not before pause
reaches 1 jiffie. As we cannot use a lower sleep time, only an
additional wakeup in sb_queue_tail can keep up with higher
sync rates. Maybe we should decide to drop messages only
when some max limit for packets in sync_queues is reached.

Regards

--
Julian Anastasov <ja@ssi.bg>

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [PATCH 5/9] ipvs: use adaptive pause in master thread
  2012-04-03 21:16     ` Julian Anastasov
@ 2012-04-05 15:05       ` Pablo Neira Ayuso
  2012-04-08 20:12         ` Julian Anastasov
  0 siblings, 1 reply; 18+ messages in thread
From: Pablo Neira Ayuso @ 2012-04-05 15:05 UTC (permalink / raw)
  To: Julian Anastasov
  Cc: Simon Horman, lvs-devel, netdev, netfilter-devel, Wensong Zhang

Hi Julian,

On Wed, Apr 04, 2012 at 12:16:15AM +0300, Julian Anastasov wrote:
> 
> 	Hello,
> 
> On Mon, 2 Apr 2012, Pablo Neira Ayuso wrote:
> 
> > > From: Julian Anastasov <ja@ssi.bg>
> > > 
> > > 	High rate of sync messages in master can lead to
> > > overflowing the socket buffer and dropping the messages.
> > > Instead of using fixed pause try to adapt to the sync
> > > rate, so that we do not wakeup too often while at the same
> > > time staying below the socket buffer limit.
> > 
> > I see. You are avoiding the congestion in the socket queue by putting
> > the pressure in your sync_buff queue  (I don't find any limit in
> > the code for the amount of memory that it may consume btw).
> > 
> > Please, correct me if I'm wrong, but from this I can see you're
> > assuming that there's always room in the sync_buff queue to store
> > messages. This is valid if the high peak of traffic lasts for short
> > time. However, if it lasts long, the worker thread may not be able to
> > consume all the messages in time under high stress situation that
> > lasts long and the sync_buffer may keep growing more and more.
> 
> 	Agreed, packet processing does not check for any
> limits for the sync_queues lists. This can be addressed
> additionally, may be there can be some wakeup if threshold
> is reached, so that we wake up the master thread early.
> But there is always the problem that we do not know how
> deep/free is the socket buffer, after how much time the
> master thread will wakeup... We have to select some arbitrary
> number again.

I see, see below.

> > Moreover, the backup may receive sync messages talking about old
> > states that are not useful anymore to recover the load-balancing in
> > case of failover.
> 
> 	Can you explain more? Are you referring to the
> 2-second buffer delay?

I was referring to the situation in which you decide to delay sync
messages due to congestion. State sync needs to be soft real-time to
ensure that the backup is a copy-equivalent of the master (at
least, close to it, even if that's not fully possible). Otherwise, the
backup may contain useless out-of-date states.

> > One more concern, please see below.
> 
> > > -		schedule_timeout_interruptible(HZ);
> > > +		/* Max packets to send at once */
> > > +		if (count > 200)
> > > +			pause = max(1, (pause - HZ / 20));
> > > +		else if (count < 20)
> > > +			pause = min(HZ / 4, (pause + HZ / 20));
> > > +		schedule_timeout_interruptible(sb ? 1 : pause);
> > 
> > This rate-limits the amount of times the worker is woken up.
> > 
> > Still, this seems to me like an ad-hoc congestion solution. There's no
> > justification for why those numbers have been chosen, e.g. why do we assume
> > that we're congested if we've reached 200 packets in one single loop?
> 
> 	200 is arbitrary, 20% of txqueuelen, a way to avoid
> large bursts. It is the 'pause = max(1, (pause >> 1));' part
> that also matters because we try to increase the wakeup rate, so
> that socket buffer is below 50%. We assume sync traffic rate
> does not change much but we are still prepared to avoid
> drops while pause > 1 jiffie. If sync rate is too high may be
> we should wakeup the master thread in sb_queue_tail.
> 
> 	The above logic should be read like this:
> 
> - limit bursts to min(200, half of guessed socket buffer size)
> - limit pause to HZ/4 jiffies in idle periods
> - if socket buffer is full try again after 1 jiffie and
> increase the wakeups twice, so that we do not hit the
> socket limit again

I think you can control when the kernel thread is woken up with a
counting semaphore. The counter of that semaphore will be initially
set to zero. Then, you can up() the semaphore once per new buffer
that you enqueue to the sender.

feeder:
        add message to sync buffer
        if buffer full:
                enqueue buffer to sender_thread
                up(s)

sender_thread:
        while (1) {
                down(s)
                retrieve message from queue
                send message
        }

It seems to me like the classical producer/consumer problem, which you
can resolve with semaphores.

Thus, you don't need to poll periodically to check if there are
messages. You won't need to adapt the "pause" time.
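
A rough translation into kernel primitives, as a sketch only - it
ignores the stop handshake discussed below, and master_sem and
master_sock are illustrative names, not existing fields:

/* consumer: block on the semaphore, then send one queued buffer */
static int sync_sender(void *data)
{
	struct netns_ipvs *ipvs = data;
	struct ip_vs_sync_buff *sb;

	while (!kthread_should_stop()) {
		if (down_interruptible(&ipvs->master_sem))
			continue;	/* interrupted, re-check stop */
		sb = sb_dequeue(ipvs);
		if (!sb)
			continue;
		ip_vs_send_sync_msg(ipvs->master_sock, sb->mesg);
		ip_vs_sync_buff_release(sb);
	}
	return 0;
}

The producer side would do up(&ipvs->master_sem) right after
sb_queue_tail() enqueues a buffer, so each queued buffer pairs with
one down().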

> > To me, congestion control is a complicated thing (there is plenty of
> > literature on how TCP avoids it). I'm not sure how many patches will
> > follow after this one to try to improve your congestion control
> > solution.
> > 
> > So, my question is, are you sure you want to enter this domain?
> 
> 	We should start with something. Better ideas are
> always welcome.
> 
> > IMO, better to stick to some simple solution, ie. just drop messages
> > if the worker thread receives a high peak of work, than trying to
> > define some sort of rate-limit solution based on assumptions that are
> > not generic enough for every setup.
> 
> 	Before now we dropped messages due to large fixed sleep time.
> They were dropped when socket buffer was full. Dropping messages is
> not nice just because we decided to sleep longer than needed.
> With the change we still drop messages but not before pause
> reaches 1 jiffie. As we can not use lower sleep time only
> additional wakeup in sb_queue_tail can keep up with higher
> sync rates. May be we should decide to drop messages only
> when some max limit for packets in sync_queues is reached.

Under congestion the situation is complicated. At some point you'll
end up dropping messages.

You may want to increase the socket queue to delay the moment at which
we start dropping messages. You can expose the socket buffer length via
a /proc interface, I guess (not sure if you're already doing that or
suggesting to use the global socket buffer length).

You can also define some mechanism to reduce the number of events:
some state filtering so you only propagate important states.

A partially reliable protocol, so that the backup can request messages
that got lost in a smart way, would also come in handy. Basically, the
master only retransmits the current state, not the whole sequence of
messages (this is good under congestion, since you save messages).
I implemented that in conntrackd, but that's a more complex solution,
of course. I'd start with something simple.

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [PATCH 5/9] ipvs: use adaptive pause in master thread
  2012-04-05 15:05       ` Pablo Neira Ayuso
@ 2012-04-08 20:12         ` Julian Anastasov
  2012-04-09 23:08           ` Pablo Neira Ayuso
  0 siblings, 1 reply; 18+ messages in thread
From: Julian Anastasov @ 2012-04-08 20:12 UTC (permalink / raw)
  To: Pablo Neira Ayuso
  Cc: Simon Horman, lvs-devel, netdev, netfilter-devel, Wensong Zhang


	Hello,

On Thu, 5 Apr 2012, Pablo Neira Ayuso wrote:

> I think you can control when the kernel thread is woken up with a
> counting semaphore. The counter of that semaphore will be initially
> set to zero. Then, you can up() the semaphore once per new buffer
> that you enqueue to the sender.
> 
> feeder:
>         add message to sync buffer
>         if buffer full:
>                 enqueue buffer to sender_thread
>                 up(s)
> 
> sender_thread:
>         while (1) {
>                 down(s)
>                 retrieve message from queue
>                 send message
>         }
> 
> It seems to me like the classical producer/consumer problem that you
> can resolve with semaphores.

	Maybe it is possible to use up/down, but we
have to handle the kthread_should_stop check, and I also
prefer to reduce the wakeup events. So, I'm trying
another solution, which is appended just for review.

> Under congestion the situation is complicated. At some point you'll
> end up dropping messages.
> 
> You may want to increase the socket queue to delay the moment at which
> we start dropping messages. You can expose the socket buffer length via
> a /proc interface, I guess (not sure if you're already doing that or
> suggesting to use the global socket buffer length).

	I'm still thinking about whether the sndbuf value should be
exported; currently users have to modify the global default/max value.
But in the version below I'm trying to handle sndbuf overflow
by blocking on the write_space event. This way we should work
with any sndbuf configuration.

> You also can define some mechanism to reduce the amount of events,
> some state filtering so you only propagate important states.
> 
> A partially reliable protocol, so that the backup can request messages
> that got lost in a smart way, would also come in handy. Basically, the
> master only retransmits the current state, not the whole sequence of
> messages (this is good under congestion, since you save messages).
> I implement that in conntrackd, but that's more complex solution,
> of course. I'd start with something simple.

	The patch "reduce sync rate with time thresholds"
that follows the discussed one in the changeset has such
purpose to reduce the events, in tests the sync traffic is
reduced ~10 times. But it does not modify the current
protocol, it adds a very limited logic for retransmissions.

	Here is the new version for review. It compiles
and does not crash in simple tests.

ipvs: wakeup master thread

	A high rate of sync messages in the master can lead to
overflowing the socket buffer and dropping the messages.
A fixed pause of 1 second is not suitable for loaded masters,
so allow packet processing to wake up the master thread
once per 32 messages (IPVS_SYNC_WAKEUP_RATE), so that
we do not send in long bursts even when the socket buffer is
very large.

	Add a hard limit for the queued messages before sending,
via the "sync_qlen_max" sysctl var. It defaults to 1/32 of
the memory pages but actually represents a number of messages.
It will protect us from allocating large parts of memory
when the sending rate is lower than the queuing rate.

	Change the master thread to detect and block on
SNDBUF overflow, so that we do not drop messages when
the socket limit is low but the sync_qlen_max limit is
not reached. On ENOBUFS or other errors just drop the
messages.

	Use a lower pause (200ms) to wait for messages; it
takes effect when the sync rate is low. It has two purposes:
to limit the delay for queued messages and to avoid
many wakeups when no messages are queued.

	Finally, make sure kthread_should_stop is checked
properly in TASK_INTERRUPTIBLE state when going to sleep
to avoid delays in thread stopping.

Signed-off-by: Julian Anastasov <ja@ssi.bg>
---
 include/net/ip_vs.h             |   15 ++++++++
 net/netfilter/ipvs/ip_vs_ctl.c  |    8 ++++
 net/netfilter/ipvs/ip_vs_sync.c |   74 +++++++++++++++++++++++++++++---------
 3 files changed, 79 insertions(+), 18 deletions(-)

diff --git a/include/net/ip_vs.h b/include/net/ip_vs.h
index 2bdee51..539f557 100644
--- a/include/net/ip_vs.h
+++ b/include/net/ip_vs.h
@@ -870,6 +870,7 @@ struct netns_ipvs {
 #endif
 	int			sysctl_snat_reroute;
 	int			sysctl_sync_ver;
+	int			sysctl_sync_qlen_max;
 	int			sysctl_cache_bypass;
 	int			sysctl_expire_nodest_conn;
 	int			sysctl_expire_quiescent_template;
@@ -890,6 +891,8 @@ struct netns_ipvs {
 	struct timer_list	est_timer;	/* Estimation timer */
 	/* ip_vs_sync */
 	struct list_head	sync_queue;
+	int			sync_queue_len;
+	unsigned int		sync_queue_sent;
 	spinlock_t		sync_lock;
 	struct ip_vs_sync_buff  *sync_buff;
 	spinlock_t		sync_buff_lock;
@@ -912,6 +915,8 @@ struct netns_ipvs {
 #define DEFAULT_SYNC_THRESHOLD	3
 #define DEFAULT_SYNC_PERIOD	50
 #define DEFAULT_SYNC_VER	1
+#define IPVS_SYNC_WAKEUP_RATE	32
+#define IPVS_SYNC_QLEN_MAX	(IPVS_SYNC_WAKEUP_RATE * 4)
 
 #ifdef CONFIG_SYSCTL
 
@@ -930,6 +935,11 @@ static inline int sysctl_sync_ver(struct netns_ipvs *ipvs)
 	return ipvs->sysctl_sync_ver;
 }
 
+static inline int sysctl_sync_qlen_max(struct netns_ipvs *ipvs)
+{
+	return ipvs->sysctl_sync_qlen_max;
+}
+
 #else
 
 static inline int sysctl_sync_threshold(struct netns_ipvs *ipvs)
@@ -947,6 +957,11 @@ static inline int sysctl_sync_ver(struct netns_ipvs *ipvs)
 	return DEFAULT_SYNC_VER;
 }
 
+static inline int sysctl_sync_qlen_max(struct netns_ipvs *ipvs)
+{
+	return IPVS_SYNC_QLEN_MAX;
+}
+
 #endif
 
 /*
diff --git a/net/netfilter/ipvs/ip_vs_ctl.c b/net/netfilter/ipvs/ip_vs_ctl.c
index 964d426..e3280ae 100644
--- a/net/netfilter/ipvs/ip_vs_ctl.c
+++ b/net/netfilter/ipvs/ip_vs_ctl.c
@@ -1718,6 +1718,12 @@ static struct ctl_table vs_vars[] = {
 		.proc_handler	= &proc_do_sync_mode,
 	},
 	{
+		.procname	= "sync_qlen_max",
+		.maxlen		= sizeof(int),
+		.mode		= 0644,
+		.proc_handler	= proc_dointvec,
+	},
+	{
 		.procname	= "cache_bypass",
 		.maxlen		= sizeof(int),
 		.mode		= 0644,
@@ -3662,6 +3668,8 @@ int __net_init ip_vs_control_net_init_sysctl(struct net *net)
 	tbl[idx++].data = &ipvs->sysctl_snat_reroute;
 	ipvs->sysctl_sync_ver = 1;
 	tbl[idx++].data = &ipvs->sysctl_sync_ver;
+	ipvs->sysctl_sync_qlen_max = nr_free_buffer_pages() / 32;
+	tbl[idx++].data = &ipvs->sysctl_sync_qlen_max;
 	tbl[idx++].data = &ipvs->sysctl_cache_bypass;
 	tbl[idx++].data = &ipvs->sysctl_expire_nodest_conn;
 	tbl[idx++].data = &ipvs->sysctl_expire_quiescent_template;
diff --git a/net/netfilter/ipvs/ip_vs_sync.c b/net/netfilter/ipvs/ip_vs_sync.c
index 0e36679..b36f89b 100644
--- a/net/netfilter/ipvs/ip_vs_sync.c
+++ b/net/netfilter/ipvs/ip_vs_sync.c
@@ -312,6 +312,7 @@ static inline struct ip_vs_sync_buff *sb_dequeue(struct netns_ipvs *ipvs)
 				struct ip_vs_sync_buff,
 				list);
 		list_del(&sb->list);
+		ipvs->sync_queue_len--;
 	}
 	spin_unlock_bh(&ipvs->sync_lock);
 
@@ -358,9 +359,13 @@ static inline void sb_queue_tail(struct netns_ipvs *ipvs)
 	struct ip_vs_sync_buff *sb = ipvs->sync_buff;
 
 	spin_lock(&ipvs->sync_lock);
-	if (ipvs->sync_state & IP_VS_STATE_MASTER)
+	if (ipvs->sync_state & IP_VS_STATE_MASTER &&
+	    ipvs->sync_queue_len < sysctl_sync_qlen_max(ipvs)) {
 		list_add_tail(&sb->list, &ipvs->sync_queue);
-	else
+		ipvs->sync_queue_len++;
+		if (!((++ipvs->sync_queue_sent) & (IPVS_SYNC_WAKEUP_RATE-1)))
+			wake_up_process(ipvs->master_thread);
+	} else
 		ip_vs_sync_buff_release(sb);
 	spin_unlock(&ipvs->sync_lock);
 }
@@ -405,10 +410,11 @@ void ip_vs_sync_switch_mode(struct net *net, int mode)
 		ipvs->sync_buff = NULL;
 	} else {
 		spin_lock_bh(&ipvs->sync_lock);
-		if (ipvs->sync_state & IP_VS_STATE_MASTER)
+		if (ipvs->sync_state & IP_VS_STATE_MASTER) {
 			list_add_tail(&ipvs->sync_buff->list,
 				      &ipvs->sync_queue);
-		else
+			ipvs->sync_queue_len++;
+		} else
 			ip_vs_sync_buff_release(ipvs->sync_buff);
 		spin_unlock_bh(&ipvs->sync_lock);
 	}
@@ -1392,18 +1398,22 @@ ip_vs_send_async(struct socket *sock, const char *buffer, const size_t length)
 	return len;
 }
 
-static void
+static int
 ip_vs_send_sync_msg(struct socket *sock, struct ip_vs_sync_mesg *msg)
 {
 	int msize;
+	int ret;
 
 	msize = msg->size;
 
 	/* Put size in network byte order */
 	msg->size = htons(msg->size);
 
-	if (ip_vs_send_async(sock, (char *)msg, msize) != msize)
-		pr_err("ip_vs_send_async error\n");
+	ret = ip_vs_send_async(sock, (char *)msg, msize);
+	if (ret >= 0 || ret == -EAGAIN)
+		return ret;
+	pr_err("ip_vs_send_async error %d\n", ret);
+	return 0;
 }
 
 static int
@@ -1428,32 +1438,58 @@ ip_vs_receive(struct socket *sock, char *buffer, const size_t buflen)
 	return len;
 }
 
+/* Get next buffer to send */
+static inline struct ip_vs_sync_buff *
+next_sync_buff(struct netns_ipvs *ipvs)
+{
+	struct ip_vs_sync_buff *sb;
+
+	sb = sb_dequeue(ipvs);
+	if (sb)
+		return sb;
+	/* Do not delay entries in buffer for more than 2 seconds */
+	return get_curr_sync_buff(ipvs, 2 * HZ);
+}
 
 static int sync_thread_master(void *data)
 {
 	struct ip_vs_sync_thread_data *tinfo = data;
 	struct netns_ipvs *ipvs = net_ipvs(tinfo->net);
+	struct sock *sk = tinfo->sock->sk;
 	struct ip_vs_sync_buff *sb;
 
 	pr_info("sync thread started: state = MASTER, mcast_ifn = %s, "
 		"syncid = %d\n",
 		ipvs->master_mcast_ifn, ipvs->master_syncid);
 
-	while (!kthread_should_stop()) {
-		while ((sb = sb_dequeue(ipvs))) {
-			ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
-			ip_vs_sync_buff_release(sb);
+	for (;;) {
+		sb = next_sync_buff(ipvs);
+		if (!sb) {
+			set_current_state(TASK_INTERRUPTIBLE);
+			if (kthread_should_stop())
+				break;
+			schedule_timeout(HZ / 5);
+			continue;
 		}
 
-		/* check if entries stay in ipvs->sync_buff for 2 seconds */
-		sb = get_curr_sync_buff(ipvs, 2 * HZ);
-		if (sb) {
-			ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
-			ip_vs_sync_buff_release(sb);
+retry:
+		if (unlikely(kthread_should_stop()))
+			break;
+		if (ip_vs_send_sync_msg(tinfo->sock, sb->mesg) < 0) {
+			int ret = 0;
+
+			__wait_event_interruptible(*sk_sleep(sk),
+						   sock_writeable(sk) ||
+						   kthread_should_stop(),
+						   ret);
+			goto retry;
 		}

^ permalink raw reply related	[flat|nested] 18+ messages in thread

* Re: [PATCH 5/9] ipvs: use adaptive pause in master thread
  2012-04-08 20:12         ` Julian Anastasov
@ 2012-04-09 23:08           ` Pablo Neira Ayuso
  2012-04-11 20:02             ` Julian Anastasov
  0 siblings, 1 reply; 18+ messages in thread
From: Pablo Neira Ayuso @ 2012-04-09 23:08 UTC (permalink / raw)
  To: Julian Anastasov
  Cc: Simon Horman, lvs-devel, netdev, netfilter-devel, Wensong Zhang

Hi Julian,

On Sun, Apr 08, 2012 at 11:12:53PM +0300, Julian Anastasov wrote:
> 
> 	Hello,
> 
> On Thu, 5 Apr 2012, Pablo Neira Ayuso wrote:
> 
> > I think you can control when the kernel thread is woken up with a
> > counting semaphore. The counter of that semaphore will be initially
> > set to zero. Then, you can up() the semaphore once per new buffer
> > that you enqueue to the sender.
> > 
> > feeder:
> >         add message to sync buffer
> >         if buffer full:
> >                 enqueue buffer to sender_thread
> >                 up(s)
> > 
> > sender_thread:
> >         while (1) {
> >                 down(s)
> >                 retrieve message from queue
> >                 send message
> >         }
> > 
> > It seems to me like the classical producer/consumer problem that you
> > can resolve with semaphores.
> 
> 	May be it is possible to use up/down but we
> have to handle the kthread_should_stop check and also
> I prefer to reduce the wakeup events. So, I'm trying
> another solution which is appended just for review.

You can still use kthread_should_stop, with a wrapper function
that calls kthread_stop() and then up()s the semaphore.

sync_stop:
        kthread_stop(k)
        up(s)

kthread_routine:
        while(1) {
                down(s)
                if (kthread_should_stop(k))
                        break;

                get sync msg
                send sync msg
        }

BTW, each up() does not necessarily mean one wakeup event. up() will
deliver only one wakeup event to a process that has already been
woken.

> > Under congestion the situation is complicated. At some point you'll
> > end up dropping messages.
> > 
> > You may want to increase the socket queue to delay the moment at which
> > we start dropping messages. You can expose the socket buffer length via
> > a /proc interface, I guess (not sure if you're already doing that or
> > suggesting to use the global socket buffer length).
> 
> 	I'm still thinking if sndbuf value should be exported,
> currently users have to modify the global default/max value.

I think it's a good idea.

> But in below version I'm trying to handle the sndbuf overflow
> by blocking for write_space event. By this way we should work
> with any sndbuf configuration.

You seem to be deferring the overrun problem by using a longer
intermediate queue than the socket buffer. Then, that queue can be
tuned by the user via sysctl. It may happen under heavy stress that
your intermediate queue gets full again; then you'll have to drop
packets at some point.

> > You also can define some mechanism to reduce the amount of events,
> > some state filtering so you only propagate important states.
> > 
> > A partially reliable protocol, so that the backup can request messages
> > that got lost in a smart way, would also come in handy. Basically, the
> > master only retransmits the current state, not the whole sequence of
> > messages (this is good under congestion, since you save messages).
> > I implement that in conntrackd, but that's more complex solution,
> > of course. I'd start with something simple.
> 
> 	The patch "reduce sync rate with time thresholds"
> that follows the discussed one in the changeset has such
> purpose to reduce the events, in tests the sync traffic is
> reduced ~10 times. But it does not modify the current
> protocol, it adds a very limited logic for retransmissions.

Not directly related to this, but I'd prefer that any retransmission
support (or any new feature) get added in follow-up patches, so we
can keep things separated in logical pieces. Thanks.

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [PATCH 5/9] ipvs: use adaptive pause in master thread
  2012-04-09 23:08           ` Pablo Neira Ayuso
@ 2012-04-11 20:02             ` Julian Anastasov
  2012-04-12  0:13               ` Pablo Neira Ayuso
  0 siblings, 1 reply; 18+ messages in thread
From: Julian Anastasov @ 2012-04-11 20:02 UTC (permalink / raw)
  To: Pablo Neira Ayuso
  Cc: Simon Horman, lvs-devel, netdev, netfilter-devel, Wensong Zhang


	Hello,

On Tue, 10 Apr 2012, Pablo Neira Ayuso wrote:

> You can still use kthread_should_stop inside a wrapper function
> that calls kthread_stop and up() the semaphore.
> 
> sync_stop:
>         kthread_stop(k)
>         up(s)
> 
> kthread_routine:
>         while(1) {
>                 down(s)
>                 if (kthread_should_stop(k))
>                         break;
> 
>                 get sync msg
>                 send sync msg
>         }
> 
> BTW, each up() does not necessarily mean one wakeup event. up() will
> deliver only one wakeup event to a process that has already been
> woken.

	OK, now I added up(). It will be called when 32 messages
have been queued since the thread last sent.

> > 	I'm still thinking if sndbuf value should be exported,
> > currently users have to modify the global default/max value.
> 
> I think it's a good idea.

	Done, used the same value for both rcvbuf and sndbuf.

> > But in below version I'm trying to handle the sndbuf overflow
> > by blocking for write_space event. By this way we should work
> > with any sndbuf configuration.
> 
> You seem to be deferring the overrun problem by using a longer
> intermediate queue than the socket buffer. Then, that queue can be
> tuned by the user via sysctl. It may happen under heavy stress that
> your intermediate queue gets full again, then you'll have to drop
> packets at some point.

	Yes, both values are for the same thing; the problem is
that the queue size is in messages while the socket buffer is in bytes.
And as the sndbuf config is optional, I'm not trying to derive
sync_qlen_max from sndbuf. Maybe we can do it after the
socket is created, but it will cause a problem for systems
that do not configure sync_sock_size: until now they used an
unlimited queue and probably the default socket size, so using
some small default sndbuf as sync_qlen_max could cause message
drops, as they would run with reduced limits. So, for now we provide
some large sync_qlen_max as the default configuration,
which probably exceeds the default socket buffer.

	Still, I think the down/up idea is not better.
We are adding two new vars: master_stopped and
master_sem.

	The problem is that kthread_stop() is a blocking
function. It waits for the thread to terminate. It cannot wake up a
thread blocked in down(), so we add a master_stopped flag
that unblocks the down() loop, while kthread_stop() will also
unblock the thread if it is waiting for write_space. I.e.,
up()+kthread_stop() is racy without the additional flag, while
kthread_stop()+up() cannot work at all.

	I'm appending an untested version with up+down, but I think
we should use wake_up_process and schedule_timeout instead,
as in the previous version.

diff --git a/include/net/ip_vs.h b/include/net/ip_vs.h
index 2bdee51..8ed41eb 100644
--- a/include/net/ip_vs.h
+++ b/include/net/ip_vs.h
@@ -870,6 +870,8 @@ struct netns_ipvs {
 #endif
 	int			sysctl_snat_reroute;
 	int			sysctl_sync_ver;
+	int			sysctl_sync_qlen_max;
+	int			sysctl_sync_sock_size;
 	int			sysctl_cache_bypass;
 	int			sysctl_expire_nodest_conn;
 	int			sysctl_expire_quiescent_template;
@@ -890,6 +892,10 @@ struct netns_ipvs {
 	struct timer_list	est_timer;	/* Estimation timer */
 	/* ip_vs_sync */
 	struct list_head	sync_queue;
+	int			sync_queue_len;
+	unsigned int		sync_queue_delay;
+	int			master_stopped;
+	struct semaphore	master_sem;
 	spinlock_t		sync_lock;
 	struct ip_vs_sync_buff  *sync_buff;
 	spinlock_t		sync_buff_lock;
@@ -912,6 +918,8 @@ struct netns_ipvs {
 #define DEFAULT_SYNC_THRESHOLD	3
 #define DEFAULT_SYNC_PERIOD	50
 #define DEFAULT_SYNC_VER	1
+#define IPVS_SYNC_WAKEUP_RATE	32
+#define IPVS_SYNC_QLEN_MAX	(IPVS_SYNC_WAKEUP_RATE * 4)
 
 #ifdef CONFIG_SYSCTL
 
@@ -930,6 +938,16 @@ static inline int sysctl_sync_ver(struct netns_ipvs *ipvs)
 	return ipvs->sysctl_sync_ver;
 }
 
+static inline int sysctl_sync_qlen_max(struct netns_ipvs *ipvs)
+{
+	return ipvs->sysctl_sync_qlen_max;
+}
+
+static inline int sysctl_sync_sock_size(struct netns_ipvs *ipvs)
+{
+	return ipvs->sysctl_sync_sock_size;
+}
+
 #else
 
 static inline int sysctl_sync_threshold(struct netns_ipvs *ipvs)
@@ -947,6 +965,16 @@ static inline int sysctl_sync_ver(struct netns_ipvs *ipvs)
 	return DEFAULT_SYNC_VER;
 }
 
+static inline int sysctl_sync_qlen_max(struct netns_ipvs *ipvs)
+{
+	return IPVS_SYNC_QLEN_MAX;
+}
+
+static inline int sysctl_sync_sock_size(struct netns_ipvs *ipvs)
+{
+	return 0;
+}
+
 #endif
 
 /*
diff --git a/net/netfilter/ipvs/ip_vs_ctl.c b/net/netfilter/ipvs/ip_vs_ctl.c
index 964d426..2172fcc 100644
--- a/net/netfilter/ipvs/ip_vs_ctl.c
+++ b/net/netfilter/ipvs/ip_vs_ctl.c
@@ -1718,6 +1718,18 @@ static struct ctl_table vs_vars[] = {
 		.proc_handler	= &proc_do_sync_mode,
 	},
 	{
+		.procname	= "sync_qlen_max",
+		.maxlen		= sizeof(int),
+		.mode		= 0644,
+		.proc_handler	= proc_dointvec,
+	},
+	{
+		.procname	= "sync_sock_size",
+		.maxlen		= sizeof(int),
+		.mode		= 0644,
+		.proc_handler	= proc_dointvec,
+	},
+	{
 		.procname	= "cache_bypass",
 		.maxlen		= sizeof(int),
 		.mode		= 0644,
@@ -3662,6 +3674,10 @@ int __net_init ip_vs_control_net_init_sysctl(struct net *net)
 	tbl[idx++].data = &ipvs->sysctl_snat_reroute;
 	ipvs->sysctl_sync_ver = 1;
 	tbl[idx++].data = &ipvs->sysctl_sync_ver;
+	ipvs->sysctl_sync_qlen_max = nr_free_buffer_pages() / 32;
+	tbl[idx++].data = &ipvs->sysctl_sync_qlen_max;
+	ipvs->sysctl_sync_sock_size = 0;
+	tbl[idx++].data = &ipvs->sysctl_sync_sock_size;
 	tbl[idx++].data = &ipvs->sysctl_cache_bypass;
 	tbl[idx++].data = &ipvs->sysctl_expire_nodest_conn;
 	tbl[idx++].data = &ipvs->sysctl_expire_quiescent_template;
diff --git a/net/netfilter/ipvs/ip_vs_sync.c b/net/netfilter/ipvs/ip_vs_sync.c
index 0e36679..0bd473c 100644
--- a/net/netfilter/ipvs/ip_vs_sync.c
+++ b/net/netfilter/ipvs/ip_vs_sync.c
@@ -312,6 +312,9 @@ static inline struct ip_vs_sync_buff *sb_dequeue(struct netns_ipvs *ipvs)
 				struct ip_vs_sync_buff,
 				list);
 		list_del(&sb->list);
+		ipvs->sync_queue_len--;
+		if (!ipvs->sync_queue_len)
+			ipvs->sync_queue_delay = 0;
 	}
 	spin_unlock_bh(&ipvs->sync_lock);
 
@@ -358,9 +361,13 @@ static inline void sb_queue_tail(struct netns_ipvs *ipvs)
 	struct ip_vs_sync_buff *sb = ipvs->sync_buff;
 
 	spin_lock(&ipvs->sync_lock);
-	if (ipvs->sync_state & IP_VS_STATE_MASTER)
+	if (ipvs->sync_state & IP_VS_STATE_MASTER &&
+	    ipvs->sync_queue_len < sysctl_sync_qlen_max(ipvs)) {
 		list_add_tail(&sb->list, &ipvs->sync_queue);
-	else
+		ipvs->sync_queue_len++;
+		if ((++ipvs->sync_queue_delay) == IPVS_SYNC_WAKEUP_RATE)
+			up(&ipvs->master_sem);
+	} else
 		ip_vs_sync_buff_release(sb);
 	spin_unlock(&ipvs->sync_lock);
 }
@@ -405,10 +412,11 @@ void ip_vs_sync_switch_mode(struct net *net, int mode)
 		ipvs->sync_buff = NULL;
 	} else {
 		spin_lock_bh(&ipvs->sync_lock);
-		if (ipvs->sync_state & IP_VS_STATE_MASTER)
+		if (ipvs->sync_state & IP_VS_STATE_MASTER) {
 			list_add_tail(&ipvs->sync_buff->list,
 				      &ipvs->sync_queue);
-		else
+			ipvs->sync_queue_len++;
+		} else
 			ip_vs_sync_buff_release(ipvs->sync_buff);
 		spin_unlock_bh(&ipvs->sync_lock);
 	}
@@ -1130,6 +1138,28 @@ static void ip_vs_process_message(struct net *net, __u8 *buffer,
 
 
 /*
+ *      Setup sndbuf (mode=1) or rcvbuf (mode=0)
+ */
+static void set_sock_size(struct sock *sk, int mode, int val)
+{
+	/* setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val)); */
+	/* setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val)); */
+	lock_sock(sk);
+	if (mode) {
+		val = clamp_t(int, val, (SOCK_MIN_SNDBUF + 1) / 2,
+			      sysctl_wmem_max);
+		sk->sk_sndbuf = val * 2;
+		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
+	} else {
+		val = clamp_t(int, val, (SOCK_MIN_RCVBUF + 1) / 2,
+			      sysctl_rmem_max);
+		sk->sk_rcvbuf = val * 2;
+		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
+	}
+	release_sock(sk);
+}
+
+/*
  *      Setup loopback of outgoing multicasts on a sending socket
  */
 static void set_mcast_loop(struct sock *sk, u_char loop)
@@ -1305,6 +1335,9 @@ static struct socket *make_send_sock(struct net *net)
 
 	set_mcast_loop(sock->sk, 0);
 	set_mcast_ttl(sock->sk, 1);
+	result = sysctl_sync_sock_size(ipvs);
+	if (result > 0)
+		set_sock_size(sock->sk, 1, result);
 
 	result = bind_mcastif_addr(sock, ipvs->master_mcast_ifn);
 	if (result < 0) {
@@ -1350,6 +1383,9 @@ static struct socket *make_receive_sock(struct net *net)
 	sk_change_net(sock->sk, net);
 	/* it is equivalent to the REUSEADDR option in user-space */
 	sock->sk->sk_reuse = 1;
+	result = sysctl_sync_sock_size(ipvs);
+	if (result > 0)
+		set_sock_size(sock->sk, 0, result);
 
 	result = sock->ops->bind(sock, (struct sockaddr *) &mcast_addr,
 			sizeof(struct sockaddr));
@@ -1392,18 +1428,22 @@ ip_vs_send_async(struct socket *sock, const char *buffer, const size_t length)
 	return len;
 }
 
-static void
+static int
 ip_vs_send_sync_msg(struct socket *sock, struct ip_vs_sync_mesg *msg)
 {
 	int msize;
+	int ret;
 
 	msize = msg->size;
 
 	/* Put size in network byte order */
 	msg->size = htons(msg->size);
 
-	if (ip_vs_send_async(sock, (char *)msg, msize) != msize)
-		pr_err("ip_vs_send_async error\n");
+	ret = ip_vs_send_async(sock, (char *)msg, msize);
+	if (ret >= 0 || ret == -EAGAIN)
+		return ret;
+	pr_err("ip_vs_send_async error %d\n", ret);
+	return 0;
 }
 
 static int
@@ -1428,33 +1468,57 @@ ip_vs_receive(struct socket *sock, char *buffer, const size_t buflen)
 	return len;
 }
 
+/* Get next buffer to send */
+static inline struct ip_vs_sync_buff *
+next_sync_buff(struct netns_ipvs *ipvs)
+{
+	struct ip_vs_sync_buff *sb;
+
+	sb = sb_dequeue(ipvs);
+	if (sb)
+		return sb;
+	/* Do not delay entries in buffer for more than 2 seconds */
+	return get_curr_sync_buff(ipvs, 2 * HZ);
+}
 
 static int sync_thread_master(void *data)
 {
 	struct ip_vs_sync_thread_data *tinfo = data;
 	struct netns_ipvs *ipvs = net_ipvs(tinfo->net);
+	struct sock *sk = tinfo->sock->sk;
 	struct ip_vs_sync_buff *sb;
 
 	pr_info("sync thread started: state = MASTER, mcast_ifn = %s, "
 		"syncid = %d\n",
 		ipvs->master_mcast_ifn, ipvs->master_syncid);
 
-	while (!kthread_should_stop()) {
-		while ((sb = sb_dequeue(ipvs))) {
-			ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
-			ip_vs_sync_buff_release(sb);
+	for (;;) {
+		sb = next_sync_buff(ipvs);
+		if (!sb) {
+			down_timeout(&ipvs->master_sem, HZ / 5);
+			if (ipvs->master_stopped)
+				break;
+			continue;
 		}
 
-		/* check if entries stay in ipvs->sync_buff for 2 seconds */
-		sb = get_curr_sync_buff(ipvs, 2 * HZ);
-		if (sb) {
-			ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
-			ip_vs_sync_buff_release(sb);
+retry:
+		if (unlikely(ipvs->master_stopped))
+			break;
+		if (ip_vs_send_sync_msg(tinfo->sock, sb->mesg) < 0) {
+			int ret = 0;
+
+			__wait_event_interruptible(*sk_sleep(sk),
+						   sock_writeable(sk) ||
+						   kthread_should_stop(),
+						   ret);
+			goto retry;
 		}
-
-		schedule_timeout_interruptible(HZ);
+		ip_vs_sync_buff_release(sb);
 	}
 
+	if (sb)
+		ip_vs_sync_buff_release(sb);
+
 	/* clean up the sync_buff queue */
 	while ((sb = sb_dequeue(ipvs)))
 		ip_vs_sync_buff_release(sb);
@@ -1538,6 +1602,10 @@ int start_sync_thread(struct net *net, int state, char *mcast_ifn, __u8 syncid)
 		realtask = &ipvs->master_thread;
 		name = "ipvs_master:%d";
 		threadfn = sync_thread_master;
+		ipvs->sync_queue_len = 0;
+		ipvs->sync_queue_delay = 0;
+		ipvs->master_stopped = 0;
+		sema_init(&ipvs->master_sem, 0);
 		sock = make_send_sock(net);
 	} else if (state == IP_VS_STATE_BACKUP) {
 		if (ipvs->backup_thread)
@@ -1623,6 +1691,8 @@ int stop_sync_thread(struct net *net, int state)
 		spin_lock_bh(&ipvs->sync_lock);
 		ipvs->sync_state &= ~IP_VS_STATE_MASTER;
 		spin_unlock_bh(&ipvs->sync_lock);
+		ipvs->master_stopped = 1;
+		up(&ipvs->master_sem);
 		retc = kthread_stop(ipvs->master_thread);
 		ipvs->master_thread = NULL;
 	} else if (state == IP_VS_STATE_BACKUP) {

^ permalink raw reply related	[flat|nested] 18+ messages in thread

* Re: [PATCH 5/9] ipvs: use adaptive pause in master thread
  2012-04-11 20:02             ` Julian Anastasov
@ 2012-04-12  0:13               ` Pablo Neira Ayuso
  2012-04-19 22:51                 ` Julian Anastasov
  0 siblings, 1 reply; 18+ messages in thread
From: Pablo Neira Ayuso @ 2012-04-12  0:13 UTC (permalink / raw)
  To: Julian Anastasov
  Cc: Simon Horman, lvs-devel, netdev, netfilter-devel, Wensong Zhang

Hi Julian,

On Wed, Apr 11, 2012 at 11:02:39PM +0300, Julian Anastasov wrote:
> 
> 	Hello,
> 
> On Tue, 10 Apr 2012, Pablo Neira Ayuso wrote:
> 
> > You can still use kthread_should_stop inside a wrapper function
> > that calls kthread_stop and up() the semaphore.
> > 
> > sync_stop:
> >         kthread_stop(k)
> >         up(s)
> > 
> > kthread_routine:
> >         while(1) {
> >                 down(s)
> >                 if (kthread_should_stop(k))
> >                         break;
> > 
> >                 get sync msg
> >                 send sync msg
> >         }
> > 
> > BTW, each up() does not necessarily mean one wakeup event. up() will
> > deliver only one wakeup event to a process that has already been
> > woken.
> 
> 	OK, now I added up(). It will be called when
> 32 messages are queued after last sent by thread.

Why 32?

If you do up() once per message, you will still get an arbitrary
number of messages in the queue until the scheduler selects your
thread to enter the running state.

In other words, if you do up() once per 32 messages, your thread will
get N+32 messages in its queue by the time the scheduler makes it
enter the running state, N being that arbitrary number of messages.
This seems to me like more chances to overrun the socket buffer under
high stress.

> > > 	I'm still thinking if sndbuf value should be exported,
> > > currently users have to modify the global default/max value.
> > 
> > I think it's a good idea.
> 
> 	Done, used same value both for rcvbuf and sndbuf.
> 
> > > But in below version I'm trying to handle the sndbuf overflow
> > > by blocking for write_space event. By this way we should work
> > > with any sndbuf configuration.
> > 
> > You seem to be deferring the overrun problem by using a longer
> > intermediate queue than the socket buffer. Then, that queue can be
> > tuned by the user via sysctl. It may happen under heavy stress that
> > your intermediate queue gets full again, then you'll have to drop
> > packets at some point.
> 
> 	Yes, both values are for same thing, the problem is
> that queue size is in messages while socket buffer is in bytes.
> And as sndbuf config is optional, I'm not trying to derive
> sync_qlen_max from sndbuf. May be we can do it after
> socket is created but it will cause problem for systems
> that do not configure sync_sock_size, they before now used
> unlimited queue and may be default socket size, so using
> some small default sndbuf as sync_qlen_max can cause message
> drops. They will use reduced limits. So, now we provide
> some large sync_qlen_max as default configuration
> which probably exceeds the default socket buffer.
>
> 	Still, I think the down/up idea is not better.
> We are adding two new vars: master_stopped and
> master_sem.

Well, this is not exactly the idea I had in mind.

> 	The problem is that kthread_stop() is a blocking
> function. It waits for the thread to terminate. It cannot wake up a
> thread blocked in down(), so we add a master_stopped flag
> that unblocks the down() loop, while kthread_stop() will also
> unblock the thread if it is waiting for write_space. I.e.,
> up()+kthread_stop() is racy without the additional flag, while
> kthread_stop()+up() cannot work at all.

I don't see the up+kthread_stop() race you mention.

> 	I'm appending untested version with up+down but I think
> we should use wake_up_process and schedule_timeout instead,
> as in previous version.

OK.

I still think that using an intermediate queue is *not* the way
to achieve reliability and congestion control, sorry.

But, you seem to persist on the idea and I don't want to block your
developments, I just wanted to show my point and provide some ideas.
After all, you maintain that part of the code.

Please, tell me what patch you want me to apply and I'll take it.

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [PATCH 5/9] ipvs: use adaptive pause in master thread
  2012-04-12  0:13               ` Pablo Neira Ayuso
@ 2012-04-19 22:51                 ` Julian Anastasov
  0 siblings, 0 replies; 18+ messages in thread
From: Julian Anastasov @ 2012-04-19 22:51 UTC (permalink / raw)
  To: Pablo Neira Ayuso
  Cc: Simon Horman, lvs-devel, netdev, netfilter-devel, Wensong Zhang


	Hello Pablo,

On Thu, 12 Apr 2012, Pablo Neira Ayuso wrote:

> > 	OK, now I added up(). It will be called when
> > 32 messages are queued after last sent by thread.
> 
> Why 32?
> 
> If you do up() once per message, you will still get an arbitrary
> number of messages in the queue until the scheduler selects your
> thread to enter the running state.

	I understand this. It will save wakeups while the
master thread is busy with messages, but not if the messages
come with such a gap that the master thread ends up sending
them one by one, one per wakeup.

> In other words, if you do up() once per 32 messages, your thread will
> get N+32 messages in its queue by the time the scheduler makes it
> enter the running state, N being that arbitrary number of messages.
> This seems to me like more chances to overrun the socket buffer under
> high stress.

	I have now modified the constant (to 8, which should be
8*8KB of data, below the default sndbuf) and the algorithm. The idea
of IPVS_SYNC_WAKEUP_RATE is to avoid the situation where
we send a wakeup for every message. It should be better
for caching to send messages in short bursts.

> > 	Still, I think the down/up idea is not better.
> > We are adding two new vars: master_stopped and
> > master_sem.
> 
> Well, this is not exactly the idea I had in mind.

	Currently, we need a _timeout version because a
sync_buff can become ready after 2 seconds and we do not
get a wakeup for such an incomplete buffer; we have to check it
from time to time. IIRC, using the uninterruptible version
causes the thread to bump the CPU load figure, so it
is not appropriate - that state contributes to the load average.
It is really for a busy state, not for an idle state
waiting for messages to send.
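
	(The idiom in question, for concreteness - an interruptible
timed sleep that a wake_up_process() from packet processing can cut
short, and that does not inflate the load average:)

	set_current_state(TASK_INTERRUPTIBLE);
	if (!kthread_should_stop())
		schedule_timeout(HZ / 5);
	__set_current_state(TASK_RUNNING);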

> > 	The problem is that kthread_stop() is a blocking
> > function. It waits for the thread to terminate. It cannot wake up a
> > thread blocked in down(), so we add a master_stopped flag
> > that unblocks the down() loop, while kthread_stop() will also
> > unblock the thread if it is waiting for write_space. I.e.,
> > up()+kthread_stop() is racy without the additional flag, while
> > kthread_stop()+up() cannot work at all.
> 
> I don't see the up+kthread_stop() race you mention.

	The problem is that __down_common exits only
on waiter.up != 0 (set only by __up). Here is the race:

master_thread		stop_sync_thread
----------------------------------------
down*

			STOP MASTER
			up()
next_sync_buff()=NULL
- no buffer to send

kthread_should_stop()?
Not yet

down*()

			- some delay here

			kthread_stop()
wakeup. Is semaphore
up (waiter.up)?
No => block again
			- we are blocked in
			kthread_stop()

The race is that the master thread can block again in
down() before stop_sync_thread reaches kthread_stop().
If the master uses down_timeout() it can exit this block, but
only after the timeout (-ETIME), which is not very good. And
down_timeout() uses the TASK_UNINTERRUPTIBLE state :(
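
	In code, the racy shutdown looks roughly like this
(simplified and untested, same placeholders as above):

	/* master thread */
	for (;;) {
		down(&ipvs->master_sem);  /* eats the up() from
					   * stop_sync_thread() */
		while ((sb = next_sync_buff(tinfo)))
			ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
		if (kthread_should_stop())
			break;	/* flag may not be set yet... */
		/* ...so we loop and block in down() again; the up()
		 * is already consumed and kthread_stop() never does
		 * up(), so both sides now wait forever */
	}

	/* stop_sync_thread() */
	up(&ipvs->master_sem);
	/* window: master wakes, finds no buffer, blocks again */
	kthread_stop(ipvs->master_thread);	/* hangs */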

> > 	I'm appending an untested version with up+down, but I think
> > we should use wake_up_process and schedule_timeout instead,
> > as in the previous version.
> 
> OK.
> 
> I still think that using an intermediate queue is *not* the way
> to achieve reliability and congestion control, sorry.

	The idea here is not to delay packet processing by
sending sync traffic from softirq context, so we use a
thread and an intermediate queue for sending sync messages,
ideally using an otherwise idle CPU for this. It is a compromise
for setups where one CPU is overloaded with packet processing
while another CPU is idle. During our tests, for some sync
parameters, the sync traffic was 100-200mbit, which is not a
small load to offload to other CPUs, even with multiple master
threads. In such cases CPU speed is a bigger problem than
the memory used for the intermediate queue.
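
	Schematically, the split is (simplified; helper names
follow ip_vs_sync.c but the details are illustrative, as in the
sketches above):

	/* softirq side (ip_vs_sync_conn): only format the sync
	 * message and queue it; no socket I/O on the packet path */
	sb_queue_tail(ipvs, sb);

	/* process-context side (master kthread), hopefully running
	 * on an otherwise idle CPU; all the sendmsg() cost is here */
	while (!kthread_should_stop()) {
		while ((sb = sb_dequeue(ipvs))) {
			ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
			ip_vs_sync_buff_release(sb);
		}
		schedule_timeout_interruptible(HZ / 50);
	}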

> But you seem set on the idea and I don't want to block your
> development; I just wanted to make my point and provide some ideas.
> After all, you maintain that part of the code.
> 
> Please, tell me what patch you want me to apply and I'll take it.

	I'm posting a new patchset that includes a new version
of this patch. It should be better: it limits the delay of
queued messages, so that conn state is synced without
big delays (20ms at most).

Regards

--
Julian Anastasov <ja@ssi.bg>

^ permalink raw reply	[flat|nested] 18+ messages in thread

end of thread, other threads:[~2012-04-19 22:51 UTC | newest]

Thread overview: 18+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2012-03-21  8:56 [GIT PULL nf-next] IPVS Simon Horman
2012-03-21  8:56 ` [PATCH 1/9] ipvs: ignore IP_VS_CONN_F_NOOUTPUT in backup server Simon Horman
2012-03-21  8:56 ` [PATCH 2/9] ipvs: remove check for IP_VS_CONN_F_SYNC from ip_vs_bind_dest Simon Horman
2012-03-21  8:56 ` [PATCH 3/9] ipvs: fix ip_vs_try_bind_dest to rebind app and transmitter Simon Horman
2012-03-21  8:56 ` [PATCH 4/9] ipvs: always update some of the flags bits in backup Simon Horman
2012-03-21  8:56 ` [PATCH 5/9] ipvs: use adaptive pause in master thread Simon Horman
2012-04-02 11:11   ` Pablo Neira Ayuso
2012-04-03 21:16     ` Julian Anastasov
2012-04-05 15:05       ` Pablo Neira Ayuso
2012-04-08 20:12         ` Julian Anastasov
2012-04-09 23:08           ` Pablo Neira Ayuso
2012-04-11 20:02             ` Julian Anastasov
2012-04-12  0:13               ` Pablo Neira Ayuso
2012-04-19 22:51                 ` Julian Anastasov
2012-03-21  8:56 ` [PATCH 6/9] ipvs: reduce sync rate with time thresholds Simon Horman
2012-03-21  8:56 ` [PATCH 7/9] ipvs: add support for sync threads Simon Horman
2012-03-21  8:56 ` [PATCH 8/9] ipvs: optimize the use of flags in ip_vs_bind_dest Simon Horman
2012-03-21  8:56 ` [PATCH 9/9] ipvs: Provide a generic ip_vs_bind_xmit() Simon Horman
