All of lore.kernel.org
* [Cluster-devel] [RFC dlm/next 00/15] fs: dlm: performance
@ 2021-06-23 15:14 Alexander Aring
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 01/15] fs: dlm: clear CF_APP_LIMITED on close Alexander Aring
                   ` (15 more replies)
  0 siblings, 16 replies; 17+ messages in thread
From: Alexander Aring @ 2021-06-23 15:14 UTC (permalink / raw)
  To: cluster-devel.redhat.com

Hi,

this patch series adds some more generic handling for the transport
protocols, but mainly it should improve performance. Here is why:

Current state:
 - Two global ordered workqueues for sending and receiving
 - Each connection has a "sock" mutex
 - The receiving functionality also processes dlm messages

Problem with this behaviour:
 - Each workqueue can only handle one connection at a time because of the
   ordered queuing, so receive/send/processing can never run in parallel
   across "all connections" (the serialization is global, not per
   connection).
 - The sock mutex blocks other send/recv handling if the same
   connection is queued in both workqueues.
 - The sock mutex is also held while processing dlm messages, which
   blocks sending dlm messages.

New behaviour:
 - An io_workqueue for send/recv and a process workqueue, neither of
   which is ordered globally (but both are ordered per connection)
 - Parallel receive and send per connection; mutexes for the receive
   and send workers take care that queuing stays ordered _per_
   connection (same for process work)
 - The process workqueue processes dlm messages in order, in the
   background of the io (send/recv) handling. While processing runs,
   receive can still fill the "processqueue" with newly arrived messages.

Notes:

I would like to get rid of the process workqueue, but I didn't find a way
to keep receiving ordered while also processing in order. I am also not
sure how the unordered workqueue really works; I hope the workqueue
handling is clever enough to stop queuing workers when it sees that one
worker is blocked on a mutex. Remember that we have these workers per
connection, so if one blocks, others are still allowed to execute, which
should improve performance because other connections can still be served.
The main point, I think, is that we switch from an ordered workqueue to
an unordered workqueue, which should serve the connections in parallel
instead of one after another.

Maybe the process workqueue can be changed to a tasklet, but this
requires many more changes in dlm (although ACK messages can be
handled in non-sleepable contexts).

WARNING:

For the first time ever we let the DLM application layer process dlm
messages in parallel, BUT processing stays ordered per node/connection
(which is required). I tested it and saw no problems, and I believe the
global/per-lockspace accesses per nodeid are correctly protected against
concurrent access.

- Alex

Alexander Aring (15):
  fs: dlm: clear CF_APP_LIMITED on close
  fs: dlm: introduce con_next_wq helper
  fs: dlm: move to static proto ops
  fs: dlm: introduce generic listen
  fs: dlm: auto load sctp module
  fs: dlm: generic connect func
  fs: dlm: fix multiple empty writequeue alloc
  fs: dlm: move receive loop into receive handler
  fs: dlm: introduce io_workqueue
  fs: dlm: introduce reconnect work
  fs: dlm: introduce process workqueue
  fs: dlm: remove send starve
  fs: dlm: move writequeue init to sendcon only
  fs: dlm: flush listen con
  fs: dlm: move srcu into loop call

 fs/dlm/lowcomms.c | 1548 +++++++++++++++++++++++----------------------
 fs/dlm/midcomms.c |   38 +-
 fs/dlm/midcomms.h |    3 +-
 3 files changed, 837 insertions(+), 752 deletions(-)

-- 
2.26.3



^ permalink raw reply	[flat|nested] 17+ messages in thread

* [Cluster-devel] [RFC dlm/next 01/15] fs: dlm: clear CF_APP_LIMITED on close
  2021-06-23 15:14 [Cluster-devel] [RFC dlm/next 00/15] fs: dlm: performance Alexander Aring
@ 2021-06-23 15:14 ` Alexander Aring
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 02/15] fs: dlm: introduce con_next_wq helper Alexander Aring
                   ` (14 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Alexander Aring @ 2021-06-23 15:14 UTC (permalink / raw)
  To: cluster-devel.redhat.com

If send_to_sock() sets the CF_APP_LIMITED bit and it has not been
cleared by a waiting lowcomms_write_space() yet when a close_connection()
occurs, we should clear the CF_APP_LIMITED bit again because the
connection starts from a fresh state at reconnect.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 0ea9ae35da0b..670c3d395709 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -801,6 +801,7 @@ static void close_connection(struct connection *con, bool and_other,
 
 	con->rx_leftover = 0;
 	con->retries = 0;
+	clear_bit(CF_APP_LIMITED, &con->flags);
 	clear_bit(CF_CONNECTED, &con->flags);
 	clear_bit(CF_DELAY_CONNECT, &con->flags);
 	clear_bit(CF_RECONNECT, &con->flags);
-- 
2.26.3



^ permalink raw reply related	[flat|nested] 17+ messages in thread

* [Cluster-devel] [RFC dlm/next 02/15] fs: dlm: introduce con_next_wq helper
  2021-06-23 15:14 [Cluster-devel] [RFC dlm/next 00/15] fs: dlm: performance Alexander Aring
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 01/15] fs: dlm: clear CF_APP_LIMITED on close Alexander Aring
@ 2021-06-23 15:14 ` Alexander Aring
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 03/15] fs: dlm: move to static proto ops Alexander Aring
                   ` (13 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Alexander Aring @ 2021-06-23 15:14 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch introduces a function to determine whether something in the
writequeue is ready to be sent. It is not enough that the writequeue is
non-empty; additionally, the first entry needs to have a valid length
field.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 57 +++++++++++++++++++++++++++++------------------
 1 file changed, 35 insertions(+), 22 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 670c3d395709..395789bfc467 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -175,6 +175,22 @@ static void sctp_connect_to_sock(struct connection *con);
 static void tcp_connect_to_sock(struct connection *con);
 static void dlm_tcp_shutdown(struct connection *con);
 
+/* need to held writequeue_lock */
+static struct writequeue_entry *con_next_wq(struct connection *con)
+{
+	struct writequeue_entry *e;
+
+	if (list_empty(&con->writequeue))
+		return NULL;
+
+	e = list_first_entry(&con->writequeue, struct writequeue_entry,
+			     list);
+	if (e->len == 0)
+		return NULL;
+
+	return e;
+}
+
 static struct connection *__find_con(int nodeid, int r)
 {
 	struct connection *con;
@@ -1647,10 +1663,9 @@ int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
 /* Send a message */
 static void send_to_sock(struct connection *con)
 {
-	int ret = 0;
 	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
 	struct writequeue_entry *e;
-	int len, offset;
+	int len, offset, ret;
 	int count = 0;
 
 	mutex_lock(&con->sock_mutex);
@@ -1659,7 +1674,8 @@ static void send_to_sock(struct connection *con)
 
 	spin_lock(&con->writequeue_lock);
 	for (;;) {
-		if (list_empty(&con->writequeue))
+		e = con_next_wq(con);
+		if (!e)
 			break;
 
 		e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
@@ -1668,25 +1684,22 @@ static void send_to_sock(struct connection *con)
 		BUG_ON(len == 0 && e->users == 0);
 		spin_unlock(&con->writequeue_lock);
 
-		ret = 0;
-		if (len) {
-			ret = kernel_sendpage(con->sock, e->page, offset, len,
-					      msg_flags);
-			if (ret == -EAGAIN || ret == 0) {
-				if (ret == -EAGAIN &&
-				    test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
-				    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
-					/* Notify TCP that we're limited by the
-					 * application window size.
-					 */
-					set_bit(SOCK_NOSPACE, &con->sock->flags);
-					con->sock->sk->sk_write_pending++;
-				}
-				cond_resched();
-				goto out;
-			} else if (ret < 0)
-				goto out;
-		}
+		ret = kernel_sendpage(con->sock, e->page, offset, len,
+				      msg_flags);
+		if (ret == -EAGAIN || ret == 0) {
+			if (ret == -EAGAIN &&
+			    test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
+			    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
+				/* Notify TCP that we're limited by the
+				 * application window size.
+				 */
+				set_bit(SOCK_NOSPACE, &con->sock->flags);
+				con->sock->sk->sk_write_pending++;
+			}
+			cond_resched();
+			goto out;
+		} else if (ret < 0)
+			goto out;
 
 		/* Don't starve people filling buffers */
 		if (++count >= MAX_SEND_MSG_COUNT) {
-- 
2.26.3



^ permalink raw reply related	[flat|nested] 17+ messages in thread

* [Cluster-devel] [RFC dlm/next 03/15] fs: dlm: move to static proto ops
  2021-06-23 15:14 [Cluster-devel] [RFC dlm/next 00/15] fs: dlm: performance Alexander Aring
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 01/15] fs: dlm: clear CF_APP_LIMITED on close Alexander Aring
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 02/15] fs: dlm: introduce con_next_wq helper Alexander Aring
@ 2021-06-23 15:14 ` Alexander Aring
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 04/15] fs: dlm: introduce generic listen Alexander Aring
                   ` (12 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Alexander Aring @ 2021-06-23 15:14 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch moves the per-transport socket callbacks to a static const
structure. We can support only one transport socket for the init
namespace, which is determined by reading the dlm config at
lowcomms_start().

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 52 +++++++++++++++++++++++++++--------------------
 1 file changed, 30 insertions(+), 22 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 395789bfc467..30ee2d349375 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -84,9 +84,6 @@ struct connection {
 	struct list_head writequeue;  /* List of outgoing writequeue_entries */
 	spinlock_t writequeue_lock;
 	atomic_t writequeue_cnt;
-	void (*connect_action) (struct connection *);	/* What to do to connect */
-	void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
-	bool (*eof_condition)(struct connection *con); /* What to do to eof check */
 	int retries;
 #define MAX_CONNECT_RETRIES 3
 	struct hlist_node list;
@@ -145,6 +142,15 @@ struct dlm_node_addr {
 	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
 };
 
+struct dlm_proto_ops {
+	/* What to do to connect */
+	void (*connect_action)(struct connection *con);
+	/* What to do to shutdown */
+	void (*shutdown_action)(struct connection *con);
+	/* What to do to eof check */
+	bool (*eof_condition)(struct connection *con);
+};
+
 static struct listen_sock_callbacks {
 	void (*sk_error_report)(struct sock *);
 	void (*sk_data_ready)(struct sock *);
@@ -168,12 +174,13 @@ static struct hlist_head connection_hash[CONN_HASH_SIZE];
 static DEFINE_SPINLOCK(connections_lock);
 DEFINE_STATIC_SRCU(connections_srcu);
 
+static const struct dlm_proto_ops *dlm_proto_ops;
+
 static void process_recv_sockets(struct work_struct *work);
 static void process_send_sockets(struct work_struct *work);
 
 static void sctp_connect_to_sock(struct connection *con);
 static void tcp_connect_to_sock(struct connection *con);
-static void dlm_tcp_shutdown(struct connection *con);
 
 /* need to held writequeue_lock */
 static struct writequeue_entry *con_next_wq(struct connection *con)
@@ -224,20 +231,6 @@ static int dlm_con_init(struct connection *con, int nodeid)
 	INIT_WORK(&con->rwork, process_recv_sockets);
 	init_waitqueue_head(&con->shutdown_wait);
 
-	switch (dlm_config.ci_protocol) {
-	case DLM_PROTO_TCP:
-		con->connect_action = tcp_connect_to_sock;
-		con->shutdown_action = dlm_tcp_shutdown;
-		con->eof_condition = tcp_eof_condition;
-		break;
-	case DLM_PROTO_SCTP:
-		con->connect_action = sctp_connect_to_sock;
-		break;
-	default:
-		kfree(con->rx_buf);
-		return -EINVAL;
-	}
-
 	return 0;
 }
 
@@ -963,7 +956,8 @@ static int receive_from_sock(struct connection *con)
 		log_print("connection %p got EOF from %d",
 			  con, con->nodeid);
 
-		if (con->eof_condition && con->eof_condition(con)) {
+		if (dlm_proto_ops->eof_condition &&
+		    dlm_proto_ops->eof_condition(con)) {
 			set_bit(CF_EOF, &con->flags);
 			mutex_unlock(&con->sock_mutex);
 		} else {
@@ -1814,7 +1808,7 @@ static void process_send_sockets(struct work_struct *work)
 	if (con->sock == NULL) { /* not mutex protected so check it inside too */
 		if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags))
 			msleep(1000);
-		con->connect_action(con);
+		dlm_proto_ops->connect_action(con);
 	}
 	if (!list_empty(&con->writequeue))
 		send_to_sock(con);
@@ -1854,8 +1848,8 @@ static int work_start(void)
 
 static void shutdown_conn(struct connection *con)
 {
-	if (con->shutdown_action)
-		con->shutdown_action(con);
+	if (dlm_proto_ops->shutdown_action)
+		dlm_proto_ops->shutdown_action(con);
 }
 
 void dlm_lowcomms_shutdown(void)
@@ -1962,8 +1956,20 @@ void dlm_lowcomms_stop(void)
 	srcu_read_unlock(&connections_srcu, idx);
 	work_stop();
 	deinit_local();
+
+	dlm_proto_ops = NULL;
 }
 
+static const struct dlm_proto_ops dlm_tcp_ops = {
+	.connect_action = tcp_connect_to_sock,
+	.shutdown_action = dlm_tcp_shutdown,
+	.eof_condition = tcp_eof_condition,
+};
+
+static const struct dlm_proto_ops dlm_sctp_ops = {
+	.connect_action = sctp_connect_to_sock,
+};
+
 int dlm_lowcomms_start(void)
 {
 	int error = -EINVAL;
@@ -1990,9 +1996,11 @@ int dlm_lowcomms_start(void)
 	/* Start listening */
 	switch (dlm_config.ci_protocol) {
 	case DLM_PROTO_TCP:
+		dlm_proto_ops = &dlm_tcp_ops;
 		error = tcp_listen_for_all();
 		break;
 	case DLM_PROTO_SCTP:
+		dlm_proto_ops = &dlm_sctp_ops;
 		error = sctp_listen_for_all(&listen_con);
 		break;
 	default:
-- 
2.26.3



^ permalink raw reply related	[flat|nested] 17+ messages in thread

* [Cluster-devel] [RFC dlm/next 04/15] fs: dlm: introduce generic listen
  2021-06-23 15:14 [Cluster-devel] [RFC dlm/next 00/15] fs: dlm: performance Alexander Aring
                   ` (2 preceding siblings ...)
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 03/15] fs: dlm: move to static proto ops Alexander Aring
@ 2021-06-23 15:14 ` Alexander Aring
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 05/15] fs: dlm: auto load sctp module Alexander Aring
                   ` (11 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Alexander Aring @ 2021-06-23 15:14 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch combines the listen functionality of each transport layer
into one listen function. Per-transport differences are provided by
additional callbacks in dlm_proto_ops.

This patch silently drops sock_set_keepalive() for TCP listen sockets
only. This socket option is not set on connecting sockets, and I also
don't see the sense in setting keepalive on a socket that is only used
to accept() new connections.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 228 +++++++++++++++++++++++-----------------------
 1 file changed, 113 insertions(+), 115 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 30ee2d349375..783cfd5f63a5 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -143,6 +143,13 @@ struct dlm_node_addr {
 };
 
 struct dlm_proto_ops {
+	const char *name;
+	int proto;
+
+	int (*listen_validate)(void);
+	void (*listen_sockopts)(struct socket *sock);
+	int (*listen_bind)(struct socket *sock);
+
 	/* What to do to connect */
 	void (*connect_action)(struct connection *con);
 	/* What to do to shutdown */
@@ -1328,59 +1335,6 @@ static void tcp_connect_to_sock(struct connection *con)
 	return;
 }
 
-/* On error caller must run dlm_close_sock() for the
- * listen connection socket.
- */
-static int tcp_create_listen_sock(struct listen_connection *con,
-				  struct sockaddr_storage *saddr)
-{
-	struct socket *sock = NULL;
-	int result = 0;
-	int addr_len;
-
-	if (dlm_local_addr[0]->ss_family == AF_INET)
-		addr_len = sizeof(struct sockaddr_in);
-	else
-		addr_len = sizeof(struct sockaddr_in6);
-
-	/* Create a socket to communicate with */
-	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
-				  SOCK_STREAM, IPPROTO_TCP, &sock);
-	if (result < 0) {
-		log_print("Can't create listening comms socket");
-		goto create_out;
-	}
-
-	sock_set_mark(sock->sk, dlm_config.ci_mark);
-
-	/* Turn off Nagle's algorithm */
-	tcp_sock_set_nodelay(sock->sk);
-
-	sock_set_reuseaddr(sock->sk);
-
-	add_listen_sock(sock, con);
-
-	/* Bind to our port */
-	make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
-	result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
-	if (result < 0) {
-		log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
-		goto create_out;
-	}
-	sock_set_keepalive(sock->sk);
-
-	result = sock->ops->listen(sock, 5);
-	if (result < 0) {
-		log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
-		goto create_out;
-	}
-
-	return 0;
-
-create_out:
-	return result;
-}
-
 /* Get local addresses */
 static void init_local(void)
 {
@@ -1407,63 +1361,6 @@ static void deinit_local(void)
 		kfree(dlm_local_addr[i]);
 }
 
-/* Initialise SCTP socket and bind to all interfaces
- * On error caller must run dlm_close_sock() for the
- * listen connection socket.
- */
-static int sctp_listen_for_all(struct listen_connection *con)
-{
-	struct socket *sock = NULL;
-	int result = -EINVAL;
-
-	log_print("Using SCTP for communications");
-
-	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
-				  SOCK_STREAM, IPPROTO_SCTP, &sock);
-	if (result < 0) {
-		log_print("Can't create comms socket, check SCTP is loaded");
-		goto out;
-	}
-
-	sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
-	sock_set_mark(sock->sk, dlm_config.ci_mark);
-	sctp_sock_set_nodelay(sock->sk);
-
-	add_listen_sock(sock, con);
-
-	/* Bind to all addresses. */
-	result = sctp_bind_addrs(con->sock, dlm_config.ci_tcp_port);
-	if (result < 0)
-		goto out;
-
-	result = sock->ops->listen(sock, 5);
-	if (result < 0) {
-		log_print("Can't set socket listening");
-		goto out;
-	}
-
-	return 0;
-
-out:
-	return result;
-}
-
-static int tcp_listen_for_all(void)
-{
-	/* We don't support multi-homed hosts */
-	if (dlm_local_count > 1) {
-		log_print("TCP protocol can't handle multi-homed hosts, "
-			  "try SCTP");
-		return -EINVAL;
-	}
-
-	log_print("Using TCP for communications");
-
-	return tcp_create_listen_sock(&listen_con, dlm_local_addr[0]);
-}
-
-
-
 static struct writequeue_entry *new_writequeue_entry(struct connection *con,
 						     gfp_t allocation)
 {
@@ -1960,13 +1857,112 @@ void dlm_lowcomms_stop(void)
 	dlm_proto_ops = NULL;
 }
 
+static int dlm_listen_for_all(void)
+{
+	struct socket *sock;
+	int result;
+
+	log_print("Using %s for communications",
+		  dlm_proto_ops->name);
+
+	if (dlm_proto_ops->listen_validate) {
+		result = dlm_proto_ops->listen_validate();
+		if (result < 0)
+			return result;
+	}
+
+	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
+				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
+	if (result < 0) {
+		log_print("Can't create comms socket, check SCTP is loaded");
+		goto out;
+	}
+
+	sock_set_mark(sock->sk, dlm_config.ci_mark);
+	dlm_proto_ops->listen_sockopts(sock);
+
+	result = dlm_proto_ops->listen_bind(sock);
+	if (result < 0)
+		goto out;
+
+	save_listen_callbacks(sock);
+	add_listen_sock(sock, &listen_con);
+
+	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
+	result = sock->ops->listen(sock, 5);
+	if (result < 0) {
+		dlm_close_sock(&listen_con.sock);
+		goto out;
+	}
+
+	return 0;
+
+out:
+	sock_release(sock);
+	return result;
+}
+
+static int dlm_tcp_listen_validate(void)
+{
+	/* We don't support multi-homed hosts */
+	if (dlm_local_count > 1) {
+		log_print("TCP protocol can't handle multi-homed hosts, try SCTP");
+		return -EINVAL;
+	}
+
+	return 0;
+}
+
+static void dlm_tcp_sockopts(struct socket *sock)
+{
+	/* Turn off Nagle's algorithm */
+	tcp_sock_set_nodelay(sock->sk);
+}
+
+static void dlm_tcp_listen_sockopts(struct socket *sock)
+{
+	dlm_tcp_sockopts(sock);
+	sock_set_reuseaddr(sock->sk);
+}
+
+static int dlm_tcp_listen_bind(struct socket *sock)
+{
+	int addr_len;
+
+	/* Bind to our port */
+	make_sockaddr(dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
+	return sock->ops->bind(sock, (struct sockaddr *)dlm_local_addr[0],
+			       addr_len);
+}
+
 static const struct dlm_proto_ops dlm_tcp_ops = {
+	.name = "TCP",
+	.proto = IPPROTO_TCP,
+	.listen_validate = dlm_tcp_listen_validate,
+	.listen_sockopts = dlm_tcp_listen_sockopts,
+	.listen_bind = dlm_tcp_listen_bind,
 	.connect_action = tcp_connect_to_sock,
 	.shutdown_action = dlm_tcp_shutdown,
 	.eof_condition = tcp_eof_condition,
 };
 
+static int dlm_sctp_bind_listen(struct socket *sock)
+{
+	return sctp_bind_addrs(sock, dlm_config.ci_tcp_port);
+}
+
+static void dlm_sctp_sockopts(struct socket *sock)
+{
+	/* Turn off Nagle's algorithm */
+	sctp_sock_set_nodelay(sock->sk);
+	sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
+}
+
 static const struct dlm_proto_ops dlm_sctp_ops = {
+	.name = "SCTP",
+	.proto = IPPROTO_SCTP,
+	.listen_sockopts = dlm_sctp_sockopts,
+	.listen_bind = dlm_sctp_bind_listen,
 	.connect_action = sctp_connect_to_sock,
 };
 
@@ -1997,24 +1993,26 @@ int dlm_lowcomms_start(void)
 	switch (dlm_config.ci_protocol) {
 	case DLM_PROTO_TCP:
 		dlm_proto_ops = &dlm_tcp_ops;
-		error = tcp_listen_for_all();
 		break;
 	case DLM_PROTO_SCTP:
 		dlm_proto_ops = &dlm_sctp_ops;
-		error = sctp_listen_for_all(&listen_con);
 		break;
 	default:
 		log_print("Invalid protocol identifier %d set",
 			  dlm_config.ci_protocol);
 		error = -EINVAL;
-		break;
+		goto fail_proto_ops;
 	}
+
+	error = dlm_listen_for_all();
 	if (error)
-		goto fail_unlisten;
+		goto fail_listen;
 
 	return 0;
 
-fail_unlisten:
+fail_listen:
+	dlm_proto_ops = NULL;
+fail_proto_ops:
 	dlm_allow_conn = 0;
 	dlm_close_sock(&listen_con.sock);
 	work_stop();
-- 
2.26.3



^ permalink raw reply related	[flat|nested] 17+ messages in thread

* [Cluster-devel] [RFC dlm/next 05/15] fs: dlm: auto load sctp module
  2021-06-23 15:14 [Cluster-devel] [RFC dlm/next 00/15] fs: dlm: performance Alexander Aring
                   ` (3 preceding siblings ...)
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 04/15] fs: dlm: introduce generic listen Alexander Aring
@ 2021-06-23 15:14 ` Alexander Aring
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 06/15] fs: dlm: generic connect func Alexander Aring
                   ` (10 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Alexander Aring @ 2021-06-23 15:14 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch adds, for now, better handling of missing SCTP support in
the kernel and tries to load the sctp module if SCTP is configured.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 783cfd5f63a5..fcc1094f7417 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -1865,11 +1865,9 @@ static int dlm_listen_for_all(void)
 	log_print("Using %s for communications",
 		  dlm_proto_ops->name);
 
-	if (dlm_proto_ops->listen_validate) {
-		result = dlm_proto_ops->listen_validate();
-		if (result < 0)
-			return result;
-	}
+	result = dlm_proto_ops->listen_validate();
+	if (result < 0)
+		return result;
 
 	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
 				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
@@ -1946,6 +1944,17 @@ static const struct dlm_proto_ops dlm_tcp_ops = {
 	.eof_condition = tcp_eof_condition,
 };
 
+static int dlm_sctp_listen_validate(void)
+{
+	if (!IS_ENABLED(CONFIG_IP_SCTP)) {
+		log_print("SCTP is not enabled by this kernel\n");
+		return -EOPNOTSUPP;
+	}
+
+	request_module("sctp");
+	return 0;
+}
+
 static int dlm_sctp_bind_listen(struct socket *sock)
 {
 	return sctp_bind_addrs(sock, dlm_config.ci_tcp_port);
@@ -1961,6 +1970,7 @@ static void dlm_sctp_sockopts(struct socket *sock)
 static const struct dlm_proto_ops dlm_sctp_ops = {
 	.name = "SCTP",
 	.proto = IPPROTO_SCTP,
+	.listen_validate = dlm_sctp_listen_validate,
 	.listen_sockopts = dlm_sctp_sockopts,
 	.listen_bind = dlm_sctp_bind_listen,
 	.connect_action = sctp_connect_to_sock,
-- 
2.26.3



^ permalink raw reply related	[flat|nested] 17+ messages in thread

* [Cluster-devel] [RFC dlm/next 06/15] fs: dlm: generic connect func
  2021-06-23 15:14 [Cluster-devel] [RFC dlm/next 00/15] fs: dlm: performance Alexander Aring
                   ` (4 preceding siblings ...)
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 05/15] fs: dlm: auto load sctp module Alexander Aring
@ 2021-06-23 15:14 ` Alexander Aring
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 07/15] fs: dlm: fix multiple empty writequeue alloc Alexander Aring
                   ` (9 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Alexander Aring @ 2021-06-23 15:14 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch adds a generic connect function for TCP and SCTP. Where the
connect functionality differs between them, additional callbacks in
dlm_proto_ops were added. The sockopts callback handling guarantees
that sockets created by connect() use the same options as sockets
created by accept().

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 340 ++++++++++++++++++++--------------------------
 1 file changed, 148 insertions(+), 192 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index fcc1094f7417..2919a868d19f 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -143,15 +143,17 @@ struct dlm_node_addr {
 };
 
 struct dlm_proto_ops {
+	bool try_new_addr;
 	const char *name;
 	int proto;
 
+	int (*connect)(struct connection *con, struct socket *sock,
+		       struct sockaddr *addr, int addr_len);
+	void (*sockopts)(struct socket *sock);
+	int (*bind)(struct socket *sock);
 	int (*listen_validate)(void);
 	void (*listen_sockopts)(struct socket *sock);
 	int (*listen_bind)(struct socket *sock);
-
-	/* What to do to connect */
-	void (*connect_action)(struct connection *con);
 	/* What to do to shutdown */
 	void (*shutdown_action)(struct connection *con);
 	/* What to do to eof check */
@@ -186,9 +188,6 @@ static const struct dlm_proto_ops *dlm_proto_ops;
 static void process_recv_sockets(struct work_struct *work);
 static void process_send_sockets(struct work_struct *work);
 
-static void sctp_connect_to_sock(struct connection *con);
-static void tcp_connect_to_sock(struct connection *con);
-
 /* need to held writequeue_lock */
 static struct writequeue_entry *con_next_wq(struct connection *con)
 {
@@ -1152,189 +1151,6 @@ static int sctp_bind_addrs(struct socket *sock, uint16_t port)
 	return result;
 }
 
-/* Initiate an SCTP association.
-   This is a special case of send_to_sock() in that we don't yet have a
-   peeled-off socket for this association, so we use the listening socket
-   and add the primary IP address of the remote node.
- */
-static void sctp_connect_to_sock(struct connection *con)
-{
-	struct sockaddr_storage daddr;
-	int result;
-	int addr_len;
-	struct socket *sock;
-	unsigned int mark;
-
-	mutex_lock(&con->sock_mutex);
-
-	/* Some odd races can cause double-connects, ignore them */
-	if (con->retries++ > MAX_CONNECT_RETRIES)
-		goto out;
-
-	if (con->sock) {
-		log_print("node %d already connected.", con->nodeid);
-		goto out;
-	}
-
-	memset(&daddr, 0, sizeof(daddr));
-	result = nodeid_to_addr(con->nodeid, &daddr, NULL, true, &mark);
-	if (result < 0) {
-		log_print("no address for nodeid %d", con->nodeid);
-		goto out;
-	}
-
-	/* Create a socket to communicate with */
-	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
-				  SOCK_STREAM, IPPROTO_SCTP, &sock);
-	if (result < 0)
-		goto socket_err;
-
-	sock_set_mark(sock->sk, mark);
-
-	add_sock(sock, con);
-
-	/* Bind to all addresses. */
-	if (sctp_bind_addrs(con->sock, 0))
-		goto bind_err;
-
-	make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);
-
-	log_print_ratelimited("connecting to %d", con->nodeid);
-
-	/* Turn off Nagle's algorithm */
-	sctp_sock_set_nodelay(sock->sk);
-
-	/*
-	 * Make sock->ops->connect() function return in specified time,
-	 * since O_NONBLOCK argument in connect() function does not work here,
-	 * then, we should restore the default value of this attribute.
-	 */
-	sock_set_sndtimeo(sock->sk, 5);
-	result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
-				   0);
-	sock_set_sndtimeo(sock->sk, 0);
-
-	if (result == -EINPROGRESS)
-		result = 0;
-	if (result == 0) {
-		if (!test_and_set_bit(CF_CONNECTED, &con->flags))
-			log_print("successful connected to node %d", con->nodeid);
-		goto out;
-	}
-
-bind_err:
-	con->sock = NULL;
-	sock_release(sock);
-
-socket_err:
-	/*
-	 * Some errors are fatal and this list might need adjusting. For other
-	 * errors we try again until the max number of retries is reached.
-	 */
-	if (result != -EHOSTUNREACH &&
-	    result != -ENETUNREACH &&
-	    result != -ENETDOWN &&
-	    result != -EINVAL &&
-	    result != -EPROTONOSUPPORT) {
-		log_print("connect %d try %d error %d", con->nodeid,
-			  con->retries, result);
-		mutex_unlock(&con->sock_mutex);
-		msleep(1000);
-		lowcomms_connect_sock(con);
-		return;
-	}
-
-out:
-	mutex_unlock(&con->sock_mutex);
-}
-
-/* Connect a new socket to its peer */
-static void tcp_connect_to_sock(struct connection *con)
-{
-	struct sockaddr_storage saddr, src_addr;
-	unsigned int mark;
-	int addr_len;
-	struct socket *sock = NULL;
-	int result;
-
-	mutex_lock(&con->sock_mutex);
-	if (con->retries++ > MAX_CONNECT_RETRIES)
-		goto out;
-
-	/* Some odd races can cause double-connects, ignore them */
-	if (con->sock)
-		goto out;
-
-	/* Create a socket to communicate with */
-	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
-				  SOCK_STREAM, IPPROTO_TCP, &sock);
-	if (result < 0)
-		goto out_err;
-
-	memset(&saddr, 0, sizeof(saddr));
-	result = nodeid_to_addr(con->nodeid, &saddr, NULL, false, &mark);
-	if (result < 0) {
-		log_print("no address for nodeid %d", con->nodeid);
-		goto out_err;
-	}
-
-	sock_set_mark(sock->sk, mark);
-
-	add_sock(sock, con);
-
-	/* Bind to our cluster-known address connecting to avoid
-	   routing problems */
-	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
-	make_sockaddr(&src_addr, 0, &addr_len);
-	result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
-				 addr_len);
-	if (result < 0) {
-		log_print("could not bind for connect: %d", result);
-		/* This *may* not indicate a critical error */
-	}
-
-	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
-
-	log_print_ratelimited("connecting to %d", con->nodeid);
-
-	/* Turn off Nagle's algorithm */
-	tcp_sock_set_nodelay(sock->sk);
-
-	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
-				   O_NONBLOCK);
-	if (result == -EINPROGRESS)
-		result = 0;
-	if (result == 0)
-		goto out;
-
-out_err:
-	if (con->sock) {
-		sock_release(con->sock);
-		con->sock = NULL;
-	} else if (sock) {
-		sock_release(sock);
-	}
-	/*
-	 * Some errors are fatal and this list might need adjusting. For other
-	 * errors we try again until the max number of retries is reached.
-	 */
-	if (result != -EHOSTUNREACH &&
-	    result != -ENETUNREACH &&
-	    result != -ENETDOWN && 
-	    result != -EINVAL &&
-	    result != -EPROTONOSUPPORT) {
-		log_print("connect %d try %d error %d", con->nodeid,
-			  con->retries, result);
-		mutex_unlock(&con->sock_mutex);
-		msleep(1000);
-		lowcomms_connect_sock(con);
-		return;
-	}
-out:
-	mutex_unlock(&con->sock_mutex);
-	return;
-}
-
 /* Get local addresses */
 static void init_local(void)
 {
@@ -1688,6 +1504,76 @@ static void process_listen_recv_socket(struct work_struct *work)
 	accept_from_sock(&listen_con);
 }
 
+static int dlm_connect(struct connection *con)
+{
+	struct sockaddr_storage addr;
+	int result, addr_len;
+	struct socket *sock;
+	unsigned int mark;
+
+	if (con->retries++ > MAX_CONNECT_RETRIES)
+		return 0;
+
+	/* Some odd races can cause double-connects, ignore them */
+	if (con->sock) {
+		log_print("node %d already connected.", con->nodeid);
+		return 0;
+	}
+
+	memset(&addr, 0, sizeof(addr));
+	result = nodeid_to_addr(con->nodeid, &addr, NULL,
+				dlm_proto_ops->try_new_addr, &mark);
+	if (result < 0) {
+		log_print("no address for nodeid %d", con->nodeid);
+		return result;
+	}
+
+	/* Create a socket to communicate with */
+	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
+				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
+	if (result < 0)
+		goto socket_err;
+
+	sock_set_mark(sock->sk, mark);
+	dlm_proto_ops->sockopts(sock);
+
+	add_sock(sock, con);
+
+	result = dlm_proto_ops->bind(sock);
+	if (result < 0)
+		goto add_sock_err;
+
+	log_print_ratelimited("connecting to %d", con->nodeid);
+	make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
+	result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr,
+					addr_len);
+	if (result < 0)
+		goto add_sock_err;
+
+	return 0;
+
+add_sock_err:
+	dlm_close_sock(&con->sock);
+
+socket_err:
+	/*
+	 * Some errors are fatal and this list might need adjusting. For other
+	 * errors we try again until the max number of retries is reached.
+	 */
+	if (result != -EHOSTUNREACH &&
+	    result != -ENETUNREACH &&
+	    result != -ENETDOWN &&
+	    result != -EINVAL &&
+	    result != -EPROTONOSUPPORT) {
+		log_print("connect %d try %d error %d", con->nodeid,
+			  con->retries, result);
+		msleep(1000);
+		lowcomms_connect_sock(con);
+	}
+
+	return result;
+}
+
 /* Send workqueue function */
 static void process_send_sockets(struct work_struct *work)
 {
@@ -1705,7 +1591,8 @@ static void process_send_sockets(struct work_struct *work)
 	if (con->sock == NULL) { /* not mutex protected so check it inside too */
 		if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags))
 			msleep(1000);
-		dlm_proto_ops->connect_action(con);
+
+		dlm_connect(con);
 	}
 	if (!list_empty(&con->writequeue))
 		send_to_sock(con);
@@ -1900,6 +1787,43 @@ static int dlm_listen_for_all(void)
 	return result;
 }
 
+static int dlm_tcp_bind(struct socket *sock)
+{
+	struct sockaddr_storage src_addr;
+	int result, addr_len;
+
+	/* Bind to our cluster-known address connecting to avoid
+	 * routing problems.
+	 */
+	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
+	make_sockaddr(&src_addr, 0, &addr_len);
+
+	result = sock->ops->bind(sock, (struct sockaddr *)&src_addr,
+				 addr_len);
+	if (result < 0) {
+		/* This *may* not indicate a critical error */
+		log_print("could not bind for connect: %d", result);
+	}
+
+	return 0;
+}
+
+static int dlm_tcp_connect(struct connection *con, struct socket *sock,
+			   struct sockaddr *addr, int addr_len)
+{
+	int ret;
+
+	ret = sock->ops->connect(sock, addr, addr_len, O_NONBLOCK);
+	switch (ret) {
+	case -EINPROGRESS:
+		fallthrough;
+	case 0:
+		return 0;
+	}
+
+	return ret;
+}
+
 static int dlm_tcp_listen_validate(void)
 {
 	/* We don't support multi-homed hosts */
@@ -1936,14 +1860,43 @@ static int dlm_tcp_listen_bind(struct socket *sock)
 static const struct dlm_proto_ops dlm_tcp_ops = {
 	.name = "TCP",
 	.proto = IPPROTO_TCP,
+	.connect = dlm_tcp_connect,
+	.sockopts = dlm_tcp_sockopts,
+	.bind = dlm_tcp_bind,
 	.listen_validate = dlm_tcp_listen_validate,
 	.listen_sockopts = dlm_tcp_listen_sockopts,
 	.listen_bind = dlm_tcp_listen_bind,
-	.connect_action = tcp_connect_to_sock,
 	.shutdown_action = dlm_tcp_shutdown,
 	.eof_condition = tcp_eof_condition,
 };
 
+static int dlm_sctp_bind(struct socket *sock)
+{
+	return sctp_bind_addrs(sock, 0);
+}
+
+static int dlm_sctp_connect(struct connection *con, struct socket *sock,
+			    struct sockaddr *addr, int addr_len)
+{
+	int ret;
+
+	/*
+	 * Make sock->ops->connect() return within the specified time, since
+	 * the O_NONBLOCK argument to connect() does not work here. Afterwards
+	 * we restore the default value of this attribute.
+	 */
+	sock_set_sndtimeo(sock->sk, 5);
+	ret = sock->ops->connect(sock, addr, addr_len, 0);
+	sock_set_sndtimeo(sock->sk, 0);
+	if (ret < 0)
+		return ret;
+
+	if (!test_and_set_bit(CF_CONNECTED, &con->flags))
+		log_print("successful connected to node %d", con->nodeid);
+
+	return 0;
+}
+
 static int dlm_sctp_listen_validate(void)
 {
 	if (!IS_ENABLED(CONFIG_IP_SCTP)) {
@@ -1970,10 +1923,13 @@ static void dlm_sctp_sockopts(struct socket *sock)
 static const struct dlm_proto_ops dlm_sctp_ops = {
 	.name = "SCTP",
 	.proto = IPPROTO_SCTP,
+	.try_new_addr = true,
+	.connect = dlm_sctp_connect,
+	.sockopts = dlm_sctp_sockopts,
+	.bind = dlm_sctp_bind,
 	.listen_validate = dlm_sctp_listen_validate,
 	.listen_sockopts = dlm_sctp_sockopts,
 	.listen_bind = dlm_sctp_bind_listen,
-	.connect_action = sctp_connect_to_sock,
 };
 
 int dlm_lowcomms_start(void)
-- 
2.26.3



^ permalink raw reply related	[flat|nested] 17+ messages in thread

* [Cluster-devel] [RFC dlm/next 07/15] fs: dlm: fix multiple empty writequeue alloc
  2021-06-23 15:14 [Cluster-devel] [RFC dlm/next 00/15] fs: dlm: performance Alexander Aring
                   ` (5 preceding siblings ...)
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 06/15] fs: dlm: generic connect func Alexander Aring
@ 2021-06-23 15:14 ` Alexander Aring
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 08/15] fs: dlm: move receive loop into receive handler Alexander Aring
                   ` (8 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Alexander Aring @ 2021-06-23 15:14 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch adds a mutex so that only one caller in sleepable context at a
time can allocate a writequeue entry buffer for a connection. If multiple
callers wait at the writequeue spinlock and the spinlock is released, it
could happen that several new writequeue page buffers are allocated,
instead of allocating one writequeue page buffer and letting the other
waiters use its remaining space. This only matters for sleepable context,
which is the common case. In non-sleepable contexts like retransmission we
just don't care about such behaviour.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 2919a868d19f..e96911c0c18d 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -84,6 +84,7 @@ struct connection {
 	struct list_head writequeue;  /* List of outgoing writequeue_entries */
 	spinlock_t writequeue_lock;
 	atomic_t writequeue_cnt;
+	struct mutex wq_alloc;
 	int retries;
 #define MAX_CONNECT_RETRIES 3
 	struct hlist_node list;
@@ -264,6 +265,8 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
 		return NULL;
 	}
 
+	mutex_init(&con->wq_alloc);
+
 	spin_lock(&connections_lock);
 	/* Because multiple workqueues/threads calls this function it can
 	 * race on multiple cpu's. Instead of locking hot path __find_con()
@@ -1252,19 +1255,37 @@ static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
 {
 	struct writequeue_entry *e;
 	struct dlm_msg *msg;
+	bool sleepable;
 
 	msg = kzalloc(sizeof(*msg), allocation);
 	if (!msg)
 		return NULL;
 
+	/* this mutex serializes the "fast" writequeue page list entry
+	 * allocations in new_wq_entry() during normal operation, which
+	 * is sleepable context. Without it we could end up with
+	 * multiple writequeue entries for one dlm message because
+	 * multiple callers were waiting at the writequeue_lock in
+	 * new_wq_entry().
+	 */
+	sleepable = gfpflags_normal_context(allocation);
+	if (sleepable)
+		mutex_lock(&con->wq_alloc);
+
 	kref_init(&msg->ref);
 
 	e = new_wq_entry(con, len, allocation, ppc, cb, mh);
 	if (!e) {
+		if (sleepable)
+			mutex_unlock(&con->wq_alloc);
+
 		kfree(msg);
 		return NULL;
 	}
 
+	if (sleepable)
+		mutex_unlock(&con->wq_alloc);
+
 	msg->ppc = *ppc;
 	msg->len = len;
 	msg->entry = e;
-- 
2.26.3




* [Cluster-devel] [RFC dlm/next 08/15] fs: dlm: move receive loop into receive handler
  2021-06-23 15:14 [Cluster-devel] [RFC dlm/next 00/15] fs: dlm: performance Alexander Aring
                   ` (6 preceding siblings ...)
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 07/15] fs: dlm: fix multiple empty writequeue alloc Alexander Aring
@ 2021-06-23 15:14 ` Alexander Aring
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 09/15] fs: dlm: introduce io_workqueue Alexander Aring
                   ` (7 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Alexander Aring @ 2021-06-23 15:14 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch moves the kernel_recvmsg() loop into the receive_from_sock()
function itself, instead of looping outside the function and aborting the
loop based on its return value.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 68 +++++++++++++++++++++--------------------------
 1 file changed, 31 insertions(+), 37 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index e96911c0c18d..8571017c3cdc 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -896,7 +896,6 @@ static int con_realloc_receive_buf(struct connection *con, int newlen)
 /* Data received from remote end */
 static int receive_from_sock(struct connection *con)
 {
-	int call_again_soon = 0;
 	struct msghdr msg;
 	struct kvec iov;
 	int ret, buflen;
@@ -916,41 +915,39 @@ static int receive_from_sock(struct connection *con)
 			goto out_resched;
 	}
 
-	/* calculate new buffer parameter regarding last receive and
-	 * possible leftover bytes
-	 */
-	iov.iov_base = con->rx_buf + con->rx_leftover;
-	iov.iov_len = con->rx_buflen - con->rx_leftover;
-
-	memset(&msg, 0, sizeof(msg));
-	msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
-	ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
-			     msg.msg_flags);
-	if (ret <= 0)
-		goto out_close;
-	else if (ret == iov.iov_len)
-		call_again_soon = 1;
-
-	/* new buflen according readed bytes and leftover from last receive */
-	buflen = ret + con->rx_leftover;
-	ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
-	if (ret < 0)
-		goto out_close;
+	for (;;) {
+		/* calculate new buffer parameter regarding last receive and
+		 * possible leftover bytes
+		 */
+		iov.iov_base = con->rx_buf + con->rx_leftover;
+		iov.iov_len = con->rx_buflen - con->rx_leftover;
+
+		memset(&msg, 0, sizeof(msg));
+		msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
+		ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
+				     msg.msg_flags);
+		if (ret == -EAGAIN)
+			break;
+		else if (ret <= 0)
+			goto out_close;
 
-	/* calculate leftover bytes from process and put it into begin of
-	 * the receive buffer, so next receive we have the full message
-	 *@the start address of the receive buffer.
-	 */
-	con->rx_leftover = buflen - ret;
-	if (con->rx_leftover) {
-		memmove(con->rx_buf, con->rx_buf + ret,
-			con->rx_leftover);
-		call_again_soon = true;
+		/* new buflen according to read bytes and leftover from last receive */
+		buflen = ret + con->rx_leftover;
+		ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
+		if (ret < 0)
+			goto out_close;
+
+		/* calculate leftover bytes from process and put it into begin of
+		 * the receive buffer, so next receive we have the full message
+		 * at the start address of the receive buffer.
+		 */
+		con->rx_leftover = buflen - ret;
+		if (con->rx_leftover) {
+			memmove(con->rx_buf, con->rx_buf + ret,
+				con->rx_leftover);
+		}
 	}
 
-	if (call_again_soon)
-		goto out_resched;
-
 	mutex_unlock(&con->sock_mutex);
 	return 0;
 
@@ -1512,12 +1509,9 @@ int dlm_lowcomms_close(int nodeid)
 static void process_recv_sockets(struct work_struct *work)
 {
 	struct connection *con = container_of(work, struct connection, rwork);
-	int err;
 
 	clear_bit(CF_READ_PENDING, &con->flags);
-	do {
-		err = receive_from_sock(con);
-	} while (!err);
+	receive_from_sock(con);
 }
 
 static void process_listen_recv_socket(struct work_struct *work)
-- 
2.26.3




* [Cluster-devel] [RFC dlm/next 09/15] fs: dlm: introduce io_workqueue
  2021-06-23 15:14 [Cluster-devel] [RFC dlm/next 00/15] fs: dlm: performance Alexander Aring
                   ` (7 preceding siblings ...)
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 08/15] fs: dlm: move receive loop into receive handler Alexander Aring
@ 2021-06-23 15:14 ` Alexander Aring
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 10/15] fs: dlm: introduce reconnect work Alexander Aring
                   ` (6 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Alexander Aring @ 2021-06-23 15:14 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch removes the send_workqueue and recv_workqueue. Instead we use
a single workqueue, io_workqueue, which is not ordered; its work is
protected per connection by either rwork_lock or swork_lock. The
per-connection locks allow us to handle multiple connections at once,
which is not possible with an ordered workqueue. So that send and receive
can run in parallel, each operation has its own lock. If the sock gets
closed or assigned, meaning we clear or set con->sock, both locks need to
be held; helpers are introduced to take this combined "con" lock.

This patch also removes a lot of the PENDING flags and the flush
operations done with them in stop_conn(). Commit 489d8e559c65 ("fs: dlm:
add reliable connection if reconnect") fixed some issues with connection
termination that this functionality may originally have been introduced
to work around. Now the midcomms layer takes care that no send/recv
happens during a proper termination.

There is also a lot of confusion about the othercon paradigm, which we
only have when we hit a connection race. If we hit the race we have two
connections, whereas the second one (named othercon) is only used for
receiving. To reduce the confusion, I tried to use the locks and other
resources of the first connection ("sendcon") only.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 364 +++++++++++++++++++++-------------------------
 1 file changed, 164 insertions(+), 200 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 8571017c3cdc..d2febefe1d0d 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -67,20 +67,16 @@
 struct connection {
 	struct socket *sock;	/* NULL if not connected */
 	uint32_t nodeid;	/* So we know who we are in the list */
-	struct mutex sock_mutex;
 	unsigned long flags;
-#define CF_READ_PENDING 1
-#define CF_WRITE_PENDING 2
-#define CF_INIT_PENDING 4
-#define CF_IS_OTHERCON 5
-#define CF_CLOSE 6
-#define CF_APP_LIMITED 7
-#define CF_CLOSING 8
-#define CF_SHUTDOWN 9
-#define CF_CONNECTED 10
-#define CF_RECONNECT 11
-#define CF_DELAY_CONNECT 12
-#define CF_EOF 13
+#define CF_IS_OTHERCON 1
+#define CF_CLOSE 2
+#define CF_APP_LIMITED 3
+#define CF_SHUTDOWN 4
+#define CF_CONNECTED 5
+#define CF_RECONNECT 6
+#define CF_DELAY_CONNECT 7
+#define CF_EOF 8
+#define CF_STOP 9
 	struct list_head writequeue;  /* List of outgoing writequeue_entries */
 	spinlock_t writequeue_lock;
 	atomic_t writequeue_cnt;
@@ -90,8 +86,10 @@ struct connection {
 	struct hlist_node list;
 	struct connection *othercon;
 	struct connection *sendcon;
-	struct work_struct rwork; /* Receive workqueue */
-	struct work_struct swork; /* Send workqueue */
+	struct mutex rwork_lock;
+	struct work_struct rwork;
+	struct mutex swork_lock;
+	struct work_struct swork;
 	wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
 	unsigned char *rx_buf;
 	int rx_buflen;
@@ -101,6 +99,7 @@ struct connection {
 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
 
 struct listen_connection {
+	struct mutex lock;
 	struct socket *sock;
 	struct work_struct rwork;
 };
@@ -177,8 +176,7 @@ static int dlm_local_count;
 int dlm_allow_conn;
 
 /* Work queues */
-static struct workqueue_struct *recv_workqueue;
-static struct workqueue_struct *send_workqueue;
+static struct workqueue_struct *io_workqueue;
 
 static struct hlist_head connection_hash[CONN_HASH_SIZE];
 static DEFINE_SPINLOCK(connections_lock);
@@ -189,6 +187,35 @@ static const struct dlm_proto_ops *dlm_proto_ops;
 static void process_recv_sockets(struct work_struct *work);
 static void process_send_sockets(struct work_struct *work);
 
+static inline void dlm_con_lock(struct connection *con)
+{
+	mutex_lock(&con->swork_lock);
+	mutex_lock(&con->rwork_lock);
+}
+
+static inline void dlm_con_unlock(struct connection *con)
+{
+	mutex_unlock(&con->rwork_lock);
+	mutex_unlock(&con->swork_lock);
+}
+
+static inline void dlm_io_queue(struct connection *con,
+				struct work_struct *work)
+{
+	if (test_bit(CF_STOP, &con->flags))
+		return;
+
+	queue_work(io_workqueue, work);
+}
+
+static inline struct connection *dlm_sendcon(struct connection *con)
+{
+	if (test_bit(CF_IS_OTHERCON, &con->flags))
+		return con->sendcon;
+
+	return con;
+}
+
 /* need to held writequeue_lock */
 static struct writequeue_entry *con_next_wq(struct connection *con)
 {
@@ -230,7 +257,6 @@ static int dlm_con_init(struct connection *con, int nodeid)
 		return -ENOMEM;
 
 	con->nodeid = nodeid;
-	mutex_init(&con->sock_mutex);
 	INIT_LIST_HEAD(&con->writequeue);
 	spin_lock_init(&con->writequeue_lock);
 	atomic_set(&con->writequeue_cnt, 0);
@@ -265,6 +291,8 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
 		return NULL;
 	}
 
+	mutex_init(&con->rwork_lock);
+	mutex_init(&con->swork_lock);
 	mutex_init(&con->wq_alloc);
 
 	spin_lock(&connections_lock);
@@ -488,8 +516,8 @@ static void lowcomms_data_ready(struct sock *sk)
 
 	read_lock_bh(&sk->sk_callback_lock);
 	con = sock2con(sk);
-	if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
-		queue_work(recv_workqueue, &con->rwork);
+	if (con)
+		dlm_io_queue(dlm_sendcon(con), &con->rwork);
 	read_unlock_bh(&sk->sk_callback_lock);
 }
 
@@ -498,7 +526,7 @@ static void lowcomms_listen_data_ready(struct sock *sk)
 	if (!dlm_allow_conn)
 		return;
 
-	queue_work(recv_workqueue, &listen_con.rwork);
+	queue_work(io_workqueue, &listen_con.rwork);
 }
 
 static void lowcomms_write_space(struct sock *sk)
@@ -506,13 +534,12 @@ static void lowcomms_write_space(struct sock *sk)
 	struct connection *con;
 
 	read_lock_bh(&sk->sk_callback_lock);
-	con = sock2con(sk);
+	con = dlm_sendcon(sock2con(sk));
 	if (!con)
 		goto out;
 
 	if (!test_and_set_bit(CF_CONNECTED, &con->flags)) {
 		log_print("successful connected to node %d", con->nodeid);
-		queue_work(send_workqueue, &con->swork);
 		goto out;
 	}
 
@@ -523,8 +550,8 @@ static void lowcomms_write_space(struct sock *sk)
 		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
 	}
 
-	queue_work(send_workqueue, &con->swork);
 out:
+	dlm_io_queue(con, &con->swork);
 	read_unlock_bh(&sk->sk_callback_lock);
 }
 
@@ -532,7 +559,8 @@ static inline void lowcomms_connect_sock(struct connection *con)
 {
 	if (test_bit(CF_CLOSE, &con->flags))
 		return;
-	queue_work(send_workqueue, &con->swork);
+
+	dlm_io_queue(con, &con->swork);
 	cond_resched();
 }
 
@@ -644,7 +672,7 @@ static void lowcomms_error_report(struct sock *sk)
 	}
 
 	if (!test_and_set_bit(CF_RECONNECT, &con->flags))
-		queue_work(send_workqueue, &con->swork);
+		dlm_io_queue(con, &con->swork);
 
 out:
 	read_unlock_bh(&sk->sk_callback_lock);
@@ -774,27 +802,15 @@ static void dlm_close_sock(struct socket **sock)
 }
 
 /* Close a remote connection and tidy up */
-static void close_connection(struct connection *con, bool and_other,
-			     bool tx, bool rx)
+static void close_connection(struct connection *con, bool and_other)
 {
-	bool closing = test_and_set_bit(CF_CLOSING, &con->flags);
 	struct writequeue_entry *e;
 
-	if (tx && !closing && cancel_work_sync(&con->swork)) {
-		log_print("canceled swork for node %d", con->nodeid);
-		clear_bit(CF_WRITE_PENDING, &con->flags);
-	}
-	if (rx && !closing && cancel_work_sync(&con->rwork)) {
-		log_print("canceled rwork for node %d", con->nodeid);
-		clear_bit(CF_READ_PENDING, &con->flags);
-	}
-
-	mutex_lock(&con->sock_mutex);
 	dlm_close_sock(&con->sock);
 
 	if (con->othercon && and_other) {
 		/* Will only re-enter once. */
-		close_connection(con->othercon, false, tx, rx);
+		close_connection(con->othercon, false);
 	}
 
 	/* if we send a writequeue entry only a half way, we drop the
@@ -824,26 +840,44 @@ static void close_connection(struct connection *con, bool and_other,
 	clear_bit(CF_DELAY_CONNECT, &con->flags);
 	clear_bit(CF_RECONNECT, &con->flags);
 	clear_bit(CF_EOF, &con->flags);
-	mutex_unlock(&con->sock_mutex);
-	clear_bit(CF_CLOSING, &con->flags);
+
+	/* handling for tcp shutdown */
+	clear_bit(CF_SHUTDOWN, &con->flags);
+	wake_up(&con->shutdown_wait);
 }
 
-static void shutdown_connection(struct connection *con)
+static void cancel_io_work(struct connection *con, bool and_other)
 {
-	int ret;
+	struct connection *sendcon = dlm_sendcon(con);
 
-	flush_work(&con->swork);
+	set_bit(CF_STOP, &sendcon->flags);
+	cancel_work_sync(&sendcon->swork);
+	cancel_work_sync(&sendcon->rwork);
+	if (sendcon->othercon && and_other)
+		cancel_work_sync(&sendcon->othercon->rwork);
+
+	dlm_con_lock(sendcon);
+	close_connection(con, and_other);
+	dlm_con_unlock(sendcon);
+
+	clear_bit(CF_STOP, &sendcon->flags);
+}
+
+static void shutdown_connection(struct connection *con,
+				struct connection *sendcon)
+{
+	int ret;
 
-	mutex_lock(&con->sock_mutex);
+	mutex_lock(&sendcon->swork_lock);
 	/* nothing to shutdown */
 	if (!con->sock) {
-		mutex_unlock(&con->sock_mutex);
+		mutex_unlock(&sendcon->swork_lock);
 		return;
 	}
 
 	set_bit(CF_SHUTDOWN, &con->flags);
 	ret = kernel_sock_shutdown(con->sock, SHUT_WR);
-	mutex_unlock(&con->sock_mutex);
+	mutex_unlock(&sendcon->swork_lock);
 	if (ret) {
 		log_print("Connection %p failed to shutdown: %d will force close",
 			  con, ret);
@@ -863,14 +897,22 @@ static void shutdown_connection(struct connection *con)
 
 force_close:
 	clear_bit(CF_SHUTDOWN, &con->flags);
-	close_connection(con, false, true, true);
+	cancel_io_work(con, false);
 }
 
 static void dlm_tcp_shutdown(struct connection *con)
 {
-	if (con->othercon)
-		shutdown_connection(con->othercon);
-	shutdown_connection(con);
+	/* flush pending processes which might trigger send */
+	flush_work(&con->rwork);
+
+	if (con->othercon) {
+		flush_work(&con->othercon->rwork);
+		shutdown_connection(con->othercon, con);
+	}
+
+	/* flush all send */
+	flush_work(&con->swork);
+	shutdown_connection(con, con);
 }
 
 static int con_realloc_receive_buf(struct connection *con, int newlen)
@@ -894,17 +936,17 @@ static int con_realloc_receive_buf(struct connection *con, int newlen)
 }
 
 /* Data received from remote end */
-static int receive_from_sock(struct connection *con)
+static void receive_from_sock(struct connection *con,
+			      struct connection *sendcon)
 {
 	struct msghdr msg;
 	struct kvec iov;
 	int ret, buflen;
 
-	mutex_lock(&con->sock_mutex);
-
+	mutex_lock(&sendcon->rwork_lock);
 	if (con->sock == NULL) {
-		ret = -EAGAIN;
-		goto out_close;
+		mutex_unlock(&sendcon->rwork_lock);
+		return;
 	}
 
 	/* realloc if we get new buffer size to read out */
@@ -926,16 +968,20 @@ static int receive_from_sock(struct connection *con)
 		msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
 		ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
 				     msg.msg_flags);
-		if (ret == -EAGAIN)
+		if (ret == 0) {
+			mutex_unlock(&sendcon->rwork_lock);
+			goto out_eof;
+		} else if (ret < 0) {
 			break;
-		else if (ret <= 0)
-			goto out_close;
+		}
 
 		/* new buflen according readed bytes and leftover from last receive */
 		buflen = ret + con->rx_leftover;
 		ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
-		if (ret < 0)
+		if (ret < 0) {
+			mutex_unlock(&sendcon->rwork_lock);
 			goto out_close;
+		}
 
 		/* calculate leftover bytes from process and put it into begin of
 		 * the receive buffer, so next receive we have the full message
@@ -947,40 +993,33 @@ static int receive_from_sock(struct connection *con)
 				con->rx_leftover);
 		}
 	}
+	mutex_unlock(&sendcon->rwork_lock);
 
-	mutex_unlock(&con->sock_mutex);
-	return 0;
+	return;
 
 out_resched:
-	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
-		queue_work(recv_workqueue, &con->rwork);
-	mutex_unlock(&con->sock_mutex);
-	return -EAGAIN;
+	dlm_io_queue(sendcon, &con->rwork);
+	return;
 
 out_close:
-	if (ret == 0) {
-		log_print("connection %p got EOF from %d",
-			  con, con->nodeid);
-
-		if (dlm_proto_ops->eof_condition &&
-		    dlm_proto_ops->eof_condition(con)) {
-			set_bit(CF_EOF, &con->flags);
-			mutex_unlock(&con->sock_mutex);
-		} else {
-			mutex_unlock(&con->sock_mutex);
-			close_connection(con, false, true, false);
+	if (!test_and_set_bit(CF_RECONNECT, &sendcon->flags))
+		dlm_io_queue(sendcon, &sendcon->swork);
 
-			/* handling for tcp shutdown */
-			clear_bit(CF_SHUTDOWN, &con->flags);
-			wake_up(&con->shutdown_wait);
-		}
+	return;
 
-		/* signal to breaking receive worker */
-		ret = -1;
-	} else {
-		mutex_unlock(&con->sock_mutex);
+out_eof:
+	log_print("connection %p got EOF from %d",
+		  con, con->nodeid);
+
+	if (dlm_proto_ops->eof_condition &&
+	    dlm_proto_ops->eof_condition(con)) {
+		set_bit(CF_EOF, &con->flags);
+		return;
 	}
-	return ret;
+
+	dlm_con_lock(sendcon);
+	close_connection(con, false);
+	dlm_con_unlock(sendcon);
 }
 
 /* Listening socket is busy, accept a connection */
@@ -1038,7 +1077,7 @@ static int accept_from_sock(struct listen_connection *con)
 
 	sock_set_mark(newsock->sk, mark);
 
-	mutex_lock(&newcon->sock_mutex);
+	dlm_con_lock(newcon);
 	if (newcon->sock) {
 		struct connection *othercon = newcon->othercon;
 
@@ -1046,7 +1085,7 @@ static int accept_from_sock(struct listen_connection *con)
 			othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
 			if (!othercon) {
 				log_print("failed to allocate incoming socket");
-				mutex_unlock(&newcon->sock_mutex);
+				dlm_con_unlock(newcon);
 				srcu_read_unlock(&connections_srcu, idx);
 				result = -ENOMEM;
 				goto accept_err;
@@ -1055,24 +1094,21 @@ static int accept_from_sock(struct listen_connection *con)
 			result = dlm_con_init(othercon, nodeid);
 			if (result < 0) {
 				kfree(othercon);
-				mutex_unlock(&newcon->sock_mutex);
+				dlm_con_unlock(newcon);
 				srcu_read_unlock(&connections_srcu, idx);
 				goto accept_err;
 			}
 
-			lockdep_set_subclass(&othercon->sock_mutex, 1);
 			set_bit(CF_IS_OTHERCON, &othercon->flags);
 			newcon->othercon = othercon;
 			othercon->sendcon = newcon;
 		} else {
 			/* close other sock con if we have something new */
-			close_connection(othercon, false, true, false);
+			close_connection(othercon, false);
 		}
 
-		mutex_lock(&othercon->sock_mutex);
 		add_sock(newsock, othercon);
 		addcon = othercon;
-		mutex_unlock(&othercon->sock_mutex);
 	}
 	else {
 		/* accept copies the sk after we've saved the callbacks, so we
@@ -1083,15 +1119,14 @@ static int accept_from_sock(struct listen_connection *con)
 	}
 
 	set_bit(CF_CONNECTED, &addcon->flags);
-	mutex_unlock(&newcon->sock_mutex);
+	dlm_con_unlock(newcon);
 
 	/*
 	 * Add it to the active queue in case we got data
 	 * between processing the accept adding the socket
 	 * to the read_sockets list
 	 */
-	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
-		queue_work(recv_workqueue, &addcon->rwork);
+	dlm_io_queue(newcon, &addcon->rwork);
 
 	srcu_read_unlock(&connections_srcu, idx);
 
@@ -1341,7 +1376,7 @@ static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
 	e->len = DLM_WQ_LENGTH_BYTES(e);
 	spin_unlock(&con->writequeue_lock);
 
-	queue_work(send_workqueue, &con->swork);
+	dlm_io_queue(con, &con->swork);
 	return;
 
 out:
@@ -1393,9 +1428,11 @@ static void send_to_sock(struct connection *con)
 	int len, offset, ret;
 	int count = 0;
 
-	mutex_lock(&con->sock_mutex);
-	if (con->sock == NULL)
-		goto out_connect;
+	mutex_lock(&con->swork_lock);
+	if (con->sock == NULL) {
+		dlm_io_queue(con, &con->swork);
+		goto out;
+	}
 
 	spin_lock(&con->writequeue_lock);
 	for (;;) {
@@ -1436,29 +1473,19 @@ static void send_to_sock(struct connection *con)
 		writequeue_entry_complete(e, ret);
 	}
 	spin_unlock(&con->writequeue_lock);
+	mutex_unlock(&con->swork_lock);
 
 	/* close if we got EOF */
 	if (test_and_clear_bit(CF_EOF, &con->flags)) {
-		mutex_unlock(&con->sock_mutex);
-		close_connection(con, false, false, true);
-
-		/* handling for tcp shutdown */
-		clear_bit(CF_SHUTDOWN, &con->flags);
-		wake_up(&con->shutdown_wait);
-	} else {
-		mutex_unlock(&con->sock_mutex);
+		dlm_con_lock(con);
+		close_connection(con, false);
+		dlm_con_unlock(con);
 	}
 
 	return;
 
 out:
-	mutex_unlock(&con->sock_mutex);
-	return;
-
-out_connect:
-	mutex_unlock(&con->sock_mutex);
-	queue_work(send_workqueue, &con->swork);
-	cond_resched();
+	mutex_unlock(&con->swork_lock);
 }
 
 static void clean_one_writequeue(struct connection *con)
@@ -1485,7 +1512,7 @@ int dlm_lowcomms_close(int nodeid)
 	con = nodeid2con(nodeid, 0);
 	if (con) {
 		set_bit(CF_CLOSE, &con->flags);
-		close_connection(con, true, true, true);
+		cancel_io_work(con, true);
 		clean_one_writequeue(con);
 		if (con->othercon)
 			clean_one_writequeue(con->othercon);
@@ -1509,14 +1536,16 @@ int dlm_lowcomms_close(int nodeid)
 static void process_recv_sockets(struct work_struct *work)
 {
 	struct connection *con = container_of(work, struct connection, rwork);
+	struct connection *sendcon = dlm_sendcon(con);
 
-	clear_bit(CF_READ_PENDING, &con->flags);
-	receive_from_sock(con);
+	receive_from_sock(con, sendcon);
 }
 
 static void process_listen_recv_socket(struct work_struct *work)
 {
+	mutex_lock(&listen_con.lock);
 	accept_from_sock(&listen_con);
+	mutex_unlock(&listen_con.lock);
 }
 
 static int dlm_connect(struct connection *con)
@@ -1596,49 +1625,36 @@ static void process_send_sockets(struct work_struct *work)
 
 	WARN_ON(test_bit(CF_IS_OTHERCON, &con->flags));
 
-	clear_bit(CF_WRITE_PENDING, &con->flags);
-
 	if (test_and_clear_bit(CF_RECONNECT, &con->flags)) {
-		close_connection(con, false, false, true);
+		dlm_con_lock(con);
+		close_connection(con, false);
 		dlm_midcomms_unack_msg_resend(con->nodeid);
+		dlm_con_unlock(con);
 	}
 
 	if (con->sock == NULL) { /* not mutex protected so check it inside too */
 		if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags))
 			msleep(1000);
 
+		dlm_con_lock(con);
 		dlm_connect(con);
+		dlm_con_unlock(con);
 	}
-	if (!list_empty(&con->writequeue))
-		send_to_sock(con);
+
+	send_to_sock(con);
 }
 
 static void work_stop(void)
 {
-	if (recv_workqueue) {
-		destroy_workqueue(recv_workqueue);
-		recv_workqueue = NULL;
-	}
-
-	if (send_workqueue) {
-		destroy_workqueue(send_workqueue);
-		send_workqueue = NULL;
-	}
+	destroy_workqueue(io_workqueue);
 }
 
 static int work_start(void)
 {
-	recv_workqueue = alloc_ordered_workqueue("dlm_recv", WQ_MEM_RECLAIM);
-	if (!recv_workqueue) {
-		log_print("can't start dlm_recv");
-		return -ENOMEM;
-	}
-
-	send_workqueue = alloc_ordered_workqueue("dlm_send", WQ_MEM_RECLAIM);
-	if (!send_workqueue) {
-		log_print("can't start dlm_send");
-		destroy_workqueue(recv_workqueue);
-		recv_workqueue = NULL;
+	io_workqueue = alloc_workqueue("dlm_io",
+				       WQ_UNBOUND | WQ_MEM_RECLAIM, 0);
+	if (!io_workqueue) {
+		log_print("can't start dlm_io");
 		return -ENOMEM;
 	}
 
@@ -1660,10 +1676,7 @@ void dlm_lowcomms_shutdown(void)
 	 */
 	dlm_allow_conn = 0;
 
-	if (recv_workqueue)
-		flush_workqueue(recv_workqueue);
-	if (send_workqueue)
-		flush_workqueue(send_workqueue);
+	flush_workqueue(io_workqueue);
 
 	dlm_close_sock(&listen_con.sock);
 
@@ -1672,27 +1685,6 @@ void dlm_lowcomms_shutdown(void)
 	srcu_read_unlock(&connections_srcu, idx);
 }
 
-static void _stop_conn(struct connection *con, bool and_other)
-{
-	mutex_lock(&con->sock_mutex);
-	set_bit(CF_CLOSE, &con->flags);
-	set_bit(CF_READ_PENDING, &con->flags);
-	set_bit(CF_WRITE_PENDING, &con->flags);
-	if (con->sock && con->sock->sk) {
-		write_lock_bh(&con->sock->sk->sk_callback_lock);
-		con->sock->sk->sk_user_data = NULL;
-		write_unlock_bh(&con->sock->sk->sk_callback_lock);
-	}
-	if (con->othercon && and_other)
-		_stop_conn(con->othercon, false);
-	mutex_unlock(&con->sock_mutex);
-}
-
-static void stop_conn(struct connection *con)
-{
-	_stop_conn(con, true);
-}
-
 static void connection_release(struct rcu_head *rcu)
 {
 	struct connection *con = container_of(rcu, struct connection, rcu);
@@ -1703,7 +1695,8 @@ static void connection_release(struct rcu_head *rcu)
 
 static void free_conn(struct connection *con)
 {
-	close_connection(con, true, true, true);
+	cancel_io_work(con, true);
+
 	spin_lock(&connections_lock);
 	hlist_del_rcu(&con->list);
 	spin_unlock(&connections_lock);
@@ -1716,41 +1709,11 @@ static void free_conn(struct connection *con)
 	call_srcu(&connections_srcu, &con->rcu, connection_release);
 }
 
-static void work_flush(void)
-{
-	int ok;
-	int i;
-	struct connection *con;
-
-	do {
-		ok = 1;
-		foreach_conn(stop_conn);
-		if (recv_workqueue)
-			flush_workqueue(recv_workqueue);
-		if (send_workqueue)
-			flush_workqueue(send_workqueue);
-		for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
-			hlist_for_each_entry_rcu(con, &connection_hash[i],
-						 list) {
-				ok &= test_bit(CF_READ_PENDING, &con->flags);
-				ok &= test_bit(CF_WRITE_PENDING, &con->flags);
-				if (con->othercon) {
-					ok &= test_bit(CF_READ_PENDING,
-						       &con->othercon->flags);
-					ok &= test_bit(CF_WRITE_PENDING,
-						       &con->othercon->flags);
-				}
-			}
-		}
-	} while (!ok);
-}
-
 void dlm_lowcomms_stop(void)
 {
 	int idx;
 
 	idx = srcu_read_lock(&connections_srcu);
-	work_flush();
 	foreach_conn(free_conn);
 	srcu_read_unlock(&connections_srcu, idx);
 	work_stop();
@@ -1962,6 +1925,7 @@ int dlm_lowcomms_start(void)
 		goto fail;
 	}
 
+	mutex_init(&listen_con.lock);
 	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
 
 	error = work_start();
-- 
2.26.3



^ permalink raw reply related	[flat|nested] 17+ messages in thread

* [Cluster-devel] [RFC dlm/next 10/15] fs: dlm: introduce reconnect work
  2021-06-23 15:14 [Cluster-devel] [RFC dlm/next 00/15] fs: dlm: performance Alexander Aring
                   ` (8 preceding siblings ...)
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 09/15] fs: dlm: introduce io_workqueue Alexander Aring
@ 2021-06-23 15:14 ` Alexander Aring
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 11/15] fs: dlm: introduce process workqueue Alexander Aring
                   ` (5 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Alexander Aring @ 2021-06-23 15:14 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch adds another work item to close sockets, since closing cannot
be done directly inside the lowcomms_error_report() handler. It also
closes the "othercon" sock, if present.
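
As a rough userspace sketch (names hypothetical, not the patch's actual code), the error-to-delay policy that the new close work applies before requeuing the send/connect work could look like:

```c
#include <errno.h>

/* Hypothetical sketch of the reconnect back-off policy applied by the
 * close work: a refused connection waits a second before the next
 * connect attempt, any other error retries immediately.
 */
static unsigned int reconnect_delay_ms(int sk_err)
{
	switch (sk_err) {
	case ECONNREFUSED:	/* peer not listening yet, back off */
		return 1000;
	default:		/* e.g. reset mid-stream, reconnect at once */
		return 0;
	}
}
```

This mirrors the `switch (con->sk_err)` in process_close_sockets() below, where the delay is then handed to the delayed send work.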

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 163 +++++++++++++++++++++++++---------------------
 1 file changed, 88 insertions(+), 75 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index d2febefe1d0d..a54ed3cf0b45 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -73,10 +73,8 @@ struct connection {
 #define CF_APP_LIMITED 3
 #define CF_SHUTDOWN 4
 #define CF_CONNECTED 5
-#define CF_RECONNECT 6
-#define CF_DELAY_CONNECT 7
-#define CF_EOF 8
-#define CF_STOP 9
+#define CF_EOF 6
+#define CF_STOP 7
 	struct list_head writequeue;  /* List of outgoing writequeue_entries */
 	spinlock_t writequeue_lock;
 	atomic_t writequeue_cnt;
@@ -89,7 +87,9 @@ struct connection {
 	struct mutex rwork_lock;
 	struct work_struct rwork;
 	struct mutex swork_lock;
-	struct work_struct swork;
+	struct delayed_work swork;
+	struct work_struct cwork;
+	int sk_err;
 	wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
 	unsigned char *rx_buf;
 	int rx_buflen;
@@ -184,6 +184,7 @@ DEFINE_STATIC_SRCU(connections_srcu);
 
 static const struct dlm_proto_ops *dlm_proto_ops;
 
+static void process_close_sockets(struct work_struct *work);
 static void process_recv_sockets(struct work_struct *work);
 static void process_send_sockets(struct work_struct *work);
 
@@ -208,6 +209,16 @@ static inline void dlm_io_queue(struct connection *con,
 	queue_work(io_workqueue, work);
 }
 
+static inline void dlm_io_delayed_queue(struct connection *con,
+					struct delayed_work *dwork,
+					unsigned long delay)
+{
+	if (test_bit(CF_STOP, &con->flags))
+		return;
+
+	queue_delayed_work(io_workqueue, dwork, delay);
+}
+
 static inline struct connection *dlm_sendcon(struct connection *con)
 {
 	if (test_bit(CF_IS_OTHERCON, &con->flags))
@@ -260,8 +271,9 @@ static int dlm_con_init(struct connection *con, int nodeid)
 	INIT_LIST_HEAD(&con->writequeue);
 	spin_lock_init(&con->writequeue_lock);
 	atomic_set(&con->writequeue_cnt, 0);
-	INIT_WORK(&con->swork, process_send_sockets);
+	INIT_DELAYED_WORK(&con->swork, process_send_sockets);
 	INIT_WORK(&con->rwork, process_recv_sockets);
+	INIT_WORK(&con->cwork, process_close_sockets);
 	init_waitqueue_head(&con->shutdown_wait);
 
 	return 0;
@@ -551,19 +563,10 @@ static void lowcomms_write_space(struct sock *sk)
 	}
 
 out:
-	dlm_io_queue(con, &con->swork);
+	dlm_io_delayed_queue(con, &con->swork, 0);
 	read_unlock_bh(&sk->sk_callback_lock);
 }
 
-static inline void lowcomms_connect_sock(struct connection *con)
-{
-	if (test_bit(CF_CLOSE, &con->flags))
-		return;
-
-	dlm_io_queue(con, &con->swork);
-	cond_resched();
-}
-
 static void lowcomms_state_change(struct sock *sk)
 {
 	/* SCTP layer is not calling sk_data_ready when the connection
@@ -579,27 +582,6 @@ static void lowcomms_state_change(struct sock *sk)
 	}
 }
 
-int dlm_lowcomms_connect_node(int nodeid)
-{
-	struct connection *con;
-	int idx;
-
-	if (nodeid == dlm_our_nodeid())
-		return 0;
-
-	idx = srcu_read_lock(&connections_srcu);
-	con = nodeid2con(nodeid, GFP_NOFS);
-	if (!con) {
-		srcu_read_unlock(&connections_srcu, idx);
-		return -ENOMEM;
-	}
-
-	lowcomms_connect_sock(con);
-	srcu_read_unlock(&connections_srcu, idx);
-
-	return 0;
-}
-
 int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
 {
 	struct dlm_node_addr *na;
@@ -659,20 +641,8 @@ static void lowcomms_error_report(struct sock *sk)
 				   sk->sk_err_soft);
 	}
 
-	/* below sendcon only handling */
-	if (test_bit(CF_IS_OTHERCON, &con->flags))
-		con = con->sendcon;
-
-	switch (sk->sk_err) {
-	case ECONNREFUSED:
-		set_bit(CF_DELAY_CONNECT, &con->flags);
-		break;
-	default:
-		break;
-	}
-
-	if (!test_and_set_bit(CF_RECONNECT, &con->flags))
-		dlm_io_queue(con, &con->swork);
+	con->sk_err = sk->sk_err;
+	dlm_io_queue(dlm_sendcon(con), &con->cwork);
 
 out:
 	read_unlock_bh(&sk->sk_callback_lock);
@@ -837,8 +807,6 @@ static void close_connection(struct connection *con, bool and_other)
 	con->retries = 0;
 	clear_bit(CF_APP_LIMITED, &con->flags);
 	clear_bit(CF_CONNECTED, &con->flags);
-	clear_bit(CF_DELAY_CONNECT, &con->flags);
-	clear_bit(CF_RECONNECT, &con->flags);
 	clear_bit(CF_EOF, &con->flags);
 
 	/* handling for tcp shutdown */
@@ -851,10 +819,13 @@ static void cancel_io_work(struct connection *con, bool and_other)
 	struct connection *sendcon = dlm_sendcon(con);
 
 	set_bit(CF_STOP, &sendcon->flags);
-	cancel_work_sync(&sendcon->swork);
 	cancel_work_sync(&sendcon->rwork);
-	if (sendcon->othercon && and_other)
+	cancel_work_sync(&sendcon->cwork);
+	cancel_delayed_work_sync(&sendcon->swork);
+	if (sendcon->othercon && and_other) {
 		cancel_work_sync(&sendcon->othercon->rwork);
+		cancel_work_sync(&sendcon->othercon->cwork);
+	}
 
 	dlm_con_lock(sendcon);
 	close_connection(con, and_other);
@@ -911,7 +882,7 @@ static void dlm_tcp_shutdown(struct connection *con)
 	}
 
 	/* flush all send */
-	flush_work(&con->swork);
+	flush_delayed_work(&con->swork);
 	shutdown_connection(con, con);
 }
 
@@ -1002,9 +973,10 @@ static void receive_from_sock(struct connection *con,
 	return;
 
 out_close:
-	if (!test_and_set_bit(CF_RECONNECT, &sendcon->flags))
-		dlm_io_queue(sendcon, &sendcon->swork);
-
+	dlm_con_lock(sendcon);
+	close_connection(con, false);
+	dlm_con_unlock(sendcon);
+	dlm_io_delayed_queue(sendcon, &sendcon->swork, 0);
 	return;
 
 out_eof:
@@ -1376,7 +1348,7 @@ static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
 	e->len = DLM_WQ_LENGTH_BYTES(e);
 	spin_unlock(&con->writequeue_lock);
 
-	dlm_io_queue(con, &con->swork);
+	dlm_io_delayed_queue(con, &con->swork, 0);
 	return;
 
 out:
@@ -1430,7 +1402,7 @@ static void send_to_sock(struct connection *con)
 
 	mutex_lock(&con->swork_lock);
 	if (con->sock == NULL) {
-		dlm_io_queue(con, &con->swork);
+		dlm_io_delayed_queue(con, &con->swork, 0);
 		goto out;
 	}
 
@@ -1611,33 +1583,74 @@ static int dlm_connect(struct connection *con)
 	    result != -EPROTONOSUPPORT) {
 		log_print("connect %d try %d error %d", con->nodeid,
 			  con->retries, result);
-		msleep(1000);
-		lowcomms_connect_sock(con);
+		dlm_io_delayed_queue(con, &con->swork,
+				     msecs_to_jiffies(1000));
 	}
 
 	return result;
 }
 
-/* Send workqueue function */
-static void process_send_sockets(struct work_struct *work)
+int dlm_lowcomms_connect_node(int nodeid)
 {
-	struct connection *con = container_of(work, struct connection, swork);
+	struct connection *con;
+	int idx;
 
-	WARN_ON(test_bit(CF_IS_OTHERCON, &con->flags));
+	if (nodeid == dlm_our_nodeid())
+		return 0;
 
-	if (test_and_clear_bit(CF_RECONNECT, &con->flags)) {
-		dlm_con_lock(con);
-		close_connection(con, false);
-		dlm_midcomms_unack_msg_resend(con->nodeid);
-		dlm_con_unlock(con);
+	idx = srcu_read_lock(&connections_srcu);
+	con = nodeid2con(nodeid, GFP_NOFS);
+	if (!con) {
+		srcu_read_unlock(&connections_srcu, idx);
+		return -ENOMEM;
 	}
 
-	if (con->sock == NULL) { /* not mutex protected so check it inside too */
-		if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags))
-			msleep(1000);
+	if (test_bit(CF_CLOSE, &con->flags)) {
+		srcu_read_unlock(&connections_srcu, idx);
+		return 0;
+	}
 
+	dlm_con_lock(con);
+	dlm_connect(con);
+	dlm_con_unlock(con);
+	srcu_read_unlock(&connections_srcu, idx);
+
+	cond_resched();
+	return 0;
+}
+
+static void process_close_sockets(struct work_struct *work)
+{
+	struct connection *con = container_of(work, struct connection, cwork);
+	struct connection *sendcon = dlm_sendcon(con);
+	unsigned int delay = 0;
+
+	dlm_con_lock(sendcon);
+	close_connection(con, false);
+	dlm_con_unlock(sendcon);
+
+	switch (con->sk_err) {
+	case ECONNREFUSED:
+		delay = msecs_to_jiffies(1000);
+		break;
+	default:
+		break;
+	}
+
+	dlm_io_delayed_queue(sendcon, &sendcon->swork, delay);
+}
+
+/* Send workqueue function */
+static void process_send_sockets(struct work_struct *work)
+{
+	struct connection *con = container_of(work, struct connection,
+					      swork.work);
+
+	/* may be used to establish the socket connection */
+	if (con->sock == NULL) {
 		dlm_con_lock(con);
 		dlm_connect(con);
+		dlm_midcomms_unack_msg_resend(con->nodeid);
 		dlm_con_unlock(con);
 	}
 
-- 
2.26.3




* [Cluster-devel] [RFC dlm/next 11/15] fs: dlm: introduce process workqueue
  2021-06-23 15:14 [Cluster-devel] [RFC dlm/next 00/15] fs: dlm: performance Alexander Aring
                   ` (9 preceding siblings ...)
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 10/15] fs: dlm: introduce reconnect work Alexander Aring
@ 2021-06-23 15:14 ` Alexander Aring
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 12/15] fs: dlm: remove send starve Alexander Aring
                   ` (4 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Alexander Aring @ 2021-06-23 15:14 UTC (permalink / raw)
  To: cluster-devel.redhat.com

To avoid blocking future receive handling calls, this patch introduces a
process workqueue which calls dlm_process_incoming_buffer().

While dlm messages are being processed, the current send functionality
must come to an end and no new queuing of swork is allowed. This is done
by the newly introduced connection bit CF_STOP_SEND.
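
A minimal userspace analogue of the receive/process split (all names hypothetical, a pthread mutex standing in for the kernel spinlock): the receive side appends complete messages to a locked FIFO list, and the process work repeatedly dequeues until the list is empty.

```c
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

struct pentry {
	char *buf;
	int buflen;
	struct pentry *next;
};

static struct pentry *head, *tail;
static pthread_mutex_t q_lock = PTHREAD_MUTEX_INITIALIZER;

/* receive side: queue a copy of one complete message */
static int queue_message(const char *buf, int buflen)
{
	struct pentry *p = malloc(sizeof(*p));

	if (!p)
		return -1;
	p->buf = malloc(buflen);
	if (!p->buf) {
		free(p);
		return -1;
	}
	memcpy(p->buf, buf, buflen);
	p->buflen = buflen;
	p->next = NULL;

	pthread_mutex_lock(&q_lock);
	if (tail)
		tail->next = p;
	else
		head = p;
	tail = p;
	pthread_mutex_unlock(&q_lock);
	return 0;
}

/* process side: pop in FIFO order; the process work would loop this
 * until NULL, like the for (;;) loop in process_dlm_messages()
 */
static struct pentry *dequeue_message(void)
{
	struct pentry *p;

	pthread_mutex_lock(&q_lock);
	p = head;
	if (p) {
		head = p->next;
		if (!head)
			tail = NULL;
	}
	pthread_mutex_unlock(&q_lock);
	return p;
}
```

Because only the list manipulation is under the lock, the receive side can keep filling the queue while an earlier batch is still being processed, which is the point of the patch.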

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 219 +++++++++++++++++++++++++++++++---------------
 fs/dlm/midcomms.c |  38 ++++++--
 fs/dlm/midcomms.h |   3 +-
 3 files changed, 178 insertions(+), 82 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index a54ed3cf0b45..28d97f8187a5 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -24,15 +24,15 @@
  * responsibility to resolve these into IP address or
  * whatever it needs for inter-node communication.
  *
- * The comms level is two kernel threads that deal mainly with
- * the receiving of messages from other nodes and passing them
- * up to the mid-level comms layer (which understands the
- * message format) for execution by the locking core, and
- * a send thread which does all the setting up of connections
- * to remote nodes and the sending of data. Threads are not allowed
- * to send their own data because it may cause them to wait in times
- * of high load. Also, this way, the sending thread can collect together
- * messages bound for one node and send them in one block.
+ * Each connection can send and receive at the same time, which is
+ * considered the hot path. Closing or accepting a new connection is not
+ * the hot path and will block all sending and receiving on that
+ * connection. To disallow sending while dlm messages are being processed,
+ * the connection flag CF_STOP_SEND was introduced; it prevents any further
+ * dequeuing of the connection writequeue and any new queuing of the
+ * connection swork. Once all received dlm messages are processed, the flag
+ * is dropped and a swork is triggered. This batches all new messages that
+ * arrived while dlm messages were being processed.
  *
  * lowcomms will choose to use either TCP or SCTP as its transport layer
  * depending on the configuration variable 'protocol'. This should be set
@@ -75,6 +75,7 @@ struct connection {
 #define CF_CONNECTED 5
 #define CF_EOF 6
 #define CF_STOP 7
+#define CF_STOP_SEND 8
 	struct list_head writequeue;  /* List of outgoing writequeue_entries */
 	spinlock_t writequeue_lock;
 	atomic_t writequeue_cnt;
@@ -91,9 +92,12 @@ struct connection {
 	struct work_struct cwork;
 	int sk_err;
 	wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
-	unsigned char *rx_buf;
-	int rx_buflen;
+	unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE];
 	int rx_leftover;
+	struct work_struct pwork;
+	struct list_head processqueue;
+	spinlock_t processqueue_lock;
+	struct mutex process_lock;
 	struct rcu_head rcu;
 };
 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
@@ -121,6 +125,13 @@ struct writequeue_entry {
 	struct kref ref;
 };
 
+struct processqueue_entry {
+	unsigned char *buf;
+	int buflen;
+
+	struct list_head list;
+};
+
 struct dlm_msg {
 	struct writequeue_entry *entry;
 	struct dlm_msg *orig_msg;
@@ -177,6 +188,7 @@ int dlm_allow_conn;
 
 /* Work queues */
 static struct workqueue_struct *io_workqueue;
+static struct workqueue_struct *process_workqueue;
 
 static struct hlist_head connection_hash[CONN_HASH_SIZE];
 static DEFINE_SPINLOCK(connections_lock);
@@ -185,6 +197,7 @@ DEFINE_STATIC_SRCU(connections_srcu);
 static const struct dlm_proto_ops *dlm_proto_ops;
 
 static void process_close_sockets(struct work_struct *work);
+static void process_dlm_messages(struct work_struct *work);
 static void process_recv_sockets(struct work_struct *work);
 static void process_send_sockets(struct work_struct *work);
 
@@ -227,11 +240,25 @@ static inline struct connection *dlm_sendcon(struct connection *con)
 	return con;
 }
 
+static inline void con_stop_send(struct connection *con)
+{
+	set_bit(CF_STOP_SEND, &con->flags);
+}
+
+static inline void con_resume_send(struct connection *con)
+{
+	clear_bit(CF_STOP_SEND, &con->flags);
+	dlm_io_delayed_queue(con, &con->swork, 0);
+}
+
 /* need to held writequeue_lock */
 static struct writequeue_entry *con_next_wq(struct connection *con)
 {
 	struct writequeue_entry *e;
 
+	if (test_bit(CF_STOP_SEND, &con->flags))
+		return NULL;
+
 	if (list_empty(&con->writequeue))
 		return NULL;
 
@@ -260,13 +287,8 @@ static bool tcp_eof_condition(struct connection *con)
 	return atomic_read(&con->writequeue_cnt);
 }
 
-static int dlm_con_init(struct connection *con, int nodeid)
+static void dlm_con_init(struct connection *con, int nodeid)
 {
-	con->rx_buflen = dlm_config.ci_buffer_size;
-	con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
-	if (!con->rx_buf)
-		return -ENOMEM;
-
 	con->nodeid = nodeid;
 	INIT_LIST_HEAD(&con->writequeue);
 	spin_lock_init(&con->writequeue_lock);
@@ -275,8 +297,6 @@ static int dlm_con_init(struct connection *con, int nodeid)
 	INIT_WORK(&con->rwork, process_recv_sockets);
 	INIT_WORK(&con->cwork, process_close_sockets);
 	init_waitqueue_head(&con->shutdown_wait);
-
-	return 0;
 }
 
 /*
@@ -286,7 +306,7 @@ static int dlm_con_init(struct connection *con, int nodeid)
 static struct connection *nodeid2con(int nodeid, gfp_t alloc)
 {
 	struct connection *con, *tmp;
-	int r, ret;
+	int r;
 
 	r = nodeid_hash(nodeid);
 	con = __find_con(nodeid, r);
@@ -297,16 +317,17 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
 	if (!con)
 		return NULL;
 
-	ret = dlm_con_init(con, nodeid);
-	if (ret) {
-		kfree(con);
-		return NULL;
-	}
+	dlm_con_init(con, nodeid);
 
 	mutex_init(&con->rwork_lock);
 	mutex_init(&con->swork_lock);
 	mutex_init(&con->wq_alloc);
 
+	mutex_init(&con->process_lock);
+	INIT_LIST_HEAD(&con->processqueue);
+	spin_lock_init(&con->processqueue_lock);
+	INIT_WORK(&con->pwork, process_dlm_messages);
+
 	spin_lock(&connections_lock);
 	/* Because multiple workqueues/threads calls this function it can
 	 * race on multiple cpu's. Instead of locking hot path __find_con()
@@ -317,7 +338,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
 	tmp = __find_con(nodeid, r);
 	if (tmp) {
 		spin_unlock(&connections_lock);
-		kfree(con->rx_buf);
 		kfree(con);
 		return tmp;
 	}
@@ -819,13 +839,17 @@ static void cancel_io_work(struct connection *con, bool and_other)
 	struct connection *sendcon = dlm_sendcon(con);
 
 	set_bit(CF_STOP, &sendcon->flags);
+	/* stop receiving */
 	cancel_work_sync(&sendcon->rwork);
 	cancel_work_sync(&sendcon->cwork);
-	cancel_delayed_work_sync(&sendcon->swork);
 	if (sendcon->othercon && and_other) {
 		cancel_work_sync(&sendcon->othercon->rwork);
 		cancel_work_sync(&sendcon->othercon->cwork);
 	}
+	/* flush pending processes which might trigger swork */
+	flush_work(&sendcon->pwork);
+	/* stop sending */
+	cancel_delayed_work_sync(&sendcon->swork);
 
 	dlm_con_lock(sendcon);
 	close_connection(con, and_other);
@@ -874,45 +898,77 @@ static void shutdown_connection(struct connection *con,
 static void dlm_tcp_shutdown(struct connection *con)
 {
 	/* flush pending processes which might trigger send */
-	flush_work(&con->rwork);
+	flush_work(&con->pwork);
+	/* flush all send */
+	flush_delayed_work(&con->swork);
 
-	if (con->othercon) {
-		flush_work(&con->othercon->rwork);
+	if (con->othercon)
 		shutdown_connection(con->othercon, con);
-	}
 
-	/* flush all send */
-	flush_delayed_work(&con->swork);
 	shutdown_connection(con, con);
 }
 
-static int con_realloc_receive_buf(struct connection *con, int newlen)
+static struct processqueue_entry *new_processqueue_entry(int nodeid,
+							 int buflen)
 {
-	unsigned char *newbuf;
+	struct processqueue_entry *pentry;
 
-	newbuf = kmalloc(newlen, GFP_NOFS);
-	if (!newbuf)
-		return -ENOMEM;
+	pentry = kmalloc(sizeof(*pentry), GFP_NOFS);
+	if (!pentry)
+		return NULL;
 
-	/* copy any leftover from last receive */
-	if (con->rx_leftover)
-		memmove(newbuf, con->rx_buf, con->rx_leftover);
+	pentry->buf = kmalloc(buflen, GFP_NOFS);
+	if (!pentry->buf) {
+		kfree(pentry);
+		return NULL;
+	}
 
-	/* swap to new buffer space */
-	kfree(con->rx_buf);
-	con->rx_buflen = newlen;
-	con->rx_buf = newbuf;
+	return pentry;
+}
 
-	return 0;
+static void free_processqueue_entry(struct processqueue_entry *pentry)
+{
+	kfree(pentry->buf);
+	kfree(pentry);
+}
+
+static void process_dlm_messages(struct work_struct *work)
+{
+	struct connection *con = container_of(work, struct connection, pwork);
+	struct processqueue_entry *pentry;
+
+	mutex_lock(&con->process_lock);
+	con_stop_send(con);
+
+	for (;;) {
+		spin_lock(&con->processqueue_lock);
+		if (list_empty(&con->processqueue)) {
+			spin_unlock(&con->processqueue_lock);
+			break;
+		}
+
+		pentry = list_first_entry(&con->processqueue,
+					  struct processqueue_entry, list);
+		list_del(&pentry->list);
+		spin_unlock(&con->processqueue_lock);
+
+		dlm_process_incoming_buffer(con->nodeid, pentry->buf,
+					    pentry->buflen);
+		free_processqueue_entry(pentry);
+	}
+
+	con_resume_send(con);
+	mutex_unlock(&con->process_lock);
 }
 
 /* Data received from remote end */
 static void receive_from_sock(struct connection *con,
 			      struct connection *sendcon)
 {
+	struct processqueue_entry *pentry;
+	int ret, buflen, buflen_real;
 	struct msghdr msg;
 	struct kvec iov;
-	int ret, buflen;
 
 	mutex_lock(&sendcon->rwork_lock);
 	if (con->sock == NULL) {
@@ -920,20 +976,21 @@ static void receive_from_sock(struct connection *con,
 		return;
 	}
 
-	/* realloc if we get new buffer size to read out */
-	buflen = dlm_config.ci_buffer_size;
-	if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
-		ret = con_realloc_receive_buf(con, buflen);
-		if (ret < 0)
+	buflen = READ_ONCE(dlm_config.ci_buffer_size);
+	for (;;) {
+		pentry = new_processqueue_entry(con->nodeid, buflen);
+		if (!pentry) {
+			mutex_unlock(&sendcon->rwork_lock);
 			goto out_resched;
-	}
+		}
+
+		memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover);
 
-	for (;;) {
 		/* calculate new buffer parameter regarding last receive and
 		 * possible leftover bytes
 		 */
-		iov.iov_base = con->rx_buf + con->rx_leftover;
-		iov.iov_len = con->rx_buflen - con->rx_leftover;
+		iov.iov_base = pentry->buf + con->rx_leftover;
+		iov.iov_len = buflen - con->rx_leftover;
 
 		memset(&msg, 0, sizeof(msg));
 		msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
@@ -941,28 +998,39 @@ static void receive_from_sock(struct connection *con,
 				     msg.msg_flags);
 		if (ret == 0) {
 			mutex_unlock(&sendcon->rwork_lock);
+			free_processqueue_entry(pentry);
 			goto out_eof;
 		} else if (ret < 0) {
+			free_processqueue_entry(pentry);
 			break;
 		}
 
 		/* new buflen according readed bytes and leftover from last receive */
-		buflen = ret + con->rx_leftover;
-		ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
+		buflen_real = ret + con->rx_leftover;
+		ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf,
+						   buflen_real);
 		if (ret < 0) {
 			mutex_unlock(&sendcon->rwork_lock);
+			free_processqueue_entry(pentry);
 			goto out_close;
 		}
 
+		pentry->buflen = ret;
+
 		/* calculate leftover bytes from process and put it into begin of
 		 * the receive buffer, so next receive we have the full message
 		 * at the start address of the receive buffer.
 		 */
-		con->rx_leftover = buflen - ret;
-		if (con->rx_leftover) {
-			memmove(con->rx_buf, con->rx_buf + ret,
+		con->rx_leftover = buflen_real - ret;
+		if (con->rx_leftover)
+			memmove(con->rx_leftover_buf, pentry->buf + ret,
 				con->rx_leftover);
-		}
+
+		spin_lock(&sendcon->processqueue_lock);
+		list_add_tail(&pentry->list, &sendcon->processqueue);
+		spin_unlock(&sendcon->processqueue_lock);
+
+		queue_work(process_workqueue, &sendcon->pwork);
 	}
 	mutex_unlock(&sendcon->rwork_lock);
 
@@ -983,6 +1051,9 @@ static void receive_from_sock(struct connection *con,
 	log_print("connection %p got EOF from %d",
 		  con, con->nodeid);
 
+	/* flush pending processing which might trigger send */
+	flush_work(&sendcon->pwork);
+
 	if (dlm_proto_ops->eof_condition &&
 	    dlm_proto_ops->eof_condition(con)) {
 		set_bit(CF_EOF, &con->flags);
@@ -1063,13 +1134,7 @@ static int accept_from_sock(struct listen_connection *con)
 				goto accept_err;
 			}
 
-			result = dlm_con_init(othercon, nodeid);
-			if (result < 0) {
-				kfree(othercon);
-				dlm_con_unlock(newcon);
-				srcu_read_unlock(&connections_srcu, idx);
-				goto accept_err;
-			}
+			dlm_con_init(othercon, nodeid);
 
 			set_bit(CF_IS_OTHERCON, &othercon->flags);
 			newcon->othercon = othercon;
@@ -1099,7 +1164,6 @@ static int accept_from_sock(struct listen_connection *con)
 	 * to the read_sockets list
 	 */
 	dlm_io_queue(newcon, &addcon->rwork);
-
 	srcu_read_unlock(&connections_srcu, idx);
 
 	return 0;
@@ -1348,7 +1412,9 @@ static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
 	e->len = DLM_WQ_LENGTH_BYTES(e);
 	spin_unlock(&con->writequeue_lock);
 
-	dlm_io_delayed_queue(con, &con->swork, 0);
+	if (!test_bit(CF_STOP_SEND, &con->flags))
+		dlm_io_delayed_queue(con, &con->swork, 0);
+
 	return;
 
 out:
@@ -1660,13 +1726,23 @@ static void process_send_sockets(struct work_struct *work)
 static void work_stop(void)
 {
 	destroy_workqueue(io_workqueue);
+	destroy_workqueue(process_workqueue);
 }
 
 static int work_start(void)
 {
+	process_workqueue = alloc_workqueue("dlm_process",
+					    WQ_HIGHPRI | WQ_UNBOUND |
+					    WQ_MEM_RECLAIM, 0);
+	if (!process_workqueue) {
+		log_print("can't start dlm_process");
+		return -ENOMEM;
+	}
+
 	io_workqueue = alloc_workqueue("dlm_io",
 				       WQ_UNBOUND | WQ_MEM_RECLAIM, 0);
 	if (!io_workqueue) {
+		destroy_workqueue(process_workqueue);
 		log_print("can't start dlm_io");
 		return -ENOMEM;
 	}
@@ -1702,7 +1778,6 @@ static void connection_release(struct rcu_head *rcu)
 {
 	struct connection *con = container_of(rcu, struct connection, rcu);
 
-	kfree(con->rx_buf);
 	kfree(con);
 }
 
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index e3de268898ed..483f7c54c217 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -872,12 +872,7 @@ static void dlm_midcomms_receive_buffer_3_1(union dlm_packet *p, int nodeid)
 	dlm_receive_buffer(p, nodeid);
 }
 
-/*
- * Called from the low-level comms layer to process a buffer of
- * commands.
- */
-
-int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
+int dlm_validate_incoming_buffer(int nodeid, unsigned char *buf, int len)
 {
 	const unsigned char *ptr = buf;
 	const struct dlm_header *hd;
@@ -885,7 +880,7 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
 	int ret = 0;
 
 	while (len >= sizeof(struct dlm_header)) {
-		hd = (struct dlm_header *)ptr;
+		hd = (const struct dlm_header *)ptr;
 
 		/* no message should be more than DLM_MAX_SOCKET_BUFSIZE or
 		 * less than dlm_header size.
@@ -912,6 +907,33 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
 		if (msglen > len)
 			break;
 
+		ret += msglen;
+		len -= msglen;
+		ptr += msglen;
+	}
+
+	return ret;
+}
+
+/*
+ * Called from the low-level comms layer to process a buffer of
+ * commands.
+ */
+
+void dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
+{
+	const unsigned char *ptr = buf;
+	const struct dlm_header *hd;
+	uint16_t msglen;
+	int ret = 0;
+
+	while (len >= sizeof(struct dlm_header)) {
+		hd = (struct dlm_header *)ptr;
+
+		msglen = le16_to_cpu(hd->h_length);
+		if (msglen > len)
+			break;
+
 		switch (le32_to_cpu(hd->h_version)) {
 		case DLM_VERSION_3_1:
 			dlm_midcomms_receive_buffer_3_1((union dlm_packet *)ptr, nodeid);
@@ -929,8 +951,6 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
 		len -= msglen;
 		ptr += msglen;
 	}
-
-	return ret;
 }
 
 void dlm_midcomms_unack_msg_resend(int nodeid)
diff --git a/fs/dlm/midcomms.h b/fs/dlm/midcomms.h
index 579abc6929be..a62c1ad786ef 100644
--- a/fs/dlm/midcomms.h
+++ b/fs/dlm/midcomms.h
@@ -14,7 +14,8 @@
 
 struct midcomms_node;
 
-int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen);
+void dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen);
+int dlm_validate_incoming_buffer(int nodeid, unsigned char *buf, int len);
 struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
 					     gfp_t allocation, char **ppc);
 void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh);
-- 
2.26.3
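The dlm_validate_incoming_buffer()/dlm_process_incoming_buffer() split above boils down to: first compute how many bytes of the receive buffer form complete messages, queue exactly that many for processing, and keep the remainder as leftover for the next receive. A self-contained userspace sketch of the validate step (a simplified fixed 4-byte header is assumed here for brevity; the real dlm_header differs):

```c
#include <stdint.h>

/* Hypothetical header: little-endian u16 total message length followed
 * by a u16 type, so the minimum valid message length is HDR_LEN.
 */
#define HDR_LEN 4

/* Return the number of leading bytes in buf that form complete messages;
 * anything after that is leftover, or -1 on a corrupt stream.
 */
static int validate_incoming(const unsigned char *buf, int len)
{
	int ret = 0;

	while (len >= HDR_LEN) {
		uint16_t msglen = buf[0] | (buf[1] << 8);

		if (msglen < HDR_LEN)
			return -1;	/* corrupt stream */
		if (msglen > len)
			break;		/* partial message, keep as leftover */

		ret += msglen;
		len -= msglen;
		buf += msglen;
	}

	return ret;
}
```

The caller would memmove the trailing `len - ret` leftover bytes to the start of the next buffer, exactly as receive_from_sock() does with rx_leftover_buf.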




* [Cluster-devel] [RFC dlm/next 12/15] fs: dlm: remove send starve
  2021-06-23 15:14 [Cluster-devel] [RFC dlm/next 00/15] fs: dlm: performance Alexander Aring
                   ` (10 preceding siblings ...)
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 11/15] fs: dlm: introduce process workqueue Alexander Aring
@ 2021-06-23 15:14 ` Alexander Aring
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 13/15] fs: dlm: move writequeue init to sendcon only Alexander Aring
                   ` (3 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Alexander Aring @ 2021-06-23 15:14 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch removes the starve handling that rescheduled after sending
too much. I think it was introduced so that other connections could be
served while one connection sends a lot. However, we no longer use an
ordered workqueue and can now run kernel_sendpage() for multiple
connections at the same time.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 28d97f8187a5..ddf3c0c98386 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -61,7 +61,6 @@
 #define NEEDED_RMEM (4*1024*1024)
 
 /* Number of messages to send before rescheduling */
-#define MAX_SEND_MSG_COUNT 25
 #define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
 
 struct connection {
@@ -1464,7 +1463,6 @@ static void send_to_sock(struct connection *con)
 	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
 	struct writequeue_entry *e;
 	int len, offset, ret;
-	int count = 0;
 
 	mutex_lock(&con->swork_lock);
 	if (con->sock == NULL) {
@@ -1501,12 +1499,6 @@ static void send_to_sock(struct connection *con)
 		} else if (ret < 0)
 			goto out;
 
-		/* Don't starve people filling buffers */
-		if (++count >= MAX_SEND_MSG_COUNT) {
-			cond_resched();
-			count = 0;
-		}
-
 		spin_lock(&con->writequeue_lock);
 		writequeue_entry_complete(e, ret);
 	}
-- 
2.26.3




* [Cluster-devel] [RFC dlm/next 13/15] fs: dlm: move writequeue init to sendcon only
  2021-06-23 15:14 [Cluster-devel] [RFC dlm/next 00/15] fs: dlm: performance Alexander Aring
                   ` (11 preceding siblings ...)
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 12/15] fs: dlm: remove send starve Alexander Aring
@ 2021-06-23 15:14 ` Alexander Aring
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 14/15] fs: dlm: flush listen con Alexander Aring
                   ` (2 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Alexander Aring @ 2021-06-23 15:14 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch initializes send functionality only on the sendcon, not on
the othercon. If we make a mistake by accident, we will see it when the
kernel crashes because the othercon was queued for transmitting, which
should never be the case. Also add a comment about the othercon
handling: why it exists and how it could be removed, at the cost of
breaking backwards compatibility.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 61 ++++++++++++++++++++++++++++++++++-------------
 1 file changed, 44 insertions(+), 17 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index ddf3c0c98386..e858453b4eb7 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -40,6 +40,29 @@
  * cluster-wide mechanism as it must be the same on all nodes of the cluster
  * for the DLM to function.
  *
+ * TODO:
+ *
+ * Special note about the "othercon" field in the connection structure. It
+ * is only set if we hit a race where two peers call connect() and accept()
+ * at the same time. If we didn't accept the connection and closed it, the
+ * other peer would disconnect its non-"othercon" connection, which is known
+ * as the "sendcon". It is named "sendcon" because we must only do sending
+ * over this connection. If we hit the race, then the "othercon" is used
+ * for receiving only. Calling "ss -t" shows two connections in
+ * this case.
+ *
+ * Overall this makes the code quite confusing, and the code tries to use
+ * only the "sendcon" for resources such as mutexes. As the race only
+ * occurs sometimes, moving "othercon" into its own struct, e.g. "struct
+ * connection_other", makes it difficult to deal with the case where we
+ * don't hit the race and receiving is done by the "sendcon".
+ *
+ * There exists an idea by Steve Whitehouse to get rid of this race by
+ * introducing a prioritized accept() rule, e.g. $OWN_NODEID < $PEER_NODEID.
+ * If the condition is true we accept the connection, otherwise we trigger
+ * a reconnect to this peer (because the peer wants to connect again).
+ * However, this is not backwards compatible and will break the current
+ * "othercon" connection handling.
  */
 
 #include <asm/ioctls.h>
@@ -283,16 +306,15 @@ static struct connection *__find_con(int nodeid, int r)
 
 static bool tcp_eof_condition(struct connection *con)
 {
+	if (test_bit(CF_IS_OTHERCON, &con->flags))
+		return false;
+
 	return atomic_read(&con->writequeue_cnt);
 }
 
 static void dlm_con_init(struct connection *con, int nodeid)
 {
 	con->nodeid = nodeid;
-	INIT_LIST_HEAD(&con->writequeue);
-	spin_lock_init(&con->writequeue_lock);
-	atomic_set(&con->writequeue_cnt, 0);
-	INIT_DELAYED_WORK(&con->swork, process_send_sockets);
 	INIT_WORK(&con->rwork, process_recv_sockets);
 	INIT_WORK(&con->cwork, process_close_sockets);
 	init_waitqueue_head(&con->shutdown_wait);
@@ -320,7 +342,12 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
 
 	mutex_init(&con->rwork_lock);
 	mutex_init(&con->swork_lock);
+
 	mutex_init(&con->wq_alloc);
+	INIT_LIST_HEAD(&con->writequeue);
+	spin_lock_init(&con->writequeue_lock);
+	atomic_set(&con->writequeue_cnt, 0);
+	INIT_DELAYED_WORK(&con->swork, process_send_sockets);
 
 	mutex_init(&con->process_lock);
 	INIT_LIST_HEAD(&con->processqueue);
@@ -813,20 +840,23 @@ static void close_connection(struct connection *con, bool and_other)
 	 * our policy is to start on a clean state when disconnects, we don't
 	 * know what's send/received on transport layer in this case.
 	 */
-	spin_lock(&con->writequeue_lock);
-	if (!list_empty(&con->writequeue)) {
-		e = list_first_entry(&con->writequeue, struct writequeue_entry,
-				     list);
-		if (e->dirty)
-			free_entry(e);
+	if (!test_bit(CF_IS_OTHERCON, &con->flags)) {
+		spin_lock(&con->writequeue_lock);
+		if (!list_empty(&con->writequeue)) {
+			e = list_first_entry(&con->writequeue, struct writequeue_entry,
+					     list);
+			if (e->dirty)
+				free_entry(e);
+		}
+		spin_unlock(&con->writequeue_lock);
+
+		con->retries = 0;
+		clear_bit(CF_APP_LIMITED, &con->flags);
+		clear_bit(CF_EOF, &con->flags);
 	}
-	spin_unlock(&con->writequeue_lock);
 
 	con->rx_leftover = 0;
-	con->retries = 0;
-	clear_bit(CF_APP_LIMITED, &con->flags);
 	clear_bit(CF_CONNECTED, &con->flags);
-	clear_bit(CF_EOF, &con->flags);
 
 	/* handling for tcp shutdown */
 	clear_bit(CF_SHUTDOWN, &con->flags);
@@ -1544,8 +1574,6 @@ int dlm_lowcomms_close(int nodeid)
 		set_bit(CF_CLOSE, &con->flags);
 		cancel_io_work(con, true);
 		clean_one_writequeue(con);
-		if (con->othercon)
-			clean_one_writequeue(con->othercon);
 	}
 	srcu_read_unlock(&connections_srcu, idx);
 
@@ -1781,7 +1809,6 @@ static void free_conn(struct connection *con)
 	hlist_del_rcu(&con->list);
 	spin_unlock(&connections_lock);
 	if (con->othercon) {
-		clean_one_writequeue(con->othercon);
 		call_srcu(&connections_srcu, &con->othercon->rcu,
 			  connection_release);
 	}
-- 
2.26.3
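The prioritized-accept idea mentioned in the patch's TODO comment can be reduced to a tiny decision function. This is a hypothetical sketch of that rule, not code from the patch; the function name and types are illustrative only:

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical sketch of the prioritized accept() rule: only the side with
 * the lower nodeid accepts the incoming connection; the other side closes
 * it and triggers a reconnect instead. */
static bool dlm_should_accept(int own_nodeid, int peer_nodeid)
{
	/* accept only when $OWN_NODEID < $PEER_NODEID */
	return own_nodeid < peer_nodeid;
}
```

With such a rule at most one of the two simultaneous connections survives, so the "othercon" field would become unnecessary — at the cost of breaking compatibility with peers that still use the othercon scheme.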




* [Cluster-devel] [RFC dlm/next 14/15] fs: dlm: flush listen con
  2021-06-23 15:14 [Cluster-devel] [RFC dlm/next 00/15] fs: dlm: performance Alexander Aring
                   ` (12 preceding siblings ...)
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 13/15] fs: dlm: move writequeue init to sendcon only Alexander Aring
@ 2021-06-23 15:14 ` Alexander Aring
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 15/15] fs: dlm: move srcu into loop call Alexander Aring
  2021-06-23 21:31 ` [Cluster-devel] [RFC dlm/next 00/15] fs: dlm: performance Alexander Aring
  15 siblings, 0 replies; 17+ messages in thread
From: Alexander Aring @ 2021-06-23 15:14 UTC (permalink / raw)
  To: cluster-devel.redhat.com

Instead of flushing the whole io_workqueue, we can just flush the listen
con work to be sure that no new connections will be accepted anymore.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index e858453b4eb7..bb088ebc1614 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -1784,9 +1784,7 @@ void dlm_lowcomms_shutdown(void)
 	 * socket activity.
 	 */
 	dlm_allow_conn = 0;
-
-	flush_workqueue(io_workqueue);
-
+	flush_work(&listen_con.rwork);
 	dlm_close_sock(&listen_con.sock);
 
 	idx = srcu_read_lock(&connections_srcu);
-- 
2.26.3




* [Cluster-devel] [RFC dlm/next 15/15] fs: dlm: move srcu into loop call
  2021-06-23 15:14 [Cluster-devel] [RFC dlm/next 00/15] fs: dlm: performance Alexander Aring
                   ` (13 preceding siblings ...)
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 14/15] fs: dlm: flush listen con Alexander Aring
@ 2021-06-23 15:14 ` Alexander Aring
  2021-06-23 21:31 ` [Cluster-devel] [RFC dlm/next 00/15] fs: dlm: performance Alexander Aring
  15 siblings, 0 replies; 17+ messages in thread
From: Alexander Aring @ 2021-06-23 15:14 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch cleans up the foreach_conn() helper so that it takes the srcu
read lock itself while iterating over the connections.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index bb088ebc1614..ac4325c4865f 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -377,13 +377,15 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
 /* Loop round all connections */
 static void foreach_conn(void (*conn_func)(struct connection *c))
 {
-	int i;
 	struct connection *con;
+	int i, idx;
 
+	idx = srcu_read_lock(&connections_srcu);
 	for (i = 0; i < CONN_HASH_SIZE; i++) {
 		hlist_for_each_entry_rcu(con, &connection_hash[i], list)
 			conn_func(con);
 	}
+	srcu_read_unlock(&connections_srcu, idx);
 }
 
 static struct dlm_node_addr *find_node_addr(int nodeid)
@@ -1778,8 +1780,6 @@ static void shutdown_conn(struct connection *con)
 
 void dlm_lowcomms_shutdown(void)
 {
-	int idx;
-
 	/* Set all the flags to prevent any
 	 * socket activity.
 	 */
@@ -1787,9 +1787,7 @@ void dlm_lowcomms_shutdown(void)
 	flush_work(&listen_con.rwork);
 	dlm_close_sock(&listen_con.sock);
 
-	idx = srcu_read_lock(&connections_srcu);
 	foreach_conn(shutdown_conn);
-	srcu_read_unlock(&connections_srcu, idx);
 }
 
 static void connection_release(struct rcu_head *rcu)
@@ -1816,11 +1814,7 @@ static void free_conn(struct connection *con)
 
 void dlm_lowcomms_stop(void)
 {
-	int idx;
-
-	idx = srcu_read_lock(&connections_srcu);
 	foreach_conn(free_conn);
-	srcu_read_unlock(&connections_srcu, idx);
 	work_stop();
 	deinit_local();
 
-- 
2.26.3




* [Cluster-devel] [RFC dlm/next 00/15] fs: dlm: performance
  2021-06-23 15:14 [Cluster-devel] [RFC dlm/next 00/15] fs: dlm: performance Alexander Aring
                   ` (14 preceding siblings ...)
  2021-06-23 15:14 ` [Cluster-devel] [RFC dlm/next 15/15] fs: dlm: move srcu into loop call Alexander Aring
@ 2021-06-23 21:31 ` Alexander Aring
  15 siblings, 0 replies; 17+ messages in thread
From: Alexander Aring @ 2021-06-23 21:31 UTC (permalink / raw)
  To: cluster-devel.redhat.com

Hi,

On Wed, Jun 23, 2021 at 11:15 AM Alexander Aring <aahringo@redhat.com> wrote:
...
>
> WARNING:
>
> For the first time ever we let the DLM application layer process dlm
> messages in parallel, BUT processing is per node/connection in an
> ordered way (which is required). I tested it and saw no problems, and
> I think that global/per lockspace concurrent access per nodeid is
> correctly protected for mutual exclusion.
>

okay. David Teigland showed me that DLM isn't ready for this step. See
functions such as "setup_stub_lkb()"... I am not sure why I never ran
into any problems...
I will split some patches off from this series and try to find and
solve all such cases in DLM as a next step.

- Alex



