* [Cluster-devel] [PATCH dlm-next 1/7] fs: dlm: use list_empty() to check last iteration
@ 2021-11-30 19:47 Alexander Aring
  2021-11-30 19:47 ` [Cluster-devel] [PATCH dlm-next 2/7] fs: dlm: check for pending users filling buffers Alexander Aring
                   ` (5 more replies)
  0 siblings, 6 replies; 7+ messages in thread
From: Alexander Aring @ 2021-11-30 19:47 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch uses list_empty(&ls->ls_cb_delay) to check for the last list
iteration. When the number of processed entries is an exact multiple of
MAX_CB_QUEUE and the list is already empty, we currently take one extra
"goto more" pass, which can be avoided by checking list_empty() instead.
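
For illustration, the resulting control flow looks roughly like this (a
simplified sketch of the patched loop; the per-entry work is abbreviated
as a hypothetical requeue_cb() helper):

more:
	mutex_lock(&ls->ls_cb_mutex);
	list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
		requeue_cb(lkb);	/* hypothetical: requeue one callback */
		count++;
		if (count == MAX_CB_QUEUE)
			break;
	}
	/* snapshot, while still holding the mutex, whether work remains */
	empty = list_empty(&ls->ls_cb_delay);
	mutex_unlock(&ls->ls_cb_mutex);

	sum += count;
	if (!empty) {
		count = 0;
		cond_resched();
		goto more;
	}

With the old count == MAX_CB_QUEUE check, a final batch that exactly
filled the quota triggered one more pass over an already empty list.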

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/ast.c | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 27bae7d4a477..bfac462dd3e8 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -300,6 +300,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
 {
 	struct dlm_lkb *lkb, *safe;
 	int count = 0, sum = 0;
+	bool empty;
 
 	clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
 
@@ -315,10 +316,11 @@ void dlm_callback_resume(struct dlm_ls *ls)
 		if (count == MAX_CB_QUEUE)
 			break;
 	}
+	empty = list_empty(&ls->ls_cb_delay);
 	mutex_unlock(&ls->ls_cb_mutex);
 
 	sum += count;
-	if (count == MAX_CB_QUEUE) {
+	if (!empty) {
 		count = 0;
 		cond_resched();
 		goto more;
-- 
2.27.0




* [Cluster-devel] [PATCH dlm-next 2/7] fs: dlm: check for pending users filling buffers
  2021-11-30 19:47 [Cluster-devel] [PATCH dlm-next 1/7] fs: dlm: use list_empty() to check last iteration Alexander Aring
@ 2021-11-30 19:47 ` Alexander Aring
  2021-11-30 19:47 ` [Cluster-devel] [PATCH dlm-next 3/7] fs: dlm: use event based wait for pending remove Alexander Aring
                   ` (4 subsequent siblings)
  5 siblings, 0 replies; 7+ messages in thread
From: Alexander Aring @ 2021-11-30 19:47 UTC (permalink / raw)
  To: cluster-devel.redhat.com

Currently we don't check whether the DLM application stack is still
filling buffers (not yet committed) while we transmit already committed
buffers. By checking for active writequeue users before dequeuing a
writequeue entry, we know more data is coming and do nothing; we wait
until the send worker is triggered again once the writequeue entry's
user count hits zero.
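
A minimal sketch of the check this adds (next_sendable() is a
hypothetical stand-in; the real counterpart is con_next_wq() in the
diff below):

	/* send side: only hand out an entry when no writer is active
	 * and committed data exists
	 */
	static struct writequeue_entry *next_sendable(struct connection *con)
	{
		struct writequeue_entry *e;

		if (list_empty(&con->writequeue))
			return NULL;

		e = list_first_entry(&con->writequeue,
				     struct writequeue_entry, list);
		if (e->users || e->len == 0)
			return NULL;	/* writers active or nothing committed */

		return e;
	}

The users count is raised while a caller reserves space in the entry's
page and dropped again on commit, so a nonzero count means more data is
about to land in the same page.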

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index f7fc1ac76ce8..6d500ebc6145 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -201,7 +201,10 @@ static struct writequeue_entry *con_next_wq(struct connection *con)
 
 	e = list_first_entry(&con->writequeue, struct writequeue_entry,
 			     list);
-	if (e->len == 0)
+	/* if len is zero there is nothing to send; if users are still
+	 * filling buffers we wait until they are done so we can send more.
+	 */
+	if (e->users || e->len == 0)
 		return NULL;
 
 	return e;
-- 
2.27.0




* [Cluster-devel] [PATCH dlm-next 3/7] fs: dlm: use event based wait for pending remove
  2021-11-30 19:47 [Cluster-devel] [PATCH dlm-next 1/7] fs: dlm: use list_empty() to check last iteration Alexander Aring
  2021-11-30 19:47 ` [Cluster-devel] [PATCH dlm-next 2/7] fs: dlm: check for pending users filling buffers Alexander Aring
@ 2021-11-30 19:47 ` Alexander Aring
  2021-11-30 19:47 ` [Cluster-devel] [PATCH dlm-next 4/7] fs: dlm: remove wq_alloc mutex Alexander Aring
                   ` (3 subsequent siblings)
  5 siblings, 0 replies; 7+ messages in thread
From: Alexander Aring @ 2021-11-30 19:47 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch uses an event-based waitqueue to wait for a possible clash
with the ls_remove_name field of dlm_ls instead of busy-waiting.
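
The underlying pattern is the standard wait_event()/wake_up() pairing
(a minimal generic sketch with a hypothetical condition flag; the real
condition is the DLM_WAIT_PENDING_COND macro in the diff below):

	#include <linux/wait.h>

	static DECLARE_WAIT_QUEUE_HEAD(wq);
	static bool pending;

	/* waiter: sleeps until woken, instead of polling with msleep(1) */
	static void wait_for_clear(void)
	{
		wait_event(wq, !READ_ONCE(pending));
	}

	/* updater: clears the condition, then wakes every waiter */
	static void clear_and_wake(void)
	{
		WRITE_ONCE(pending, false);
		wake_up(&wq);
	}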

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dlm_internal.h |  1 +
 fs/dlm/lock.c         | 19 ++++++++++++-------
 fs/dlm/lockspace.c    |  1 +
 3 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 019931804af9..74a9590a4dd5 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -576,6 +576,7 @@ struct dlm_ls {
 	struct list_head	ls_new_rsb;	/* new rsb structs */
 
 	spinlock_t		ls_remove_spin;
+	wait_queue_head_t	ls_remove_wait;
 	char			ls_remove_name[DLM_RESNAME_MAXLEN+1];
 	char			*ls_remove_names[DLM_REMOVE_NAMES_MAX];
 	int			ls_remove_len;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 54705d367076..bdb51d209ba2 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1626,21 +1626,24 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
 }
 
 /* If there's an rsb for the same resource being removed, ensure
-   that the remove message is sent before the new lookup message.
-   It should be rare to need a delay here, but if not, then it may
-   be worthwhile to add a proper wait mechanism rather than a delay. */
+ * that the remove message is sent before the new lookup message.
+ */
+
+#define DLM_WAIT_PENDING_COND(ls, r)		\
+	(ls->ls_remove_len &&			\
+	 !rsb_cmp(r, ls->ls_remove_name,	\
+		  ls->ls_remove_len))
 
 static void wait_pending_remove(struct dlm_rsb *r)
 {
 	struct dlm_ls *ls = r->res_ls;
  restart:
 	spin_lock(&ls->ls_remove_spin);
-	if (ls->ls_remove_len &&
-	    !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
+	if (DLM_WAIT_PENDING_COND(ls, r)) {
 		log_debug(ls, "delay lookup for remove dir %d %s",
-		  	  r->res_dir_nodeid, r->res_name);
+			  r->res_dir_nodeid, r->res_name);
 		spin_unlock(&ls->ls_remove_spin);
-		msleep(1);
+		wait_event(ls->ls_remove_wait, !DLM_WAIT_PENDING_COND(ls, r));
 		goto restart;
 	}
 	spin_unlock(&ls->ls_remove_spin);
@@ -1792,6 +1795,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 		memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
 		spin_unlock(&ls->ls_remove_spin);
 		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		wake_up(&ls->ls_remove_wait);
 
 		send_remove(r);
 
@@ -4075,6 +4079,7 @@ static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
 	memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
 	spin_unlock(&ls->ls_remove_spin);
 	spin_unlock(&ls->ls_rsbtbl[b].lock);
+	wake_up(&ls->ls_remove_wait);
 
 	rv = _create_message(ls, sizeof(struct dlm_message) + len,
 			     dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 2e51bd2bdacc..31384e7d6f90 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -512,6 +512,7 @@ static int new_lockspace(const char *name, const char *cluster,
 	}
 
 	spin_lock_init(&ls->ls_remove_spin);
+	init_waitqueue_head(&ls->ls_remove_wait);
 
 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
 		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
-- 
2.27.0




* [Cluster-devel] [PATCH dlm-next 4/7] fs: dlm: remove wq_alloc mutex
  2021-11-30 19:47 [Cluster-devel] [PATCH dlm-next 1/7] fs: dlm: use list_empty() to check last iteration Alexander Aring
  2021-11-30 19:47 ` [Cluster-devel] [PATCH dlm-next 2/7] fs: dlm: check for pending users filling buffers Alexander Aring
  2021-11-30 19:47 ` [Cluster-devel] [PATCH dlm-next 3/7] fs: dlm: use event based wait for pending remove Alexander Aring
@ 2021-11-30 19:47 ` Alexander Aring
  2021-11-30 19:47 ` [Cluster-devel] [PATCH dlm-next 5/7] fs: dlm: memory cache for midcomms hotpath Alexander Aring
                   ` (2 subsequent siblings)
  5 siblings, 0 replies; 7+ messages in thread
From: Alexander Aring @ 2021-11-30 19:47 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch cleans up the code for allocating a new buffer in the dlm
writequeue mechanism. Previously the code allowed scheduling while a new
writequeue entry had to be allocated, because either no sending page was
available or all pages were full. To avoid multiple concurrent users
checking at the same time whether an entry is available or full, the
wq_alloc mutex was introduced: callers waited while a new writequeue
entry was in the process of being queued, so that later callers would
check the newly allocated writequeue entry for remaining space.

To simplify the code we remove this mutex and instead hold the already
existing writequeue spinlock across the check, the allocation, and the
queueing of a new entry. Other users can then never observe the
writequeue while a new entry is in progress but not yet queued.
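
A minimal sketch of the resulting pattern (tail_entry_with_room() is a
hypothetical stand-in for the reuse check; holding the spinlock across
the allocation is also why the diff below switches new_writequeue_entry()
to GFP_ATOMIC):

	spin_lock(&con->writequeue_lock);
	e = tail_entry_with_room(con, len);	/* room left in the tail page? */
	if (!e) {
		/* must not sleep under the spinlock, hence GFP_ATOMIC */
		e = kzalloc(sizeof(*e), GFP_ATOMIC);
		if (e)
			list_add_tail(&e->list, &con->writequeue);
	}
	spin_unlock(&con->writequeue_lock);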

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 48 +++++++++++------------------------------------
 1 file changed, 11 insertions(+), 37 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 6d500ebc6145..4919faf79709 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -86,7 +86,6 @@ struct connection {
 	struct list_head writequeue;  /* List of outgoing writequeue_entries */
 	spinlock_t writequeue_lock;
 	atomic_t writequeue_cnt;
-	struct mutex wq_alloc;
 	int retries;
 #define MAX_CONNECT_RETRIES 3
 	struct hlist_node list;
@@ -270,8 +269,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
 		return NULL;
 	}
 
-	mutex_init(&con->wq_alloc);
-
 	spin_lock(&connections_lock);
 	/* Because multiple workqueues/threads calls this function it can
 	 * race on multiple cpu's. Instead of locking hot path __find_con()
@@ -1176,16 +1173,15 @@ static void deinit_local(void)
 		kfree(dlm_local_addr[i]);
 }
 
-static struct writequeue_entry *new_writequeue_entry(struct connection *con,
-						     gfp_t allocation)
+static struct writequeue_entry *new_writequeue_entry(struct connection *con)
 {
 	struct writequeue_entry *entry;
 
-	entry = kzalloc(sizeof(*entry), allocation);
+	entry = kzalloc(sizeof(*entry), GFP_ATOMIC);
 	if (!entry)
 		return NULL;
 
-	entry->page = alloc_page(allocation | __GFP_ZERO);
+	entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
 	if (!entry->page) {
 		kfree(entry);
 		return NULL;
@@ -1200,8 +1196,8 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con,
 }
 
 static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
-					     gfp_t allocation, char **ppc,
-					     void (*cb)(void *data), void *data)
+					     char **ppc, void (*cb)(void *data),
+					     void *data)
 {
 	struct writequeue_entry *e;
 
@@ -1217,29 +1213,25 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
 
 			e->end += len;
 			e->users++;
-			spin_unlock(&con->writequeue_lock);
-
-			return e;
+			goto out;
 		}
 	}
-	spin_unlock(&con->writequeue_lock);
 
-	e = new_writequeue_entry(con, allocation);
+	e = new_writequeue_entry(con);
 	if (!e)
-		return NULL;
+		goto out;
 
 	kref_get(&e->ref);
 	*ppc = page_address(e->page);
 	e->end += len;
 	atomic_inc(&con->writequeue_cnt);
-
-	spin_lock(&con->writequeue_lock);
 	if (cb)
 		cb(data);
 
 	list_add_tail(&e->list, &con->writequeue);
-	spin_unlock(&con->writequeue_lock);
 
+out:
+	spin_unlock(&con->writequeue_lock);
 	return e;
 };
 
@@ -1250,37 +1242,19 @@ static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
 {
 	struct writequeue_entry *e;
 	struct dlm_msg *msg;
-	bool sleepable;
 
 	msg = kzalloc(sizeof(*msg), allocation);
 	if (!msg)
 		return NULL;
 
-	/* this mutex is being used as a wait to avoid multiple "fast"
-	 * new writequeue page list entry allocs in new_wq_entry in
-	 * normal operation which is sleepable context. Without it
-	 * we could end in multiple writequeue entries with one
-	 * dlm message because multiple callers were waiting at
-	 * the writequeue_lock in new_wq_entry().
-	 */
-	sleepable = gfpflags_normal_context(allocation);
-	if (sleepable)
-		mutex_lock(&con->wq_alloc);
-
 	kref_init(&msg->ref);
 
-	e = new_wq_entry(con, len, allocation, ppc, cb, data);
+	e = new_wq_entry(con, len, ppc, cb, data);
 	if (!e) {
-		if (sleepable)
-			mutex_unlock(&con->wq_alloc);
-
 		kfree(msg);
 		return NULL;
 	}
 
-	if (sleepable)
-		mutex_unlock(&con->wq_alloc);
-
 	msg->ppc = *ppc;
 	msg->len = len;
 	msg->entry = e;
-- 
2.27.0




* [Cluster-devel] [PATCH dlm-next 5/7] fs: dlm: memory cache for midcomms hotpath
  2021-11-30 19:47 [Cluster-devel] [PATCH dlm-next 1/7] fs: dlm: use list_empty() to check last iteration Alexander Aring
                   ` (2 preceding siblings ...)
  2021-11-30 19:47 ` [Cluster-devel] [PATCH dlm-next 4/7] fs: dlm: remove wq_alloc mutex Alexander Aring
@ 2021-11-30 19:47 ` Alexander Aring
  2021-11-30 19:47 ` [Cluster-devel] [PATCH dlm-next 6/7] fs: dlm: memory cache for writequeue_entry Alexander Aring
  2021-11-30 19:47 ` [Cluster-devel] [PATCH dlm-next 7/7] fs: dlm: memory cache for lowcomms hotpath Alexander Aring
  5 siblings, 0 replies; 7+ messages in thread
From: Alexander Aring @ 2021-11-30 19:47 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch introduces a kmem cache for allocating the message handles
that the midcomms layer needs to keep track of lowcomms messages.
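
The general slab-cache pattern used here, as a minimal sketch (setup and
teardown trimmed to the essentials):

	#include <linux/slab.h>

	static struct kmem_cache *mhandle_cache;

	/* one-time setup, e.g. from dlm_memory_init() */
	mhandle_cache = kmem_cache_create("dlm_mhandle",
					  sizeof(struct dlm_mhandle),
					  0, 0, NULL);

	/* hot path: cheaper than kzalloc() for fixed-size objects */
	mh = kmem_cache_alloc(mhandle_cache, GFP_NOFS);
	...
	kmem_cache_free(mhandle_cache, mh);

Note that kmem_cache_alloc() does not zero the object the way the
previous kzalloc() did, which is why the diff below now initializes
mh->committed and mh->ack_rcv explicitly.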

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/memory.c   | 31 ++++++++++++++++++++++++++-----
 fs/dlm/memory.h   |  2 ++
 fs/dlm/midcomms.c | 21 +++++++++++++++------
 fs/dlm/midcomms.h |  1 +
 4 files changed, 44 insertions(+), 11 deletions(-)

diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index 5918f4d39586..8996c6453ad5 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -10,32 +10,44 @@
 ******************************************************************************/
 
 #include "dlm_internal.h"
+#include "midcomms.h"
 #include "config.h"
 #include "memory.h"
 
+static struct kmem_cache *mhandle_cache;
 static struct kmem_cache *lkb_cache;
 static struct kmem_cache *rsb_cache;
 
 
 int __init dlm_memory_init(void)
 {
+	mhandle_cache = dlm_midcomms_cache_create();
+	if (!mhandle_cache)
+		goto out;
+
 	lkb_cache = kmem_cache_create("dlm_lkb", sizeof(struct dlm_lkb),
 				__alignof__(struct dlm_lkb), 0, NULL);
 	if (!lkb_cache)
-		return -ENOMEM;
+		goto lkb;
 
 	rsb_cache = kmem_cache_create("dlm_rsb", sizeof(struct dlm_rsb),
 				__alignof__(struct dlm_rsb), 0, NULL);
-	if (!rsb_cache) {
-		kmem_cache_destroy(lkb_cache);
-		return -ENOMEM;
-	}
+	if (!rsb_cache)
+		goto rsb;
 
 	return 0;
+
+rsb:
+	kmem_cache_destroy(lkb_cache);
+lkb:
+	kmem_cache_destroy(mhandle_cache);
+out:
+	return -ENOMEM;
 }
 
 void dlm_memory_exit(void)
 {
+	kmem_cache_destroy(mhandle_cache);
 	kmem_cache_destroy(lkb_cache);
 	kmem_cache_destroy(rsb_cache);
 }
@@ -89,3 +101,12 @@ void dlm_free_lkb(struct dlm_lkb *lkb)
 	kmem_cache_free(lkb_cache, lkb);
 }
 
+struct dlm_mhandle *dlm_allocate_mhandle(void)
+{
+	return kmem_cache_alloc(mhandle_cache, GFP_NOFS);
+}
+
+void dlm_free_mhandle(struct dlm_mhandle *mhandle)
+{
+	kmem_cache_free(mhandle_cache, mhandle);
+}
diff --git a/fs/dlm/memory.h b/fs/dlm/memory.h
index 4f218ea4b187..c4d46be778a2 100644
--- a/fs/dlm/memory.h
+++ b/fs/dlm/memory.h
@@ -20,6 +20,8 @@ struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls);
 void dlm_free_lkb(struct dlm_lkb *l);
 char *dlm_allocate_lvb(struct dlm_ls *ls);
 void dlm_free_lvb(char *l);
+struct dlm_mhandle *dlm_allocate_mhandle(void);
+void dlm_free_mhandle(struct dlm_mhandle *mhandle);
 
 #endif		/* __MEMORY_DOT_H__ */
 
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index 74b4308b912c..3635e42b0669 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -137,6 +137,7 @@
 #include "dlm_internal.h"
 #include "lowcomms.h"
 #include "config.h"
+#include "memory.h"
 #include "lock.h"
 #include "util.h"
 #include "midcomms.h"
@@ -220,6 +221,12 @@ DEFINE_STATIC_SRCU(nodes_srcu);
  */
 static DEFINE_MUTEX(close_lock);
 
+struct kmem_cache *dlm_midcomms_cache_create(void)
+{
+	return kmem_cache_create("dlm_mhandle", sizeof(struct dlm_mhandle),
+				 0, 0, NULL);
+}
+
 static inline const char *dlm_state_str(int state)
 {
 	switch (state) {
@@ -279,7 +286,7 @@ static void dlm_mhandle_release(struct rcu_head *rcu)
 	struct dlm_mhandle *mh = container_of(rcu, struct dlm_mhandle, rcu);
 
 	dlm_lowcomms_put_msg(mh->msg);
-	kfree(mh);
+	dlm_free_mhandle(mh);
 }
 
 static void dlm_mhandle_delete(struct midcomms_node *node,
@@ -1073,10 +1080,12 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
 	/* this is a bug, however we going on and hope it will be resolved */
 	WARN_ON(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
 
-	mh = kzalloc(sizeof(*mh), GFP_NOFS);
+	mh = dlm_allocate_mhandle();
 	if (!mh)
 		goto err;
 
+	mh->committed = false;
+	mh->ack_rcv = NULL;
 	mh->idx = idx;
 	mh->node = node;
 
@@ -1085,7 +1094,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
 		msg = dlm_lowcomms_new_msg(nodeid, len, allocation, ppc,
 					   NULL, NULL);
 		if (!msg) {
-			kfree(mh);
+			dlm_free_mhandle(mh);
 			goto err;
 		}
 
@@ -1094,13 +1103,13 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
 		msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, allocation,
 					       ppc);
 		if (!msg) {
-			kfree(mh);
+			dlm_free_mhandle(mh);
 			goto err;
 		}
 
 		break;
 	default:
-		kfree(mh);
+		dlm_free_mhandle(mh);
 		WARN_ON(1);
 		goto err;
 	}
@@ -1136,7 +1145,7 @@ void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh)
 		dlm_lowcomms_commit_msg(mh->msg);
 		dlm_lowcomms_put_msg(mh->msg);
 		/* mh is not part of rcu list in this case */
-		kfree(mh);
+		dlm_free_mhandle(mh);
 		break;
 	case DLM_VERSION_3_2:
 		dlm_midcomms_commit_msg_3_2(mh);
diff --git a/fs/dlm/midcomms.h b/fs/dlm/midcomms.h
index bc63cf73aa87..82bcd9661922 100644
--- a/fs/dlm/midcomms.h
+++ b/fs/dlm/midcomms.h
@@ -30,6 +30,7 @@ int dlm_midcomms_send_queue_cnt(struct midcomms_node *node);
 uint32_t dlm_midcomms_version(struct midcomms_node *node);
 int dlm_midcomms_rawmsg_send(struct midcomms_node *node, void *buf,
 			     int buflen);
+struct kmem_cache *dlm_midcomms_cache_create(void);
 
 #endif				/* __MIDCOMMS_DOT_H__ */
 
-- 
2.27.0




* [Cluster-devel] [PATCH dlm-next 6/7] fs: dlm: memory cache for writequeue_entry
  2021-11-30 19:47 [Cluster-devel] [PATCH dlm-next 1/7] fs: dlm: use list_empty() to check last iteration Alexander Aring
                   ` (3 preceding siblings ...)
  2021-11-30 19:47 ` [Cluster-devel] [PATCH dlm-next 5/7] fs: dlm: memory cache for midcomms hotpath Alexander Aring
@ 2021-11-30 19:47 ` Alexander Aring
  2021-11-30 19:47 ` [Cluster-devel] [PATCH dlm-next 7/7] fs: dlm: memory cache for lowcomms hotpath Alexander Aring
  5 siblings, 0 replies; 7+ messages in thread
From: Alexander Aring @ 2021-11-30 19:47 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch introduces a kmem cache for writequeue entries. Writequeue
entries are allocated frequently while dlm transmits messages.
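
This cache uses a slab constructor (writequeue_entry_ctor() in the diff
below). A constructor runs when the slab allocator first populates a
page with objects, not on every kmem_cache_alloc(), so it may only set
up state that every object is guaranteed to be back in when it is
freed; here that is the empty msgs list. A minimal sketch of the idea:

	static void writequeue_entry_ctor(void *data)
	{
		struct writequeue_entry *entry = data;

		/* runs once per object, when the slab page is set up */
		INIT_LIST_HEAD(&entry->msgs);
	}

	cache = kmem_cache_create("dlm_writequeue",
				  sizeof(struct writequeue_entry),
				  0, 0, writequeue_entry_ctor);

Per-allocation fields (offset, len, end, dirty) are instead reset
explicitly in new_writequeue_entry(), since objects come back from the
cache in whatever state they were freed in, apart from the constructed
parts.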

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 26 +++++++++++++++++++++-----
 fs/dlm/lowcomms.h |  1 +
 fs/dlm/memory.c   | 21 ++++++++++++++++++++-
 fs/dlm/memory.h   |  2 ++
 4 files changed, 44 insertions(+), 6 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 4919faf79709..300f44c5d132 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -58,6 +58,7 @@
 #include "dlm_internal.h"
 #include "lowcomms.h"
 #include "midcomms.h"
+#include "memory.h"
 #include "config.h"
 
 #define NEEDED_RMEM (4*1024*1024)
@@ -190,6 +191,19 @@ static const struct dlm_proto_ops *dlm_proto_ops;
 static void process_recv_sockets(struct work_struct *work);
 static void process_send_sockets(struct work_struct *work);
 
+static void writequeue_entry_ctor(void *data)
+{
+	struct writequeue_entry *entry = data;
+
+	INIT_LIST_HEAD(&entry->msgs);
+}
+
+struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
+{
+	return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
+				 0, 0, writequeue_entry_ctor);
+}
+
 /* need to held writequeue_lock */
 static struct writequeue_entry *con_next_wq(struct connection *con)
 {
@@ -728,7 +742,7 @@ static void dlm_page_release(struct kref *kref)
 						  ref);
 
 	__free_page(e->page);
-	kfree(e);
+	dlm_free_writequeue(e);
 }
 
 static void dlm_msg_release(struct kref *kref)
@@ -1177,21 +1191,23 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con)
 {
 	struct writequeue_entry *entry;
 
-	entry = kzalloc(sizeof(*entry), GFP_ATOMIC);
+	entry = dlm_allocate_writequeue();
 	if (!entry)
 		return NULL;
 
 	entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
 	if (!entry->page) {
-		kfree(entry);
+		dlm_free_writequeue(entry);
 		return NULL;
 	}
 
+	entry->offset = 0;
+	entry->len = 0;
+	entry->end = 0;
+	entry->dirty = false;
 	entry->con = con;
 	entry->users = 1;
 	kref_init(&entry->ref);
-	INIT_LIST_HEAD(&entry->msgs);
-
 	return entry;
 }
 
diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h
index 8108ea24ec30..6c8f4ce457f0 100644
--- a/fs/dlm/lowcomms.h
+++ b/fs/dlm/lowcomms.h
@@ -47,6 +47,7 @@ int dlm_lowcomms_connect_node(int nodeid);
 int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark);
 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
 void dlm_midcomms_receive_done(int nodeid);
+struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void);
 
 #endif				/* __LOWCOMMS_DOT_H__ */
 
diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index 8996c6453ad5..94af986e83c6 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -11,9 +11,11 @@
 
 #include "dlm_internal.h"
 #include "midcomms.h"
+#include "lowcomms.h"
 #include "config.h"
 #include "memory.h"
 
+static struct kmem_cache *writequeue_cache;
 static struct kmem_cache *mhandle_cache;
 static struct kmem_cache *lkb_cache;
 static struct kmem_cache *rsb_cache;
@@ -21,9 +23,13 @@ static struct kmem_cache *rsb_cache;
 
 int __init dlm_memory_init(void)
 {
+	writequeue_cache = dlm_lowcomms_writequeue_cache_create();
+	if (!writequeue_cache)
+		goto out;
+
 	mhandle_cache = dlm_midcomms_cache_create();
 	if (!mhandle_cache)
-		goto out;
+		goto mhandle;
 
 	lkb_cache = kmem_cache_create("dlm_lkb", sizeof(struct dlm_lkb),
 				__alignof__(struct dlm_lkb), 0, NULL);
@@ -41,12 +47,15 @@ int __init dlm_memory_init(void)
 	kmem_cache_destroy(lkb_cache);
 lkb:
 	kmem_cache_destroy(mhandle_cache);
+mhandle:
+	kmem_cache_destroy(writequeue_cache);
 out:
 	return -ENOMEM;
 }
 
 void dlm_memory_exit(void)
 {
+	kmem_cache_destroy(writequeue_cache);
 	kmem_cache_destroy(mhandle_cache);
 	kmem_cache_destroy(lkb_cache);
 	kmem_cache_destroy(rsb_cache);
@@ -110,3 +119,13 @@ void dlm_free_mhandle(struct dlm_mhandle *mhandle)
 {
 	kmem_cache_free(mhandle_cache, mhandle);
 }
+
+struct writequeue_entry *dlm_allocate_writequeue(void)
+{
+	return kmem_cache_alloc(writequeue_cache, GFP_ATOMIC);
+}
+
+void dlm_free_writequeue(struct writequeue_entry *writequeue)
+{
+	kmem_cache_free(writequeue_cache, writequeue);
+}
diff --git a/fs/dlm/memory.h b/fs/dlm/memory.h
index c4d46be778a2..854269eacd44 100644
--- a/fs/dlm/memory.h
+++ b/fs/dlm/memory.h
@@ -22,6 +22,8 @@ char *dlm_allocate_lvb(struct dlm_ls *ls);
 void dlm_free_lvb(char *l);
 struct dlm_mhandle *dlm_allocate_mhandle(void);
 void dlm_free_mhandle(struct dlm_mhandle *mhandle);
+struct writequeue_entry *dlm_allocate_writequeue(void);
+void dlm_free_writequeue(struct writequeue_entry *writequeue);
 
 #endif		/* __MEMORY_DOT_H__ */
 
-- 
2.27.0




* [Cluster-devel] [PATCH dlm-next 7/7] fs: dlm: memory cache for lowcomms hotpath
  2021-11-30 19:47 [Cluster-devel] [PATCH dlm-next 1/7] fs: dlm: use list_empty() to check last iteration Alexander Aring
                   ` (4 preceding siblings ...)
  2021-11-30 19:47 ` [Cluster-devel] [PATCH dlm-next 6/7] fs: dlm: memory cache for writequeue_entry Alexander Aring
@ 2021-11-30 19:47 ` Alexander Aring
  5 siblings, 0 replies; 7+ messages in thread
From: Alexander Aring @ 2021-11-30 19:47 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch introduces a kmem cache for dlm_msg handles, which are used
whenever dlm sends a message out, whether or not they are covered by the
midcomms layer.
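
With this patch dlm_memory_init() manages five caches (writequeue,
mhandle, msg, lkb, rsb), unwound in reverse order of creation on
failure. The goto-unwind idiom, reduced to two hypothetical caches
(a_cache, b_cache, example_init() are illustrative names) as a minimal
sketch:

	static struct kmem_cache *a_cache, *b_cache;

	int __init example_init(void)
	{
		a_cache = kmem_cache_create("a", 64, 0, 0, NULL);
		if (!a_cache)
			goto out;

		b_cache = kmem_cache_create("b", 128, 0, 0, NULL);
		if (!b_cache)
			goto free_a;	/* undo only what succeeded */

		return 0;

	free_a:
		kmem_cache_destroy(a_cache);
	out:
		return -ENOMEM;
	}

As with the mhandle cache, moving from kzalloc() to a cache allocation
means the retransmit and orig_msg fields are now initialized explicitly
in dlm_lowcomms_new_msg_con(), as the diff shows.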

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 13 ++++++++++---
 fs/dlm/lowcomms.h |  1 +
 fs/dlm/memory.c   | 18 ++++++++++++++++++
 fs/dlm/memory.h   |  2 ++
 4 files changed, 31 insertions(+), 3 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 300f44c5d132..23a1ff690725 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -204,6 +204,11 @@ struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
 				 0, 0, writequeue_entry_ctor);
 }
 
+struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
+{
+	return kmem_cache_create("dlm_msg", sizeof(struct dlm_msg), 0, 0, NULL);
+}
+
 /* need to held writequeue_lock */
 static struct writequeue_entry *con_next_wq(struct connection *con)
 {
@@ -750,7 +755,7 @@ static void dlm_msg_release(struct kref *kref)
 	struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
 
 	kref_put(&msg->entry->ref, dlm_page_release);
-	kfree(msg);
+	dlm_free_msg(msg);
 }
 
 static void free_entry(struct writequeue_entry *e)
@@ -1259,7 +1264,7 @@ static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
 	struct writequeue_entry *e;
 	struct dlm_msg *msg;
 
-	msg = kzalloc(sizeof(*msg), allocation);
+	msg = dlm_allocate_msg(allocation);
 	if (!msg)
 		return NULL;
 
@@ -1267,10 +1272,12 @@ static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
 
 	e = new_wq_entry(con, len, ppc, cb, data);
 	if (!e) {
-		kfree(msg);
+		dlm_free_msg(msg);
 		return NULL;
 	}
 
+	msg->retransmit = false;
+	msg->orig_msg = NULL;
 	msg->ppc = *ppc;
 	msg->len = len;
 	msg->entry = e;
diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h
index 6c8f4ce457f0..29369feea991 100644
--- a/fs/dlm/lowcomms.h
+++ b/fs/dlm/lowcomms.h
@@ -48,6 +48,7 @@ int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark);
 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
 void dlm_midcomms_receive_done(int nodeid);
 struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void);
+struct kmem_cache *dlm_lowcomms_msg_cache_create(void);
 
 #endif				/* __LOWCOMMS_DOT_H__ */
 
diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index 94af986e83c6..ce35c3c19aeb 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -17,6 +17,7 @@
 
 static struct kmem_cache *writequeue_cache;
 static struct kmem_cache *mhandle_cache;
+static struct kmem_cache *msg_cache;
 static struct kmem_cache *lkb_cache;
 static struct kmem_cache *rsb_cache;
 
@@ -36,6 +37,10 @@ int __init dlm_memory_init(void)
 	if (!lkb_cache)
 		goto lkb;
 
+	msg_cache = dlm_lowcomms_msg_cache_create();
+	if (!msg_cache)
+		goto msg;
+
 	rsb_cache = kmem_cache_create("dlm_rsb", sizeof(struct dlm_rsb),
 				__alignof__(struct dlm_rsb), 0, NULL);
 	if (!rsb_cache)
@@ -44,6 +49,8 @@ int __init dlm_memory_init(void)
 	return 0;
 
 rsb:
+	kmem_cache_destroy(msg_cache);
+msg:
 	kmem_cache_destroy(lkb_cache);
 lkb:
 	kmem_cache_destroy(mhandle_cache);
@@ -57,6 +64,7 @@ void dlm_memory_exit(void)
 {
 	kmem_cache_destroy(writequeue_cache);
 	kmem_cache_destroy(mhandle_cache);
+	kmem_cache_destroy(msg_cache);
 	kmem_cache_destroy(lkb_cache);
 	kmem_cache_destroy(rsb_cache);
 }
@@ -129,3 +137,13 @@ void dlm_free_writequeue(struct writequeue_entry *writequeue)
 {
 	kmem_cache_free(writequeue_cache, writequeue);
 }
+
+struct dlm_msg *dlm_allocate_msg(gfp_t allocation)
+{
+	return kmem_cache_alloc(msg_cache, allocation);
+}
+
+void dlm_free_msg(struct dlm_msg *msg)
+{
+	kmem_cache_free(msg_cache, msg);
+}
diff --git a/fs/dlm/memory.h b/fs/dlm/memory.h
index 854269eacd44..7bd3f1a391ca 100644
--- a/fs/dlm/memory.h
+++ b/fs/dlm/memory.h
@@ -24,6 +24,8 @@ struct dlm_mhandle *dlm_allocate_mhandle(void);
 void dlm_free_mhandle(struct dlm_mhandle *mhandle);
 struct writequeue_entry *dlm_allocate_writequeue(void);
 void dlm_free_writequeue(struct writequeue_entry *writequeue);
+struct dlm_msg *dlm_allocate_msg(gfp_t allocation);
+void dlm_free_msg(struct dlm_msg *msg);
 
 #endif		/* __MEMORY_DOT_H__ */
 
-- 
2.27.0



