linux-nfs.vger.kernel.org archive mirror
* [PATCH 00/10] SUNRPC thread management changes
@ 2023-08-30  2:54 NeilBrown
  2023-08-30  2:54 ` [PATCH 01/10] SQUASH: revise comments in SUNRPC: change service idle list to be an llist NeilBrown
                   ` (9 more replies)
  0 siblings, 10 replies; 19+ messages in thread
From: NeilBrown @ 2023-08-30  2:54 UTC (permalink / raw)
  To: Chuck Lever, Jeff Layton; +Cc: linux-nfs

Here is an updated set of patches against topic-sunrpc-thread-scheduling.

There are two patches to be squashed into the current top of that topic,
though an llist patch needs to be moved earlier for the second to work.

All the llist changes are now in separate patches.  lwq is now added to lib/.

NeilBrown


 [PATCH 01/10] SQUASH: revise comments in SUNRPC: change service idle
 [PATCH 02/10] llist: add interface to check if a node is on a list.
 [PATCH 03/10] SQUASH use new llist interfaces in SUNRPC: change
 [PATCH 04/10] llist: add llist_del_first_this()
 [PATCH 05/10] lib: add light-weight queuing mechanism.
 [PATCH 06/10] SUNRPC: only have one thread waking up at a time
 [PATCH 07/10] SUNRPC: use lwq for sp_sockets - renamed to sp_xprts
 [PATCH 08/10] SUNRPC: change sp_nrthreads to atomic_t
 [PATCH 09/10] SUNRPC: discard sp_lock
 [PATCH 10/10] SUNRPC: change the back-channel queue to lwq



* [PATCH 01/10] SQUASH: revise comments in SUNRPC: change service idle list to be an llist
  2023-08-30  2:54 [PATCH 00/10] SUNRPC thread management changes NeilBrown
@ 2023-08-30  2:54 ` NeilBrown
  2023-08-30  2:54 ` [PATCH 02/10] llist: add interface to check if a node is on a list NeilBrown
                   ` (8 subsequent siblings)
  9 siblings, 0 replies; 19+ messages in thread
From: NeilBrown @ 2023-08-30  2:54 UTC (permalink / raw)
  To: Chuck Lever, Jeff Layton; +Cc: linux-nfs

Revise some comments to hopefully make them clearer and less verbose.

Signed-off-by: NeilBrown <neilb@suse.de>
---
 include/linux/sunrpc/svc.h |  6 +++---
 net/sunrpc/svc_xprt.c      | 20 +++++++-------------
 2 files changed, 10 insertions(+), 16 deletions(-)

diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
index 5216f95411e3..ed20a2ea1f81 100644
--- a/include/linux/sunrpc/svc.h
+++ b/include/linux/sunrpc/svc.h
@@ -271,7 +271,7 @@ enum {
  * @rqstp: the thread which is now busy
  *
  * By convention a thread is busy if rq_idle.next points to rq_idle.
- * This ensures it is not on the idle list.
+ * This will never be the case for threads on the idle list.
  */
 static inline void svc_thread_set_busy(struct svc_rqst *rqstp)
 {
@@ -283,9 +283,9 @@ static inline void svc_thread_set_busy(struct svc_rqst *rqstp)
  * @rqstp: the thread which might be busy
  *
  * By convention a thread is busy if rq_idle.next points to rq_idle.
- * This ensures it is not on the idle list.
+ * This will never be the case for threads on the idle list.
  */
-static inline bool svc_thread_busy(struct svc_rqst *rqstp)
+static inline bool svc_thread_busy(const struct svc_rqst *rqstp)
 {
 	return rqstp->rq_idle.next == &rqstp->rq_idle;
 }
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index 81327001e074..17c43bde35c9 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -734,22 +734,16 @@ static void svc_rqst_wait_for_work(struct svc_rqst *rqstp)
 		llist_add(&rqstp->rq_idle, &pool->sp_idle_threads);
 
 		if (unlikely(!rqst_should_sleep(rqstp)))
-			/* maybe there were no idle threads when some work
-			 * became ready and so nothing was woken.  We've just
-			 * become idle so someone can to the work - maybe us.
-			 * But we cannot reliably remove ourselves from the
-			 * idle list - we can only remove the first task which
-			 * might be us, and might not.
-			 * So remove and wake it, then schedule().  If it was
-			 * us, we won't sleep.  If it is some other thread, they
-			 * will do the work.
+			/* Work just became available.  This thread cannot simply
+			 * choose not to sleep as it *must* wait until removed.
+			 * So wake the first waiter - whether it is this
+			 * thread or some other, it will get the work done.
 			 */
 			svc_pool_wake_idle_thread(pool);
 
-		/* We mustn't continue while on the idle list, and we
-		 * cannot remove outselves reliably.  The only "work"
-		 * we can do while on the idle list is to freeze.
-		 * So loop until someone removes us
+		/* Since a thread cannot remove itself from an llist,
+		 * schedule until someone else removes @rqstp from
+		 * the idle list.
 		 */
 		while (!svc_thread_busy(rqstp)) {
 			schedule();
-- 
2.41.0



* [PATCH 02/10] llist: add interface to check if a node is on a list.
  2023-08-30  2:54 [PATCH 00/10] SUNRPC thread management changes NeilBrown
  2023-08-30  2:54 ` [PATCH 01/10] SQUASH: revise comments in SUNRPC: change service idle list to be an llist NeilBrown
@ 2023-08-30  2:54 ` NeilBrown
  2023-08-30  2:54 ` [PATCH 03/10] SQUASH use new llist interfaces in SUNRPC: change service idle list to be an llist NeilBrown
                   ` (7 subsequent siblings)
  9 siblings, 0 replies; 19+ messages in thread
From: NeilBrown @ 2023-08-30  2:54 UTC (permalink / raw)
  To: Chuck Lever, Jeff Layton; +Cc: linux-nfs

With list.h lists, it is easy to test if a node is on a list, providing
it was initialised and that it is removed with list_del_init().

This patch provides similar functionality for llist.h lists.

 init_llist_node()
marks a node as being not-on-any-list by setting the ->next pointer to
the node itself.
 llist_on_list()
tests if the node is on any list.
 llist_del_first_init()
removes the first element from an llist, and marks it as being off-list.
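
As a rough usage sketch (the "worker" type and the idle-list handling
here are illustrative only, not taken from this patch):

	struct worker {
		struct llist_node idle;
	};

	static void worker_setup(struct worker *w)
	{
		/* mark @w as clearly not on any list */
		init_llist_node(&w->idle);
	}

	static struct worker *pick_idle(struct llist_head *idle_list)
	{
		/* removers must be serialised, as for llist_del_first() */
		struct llist_node *n = llist_del_first_init(idle_list);

		/* llist_on_list() now reports false for the returned node */
		return n ? llist_entry(n, struct worker, idle) : NULL;
	}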

Signed-off-by: NeilBrown <neilb@suse.de>
---
 include/linux/llist.h | 42 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 42 insertions(+)

diff --git a/include/linux/llist.h b/include/linux/llist.h
index 85bda2d02d65..dcb91e3bac1c 100644
--- a/include/linux/llist.h
+++ b/include/linux/llist.h
@@ -73,6 +73,33 @@ static inline void init_llist_head(struct llist_head *list)
 	list->first = NULL;
 }
 
+/**
+ * init_llist_node - initialize lock-less list node
+ * @node:	the node to be initialised
+ *
+ * In cases where there is a need to test if a node is on
+ * a list or not, this initialises the node to clearly
+ * not be on any list.
+ */
+static inline void init_llist_node(struct llist_node *node)
+{
+	node->next = node;
+}
+
+/**
+ * llist_on_list - test if a lock-less list node is on a list
+ * @node:	the node to test
+ *
+ * When a node is on a list the ->next pointer will be NULL or
+ * some other node.  It can never point to itself.  We use that
+ * in init_llist_node() to record that a node is not on any list,
+ * and here to test whether it is on any list.
+ */
+static inline bool llist_on_list(const struct llist_node *node)
+{
+	return node->next != node;
+}
+
 /**
  * llist_entry - get the struct of this entry
  * @ptr:	the &struct llist_node pointer.
@@ -249,6 +276,21 @@ static inline struct llist_node *__llist_del_all(struct llist_head *head)
 
 extern struct llist_node *llist_del_first(struct llist_head *head);
 
+/**
+ * llist_del_first_init - delete first entry from a lock-less list and mark it as being off-list
+ * @head:	the head of lock-less list to delete from.
+ *
+ * This behaves the same as llist_del_first() except that init_llist_node() is called
+ * on the returned node so that llist_on_list() will report false for the node.
+ */
+static inline struct llist_node *llist_del_first_init(struct llist_head *head)
+{
+	struct llist_node *n = llist_del_first(head);
+
+	if (n)
+		init_llist_node(n);
+	return n;
+}
 struct llist_node *llist_reverse_order(struct llist_node *head);
 
 #endif /* LLIST_H */
-- 
2.41.0



* [PATCH 03/10] SQUASH use new llist interfaces in SUNRPC: change service idle list to be an llist
  2023-08-30  2:54 [PATCH 00/10] SUNRPC thread management changes NeilBrown
  2023-08-30  2:54 ` [PATCH 01/10] SQUASH: revise comments in SUNRPC: change service idle list to be an llist NeilBrown
  2023-08-30  2:54 ` [PATCH 02/10] llist: add interface to check if a node is on a list NeilBrown
@ 2023-08-30  2:54 ` NeilBrown
  2023-08-30  2:54 ` [PATCH 04/10] llist: add llist_del_first_this() NeilBrown
                   ` (6 subsequent siblings)
  9 siblings, 0 replies; 19+ messages in thread
From: NeilBrown @ 2023-08-30  2:54 UTC (permalink / raw)
  To: Chuck Lever, Jeff Layton; +Cc: linux-nfs

Use init_llist_node(), llist_on_list() etc. for checking if a node is on
an llist.
Discard svc_thread_set_busy() completely and simplify svc_thread_busy().

This can only be squashed if the patch to llist is moved earlier in the
topic.

Signed-off-by: NeilBrown <neilb@suse.de>
---
 include/linux/sunrpc/svc.h | 17 ++---------------
 net/sunrpc/svc.c           |  5 ++---
 2 files changed, 4 insertions(+), 18 deletions(-)

diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
index ed20a2ea1f81..ad4572630335 100644
--- a/include/linux/sunrpc/svc.h
+++ b/include/linux/sunrpc/svc.h
@@ -266,28 +266,15 @@ enum {
 	RQ_DATA,		/* request has data */
 };
 
-/**
- * svc_thread_set_busy - mark a thread as busy
- * @rqstp: the thread which is now busy
- *
- * By convention a thread is busy if rq_idle.next points to rq_idle.
- * This will never be the case for threads on the idle list.
- */
-static inline void svc_thread_set_busy(struct svc_rqst *rqstp)
-{
-	rqstp->rq_idle.next = &rqstp->rq_idle;
-}
-
 /**
  * svc_thread_busy - check if a thread as busy
  * @rqstp: the thread which might be busy
  *
- * By convention a thread is busy if rq_idle.next points to rq_idle.
- * This will never be the case for threads on the idle list.
+ * A thread is only busy when it is not on the idle list.
  */
 static inline bool svc_thread_busy(const struct svc_rqst *rqstp)
 {
-	return rqstp->rq_idle.next == &rqstp->rq_idle;
+	return !llist_on_list(&rqstp->rq_idle);
 }
 
 #define SVC_NET(rqst) (rqst->rq_xprt ? rqst->rq_xprt->xpt_net : rqst->rq_bc_net)
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index addbf28ea50a..5673f30db295 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -642,7 +642,7 @@ svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node)
 
 	folio_batch_init(&rqstp->rq_fbatch);
 
-	svc_thread_set_busy(rqstp);
+	init_llist_node(&rqstp->rq_idle);
 	rqstp->rq_server = serv;
 	rqstp->rq_pool = pool;
 
@@ -705,11 +705,10 @@ void svc_pool_wake_idle_thread(struct svc_pool *pool)
 
 	rcu_read_lock();
 	spin_lock_bh(&pool->sp_lock);
-	ln = llist_del_first(&pool->sp_idle_threads);
+	ln = llist_del_first_init(&pool->sp_idle_threads);
 	spin_unlock_bh(&pool->sp_lock);
 	if (ln) {
 		rqstp = llist_entry(ln, struct svc_rqst, rq_idle);
-		svc_thread_set_busy(rqstp);
 
 		WRITE_ONCE(rqstp->rq_qtime, ktime_get());
 		wake_up_process(rqstp->rq_task);
-- 
2.41.0



* [PATCH 04/10] llist: add llist_del_first_this()
  2023-08-30  2:54 [PATCH 00/10] SUNRPC thread management changes NeilBrown
                   ` (2 preceding siblings ...)
  2023-08-30  2:54 ` [PATCH 03/10] SQUASH use new llist interfaces in SUNRPC: change service idle list to be an llist NeilBrown
@ 2023-08-30  2:54 ` NeilBrown
  2023-08-30  2:54 ` [PATCH 05/10] lib: add light-weight queuing mechanism NeilBrown
                   ` (5 subsequent siblings)
  9 siblings, 0 replies; 19+ messages in thread
From: NeilBrown @ 2023-08-30  2:54 UTC (permalink / raw)
  To: Chuck Lever, Jeff Layton; +Cc: linux-nfs

llist_del_first_this() deletes a specific entry from an llist, providing
it is at the head of the list.  Multiple threads can call this
concurrently providing they each offer a different entry.

This can be used for a set of worker threads which are on the llist when
they are idle.  The head can always be woken, and when it is woken it
can remove itself, and possibly wake the next if there is an excess of
work to do.
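
A sketch of that worker pattern (the names "pool->idle_workers" and
"me" are illustrative, not from this patch):

	set_current_state(TASK_IDLE);
	llist_add(&me->idle, &pool->idle_workers);
	schedule();	/* wait to be woken as head of the queue */

	/* Only the head may remove itself; any other position means
	 * some other thread has taken responsibility, so sleep again
	 * until this worker reaches the head and is woken.
	 */
	while (!llist_del_first_this(&pool->idle_workers, &me->idle)) {
		schedule();
		set_current_state(TASK_IDLE);
	}
	__set_current_state(TASK_RUNNING);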

Signed-off-by: NeilBrown <neilb@suse.de>
---
 include/linux/llist.h |  4 ++++
 lib/llist.c           | 28 ++++++++++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git a/include/linux/llist.h b/include/linux/llist.h
index dcb91e3bac1c..2c982ff7475a 100644
--- a/include/linux/llist.h
+++ b/include/linux/llist.h
@@ -291,6 +291,10 @@ static inline struct llist_node *llist_del_first_init(struct llist_head *head)
 		init_llist_node(n);
 	return n;
 }
+
+extern bool llist_del_first_this(struct llist_head *head,
+				 struct llist_node *this);
+
 struct llist_node *llist_reverse_order(struct llist_node *head);
 
 #endif /* LLIST_H */
diff --git a/lib/llist.c b/lib/llist.c
index 6e668fa5a2c6..f21d0cfbbaaa 100644
--- a/lib/llist.c
+++ b/lib/llist.c
@@ -65,6 +65,34 @@ struct llist_node *llist_del_first(struct llist_head *head)
 }
 EXPORT_SYMBOL_GPL(llist_del_first);
 
+/**
+ * llist_del_first_this - delete given entry of lock-less list if it is first
+ * @head:	the head for your lock-less list
+ * @this:	a list entry.
+ *
+ * If the head of the list is the given entry, delete it and return
+ * %true, else return %false.
+ *
+ * Multiple callers can safely call this concurrently with multiple
+ * llist_add() callers, providing all the callers offer a different @this.
+ */
+bool llist_del_first_this(struct llist_head *head,
+			  struct llist_node *this)
+{
+	struct llist_node *entry, *next;
+
+	/* acquire ensures ordering wrt try_cmpxchg() in llist_del_first() */
+	entry = smp_load_acquire(&head->first);
+	do {
+		if (entry != this)
+			return false;
+		next = READ_ONCE(entry->next);
+	} while (!try_cmpxchg(&head->first, &entry, next));
+
+	return true;
+}
+EXPORT_SYMBOL_GPL(llist_del_first_this);
+
 /**
  * llist_reverse_order - reverse order of a llist chain
  * @head:	first item of the list to be reversed
-- 
2.41.0



* [PATCH 05/10] lib: add light-weight queuing mechanism.
  2023-08-30  2:54 [PATCH 00/10] SUNRPC thread management changes NeilBrown
                   ` (3 preceding siblings ...)
  2023-08-30  2:54 ` [PATCH 04/10] llist: add llist_del_first_this() NeilBrown
@ 2023-08-30  2:54 ` NeilBrown
  2023-08-30 15:21   ` Chuck Lever
                     ` (2 more replies)
  2023-08-30  2:54 ` [PATCH 06/10] SUNRPC: only have one thread waking up at a time NeilBrown
                   ` (4 subsequent siblings)
  9 siblings, 3 replies; 19+ messages in thread
From: NeilBrown @ 2023-08-30  2:54 UTC (permalink / raw)
  To: Chuck Lever, Jeff Layton; +Cc: linux-nfs

lwq is a FIFO single-linked queue that only requires a spinlock
for dequeueing, which happens in process context.  Enqueueing is atomic
with no spinlock and can happen in any context.

Include a unit test for basic functionality - runs at boot time.  Does
not use kunit framework.
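
A minimal producer/consumer sketch (the "item" type and the use of
wake_up_var()/wait_var_event() are illustrative only):

	#include <linux/lwq.h>
	#include <linux/wait_bit.h>

	struct item {
		struct lwq_node q;
		int payload;
	};

	static struct lwq queue;	/* lwq_init(&queue) at setup time */

	/* enqueue: any context, no spinlock taken */
	static void produce(struct item *it)
	{
		if (lwq_enqueue(&it->q, &queue))
			/* queue may have been empty - wake a consumer */
			wake_up_var(&queue);
	}

	/* dequeue: process context, takes the internal spinlock */
	static struct item *consume(void)
	{
		struct item *it;

		wait_var_event(&queue,
			       (it = lwq_dequeue(&queue, struct item, q)) != NULL);
		return it;
	}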

Signed-off-by: NeilBrown <neilb@suse.de>
---
 include/linux/lwq.h | 120 +++++++++++++++++++++++++++++++++++
 lib/Kconfig         |   5 ++
 lib/Makefile        |   2 +-
 lib/lwq.c           | 149 ++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 275 insertions(+), 1 deletion(-)
 create mode 100644 include/linux/lwq.h
 create mode 100644 lib/lwq.c

diff --git a/include/linux/lwq.h b/include/linux/lwq.h
new file mode 100644
index 000000000000..52b9c81b493a
--- /dev/null
+++ b/include/linux/lwq.h
@@ -0,0 +1,120 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+
+#ifndef LWQ_H
+#define LWQ_H
+/*
+ * light-weight single-linked queue built from llist
+ *
+ * Entries can be enqueued from any context with no locking.
+ * Entries can be dequeued from process context with integrated locking.
+ */
+#include <linux/container_of.h>
+#include <linux/spinlock.h>
+#include <linux/llist.h>
+
+struct lwq_node {
+	struct llist_node node;
+};
+
+struct lwq {
+	spinlock_t		lock;
+	struct llist_node	*ready;		/* entries to be dequeued */
+	struct llist_head	new;		/* entries being enqueued */
+};
+
+/**
+ * lwq_init - initialise a lwq
+ * @q:	the lwq object
+ */
+static inline void lwq_init(struct lwq *q)
+{
+	spin_lock_init(&q->lock);
+	q->ready = NULL;
+	init_llist_head(&q->new);
+}
+
+/**
+ * lwq_empty - test if lwq contains any entry
+ * @q:	the lwq object
+ *
+ * This empty test contains an acquire barrier so that if a wakeup
+ * is sent when lwq_enqueue returns true, it is safe to go to sleep after
+ * a test on lwq_empty().
+ */
+static inline bool lwq_empty(struct lwq *q)
+{
+	/* acquire ensures ordering wrt lwq_enqueue() */
+	return smp_load_acquire(&q->ready) == NULL && llist_empty(&q->new);
+}
+
+struct llist_node *__lwq_dequeue(struct lwq *q);
+/**
+ * lwq_dequeue - dequeue first (oldest) entry from lwq
+ * @q:		the queue to dequeue from
+ * @type:	the type of object to return
+ * @member:	the member in the returned object which is an lwq_node.
+ *
+ * Remove a single object from the lwq and return it.  This will take
+ * a spinlock and so must always be called in the same context, typcially
+ * process contet.
+ */
+#define lwq_dequeue(q, type, member)					\
+	({ struct llist_node *_n = __lwq_dequeue(q);			\
+	  _n ? container_of(_n, type, member.node) : NULL; })
+
+struct llist_node *lwq_dequeue_all(struct lwq *q);
+
+/**
+ * lwq_for_each_safe - iterate over detached queue allowing deletion
+ * @_n:		iterator variable
+ * @_t1:	temporary struct llist_node **
+ * @_t2:	temporary struct llist_node *
+ * @_l:		address of llist_node pointer from lwq_dequeue_all()
+ * @_member:	member in _n where lwq_node is found.
+ *
+ * Iterate over members in a dequeued list.  If the iterator variable
+ * is set to NULL, the iterator removes that entry from the queue.
+ */
+#define lwq_for_each_safe(_n, _t1, _t2, _l, _member)			\
+	for (_t1 = (_l);						\
+	     *(_t1) ? (_n = container_of(*(_t1), typeof(*(_n)), _member.node),\
+		       _t2 = ((*_t1)->next),				\
+		       true)						\
+	     : false;							\
+	     (_n) ? (_t1 = &(_n)->_member.node.next, 0)			\
+	     : ((*(_t1) = (_t2)),  0))
+
+/**
+ * lwq_enqueue - add a new item to the end of the queue
+ * @n:	the lwq_node embedded in the item to be added
+ * @q:	the lwq to append to.
+ *
+ * No locking is needed to append to the queue so this can
+ * be called from any context.
+ * Return %true if the list may have previously been empty.
+ */
+static inline bool lwq_enqueue(struct lwq_node *n, struct lwq *q)
+{
+	/* acquire ensures ordering wrt lwq_dequeue */
+	return llist_add(&n->node, &q->new) &&
+		smp_load_acquire(&q->ready) == NULL;
+}
+
+/**
+ * lwq_enqueue_batch - add a list of new items to the end of the queue
+ * @n:	the lwq_node embedded in the first item to be added
+ * @q:	the lwq to append to.
+ *
+ * No locking is needed to append to the queue so this can
+ * be called from any context.
+ * Return %true if the list may have previously been empty.
+ */
+static inline bool lwq_enqueue_batch(struct llist_node *n, struct lwq *q)
+{
+	struct llist_node *e = n;
+
+	/* acquire ensures ordering wrt lwq_dequeue */
+	return llist_add_batch(llist_reverse_order(n), e, &q->new) &&
+		smp_load_acquire(&q->ready) == NULL;
+}
+#endif /* LWQ_H */
diff --git a/lib/Kconfig b/lib/Kconfig
index 5c2da561c516..6620bdba4f94 100644
--- a/lib/Kconfig
+++ b/lib/Kconfig
@@ -763,3 +763,8 @@ config ASN1_ENCODER
 
 config POLYNOMIAL
        tristate
+
+config LWQ_TEST
+	bool "RPC: enable boot-time test for lwq queuing"
+	help
+          Enable boot-time test of lwq functionality.
diff --git a/lib/Makefile b/lib/Makefile
index 1ffae65bb7ee..4b67c2d6af62 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -45,7 +45,7 @@ obj-y	+= lockref.o
 obj-y += bcd.o sort.o parser.o debug_locks.o random32.o \
 	 bust_spinlocks.o kasprintf.o bitmap.o scatterlist.o \
 	 list_sort.o uuid.o iov_iter.o clz_ctz.o \
-	 bsearch.o find_bit.o llist.o memweight.o kfifo.o \
+	 bsearch.o find_bit.o llist.o lwq.o memweight.o kfifo.o \
 	 percpu-refcount.o rhashtable.o base64.o \
 	 once.o refcount.o rcuref.o usercopy.o errseq.o bucket_locks.o \
 	 generic-radix-tree.o
diff --git a/lib/lwq.c b/lib/lwq.c
new file mode 100644
index 000000000000..d6be6dda3867
--- /dev/null
+++ b/lib/lwq.c
@@ -0,0 +1,149 @@
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ * Light weight single-linked queue.
+ *
+ * Entries are enqueued to the head of an llist, with no blocking.
+ * This can happen in any context.
+ *
+ * Entries are dequeued using a spinlock to protect against
+ * multiple access.  The llist is staged in reverse order, and refreshed
+ * from the llist when it exhausts.
+ */
+#include <linux/rcupdate.h>
+#include <linux/lwq.h>
+
+struct llist_node *__lwq_dequeue(struct lwq *q)
+{
+	struct llist_node *this;
+
+	if (lwq_empty(q))
+		return NULL;
+	spin_lock(&q->lock);
+	this = q->ready;
+	if (!this && !llist_empty(&q->new)) {
+		/* ensure queue doesn't appear transiently lwq_empty */
+		smp_store_release(&q->ready, (void *)1);
+		this = llist_reverse_order(llist_del_all(&q->new));
+		if (!this)
+			q->ready = NULL;
+	}
+	if (this)
+		q->ready = llist_next(this);
+	spin_unlock(&q->lock);
+	return this;
+}
+
+/**
+ * lwq_dequeue_all - dequeue all currently enqueued objects
+ * @q:	the queue to dequeue from
+ *
+ * Remove and return a linked list of llist_nodes of all the objects that were
+ * in the queue. The first on the list will be the object that was least
+ * recently enqueued.
+ */
+struct llist_node *lwq_dequeue_all(struct lwq *q)
+{
+	struct llist_node *r, *t, **ep;
+
+	if (lwq_empty(q))
+		return NULL;
+
+	spin_lock(&q->lock);
+	r = q->ready;
+	q->ready = NULL;
+	t = llist_del_all(&q->new);
+	spin_unlock(&q->lock);
+	ep = &r;
+	while (*ep)
+		ep = &(*ep)->next;
+	*ep = llist_reverse_order(t);
+	return r;
+}
+
+#if IS_ENABLED(CONFIG_LWQ_TEST)
+
+#include <linux/module.h>
+#include <linux/slab.h>
+#include <linux/wait_bit.h>
+#include <linux/kthread.h>
+#include <linux/delay.h>
+struct tnode {
+	struct lwq_node n;
+	int i;
+	int c;
+};
+
+static int lwq_exercise(void *qv)
+{
+	struct lwq *q = qv;
+	int cnt;
+	struct tnode *t;
+
+	for (cnt = 0; cnt < 10000; cnt++) {
+		wait_var_event(q, (t = lwq_dequeue(q, struct tnode, n)) != NULL);
+		t->c++;
+		if (lwq_enqueue(&t->n, q))
+			wake_up_var(q);
+	}
+	while (!kthread_should_stop())
+		schedule_timeout_idle(1);
+	return 0;
+}
+
+static int lwq_test(void)
+{
+	int i;
+	struct lwq q;
+	struct llist_node *l, **t1, *t2;
+	struct tnode *t;
+	struct task_struct *threads[8];
+
+	printk(KERN_INFO "testing lwq....\n");
+	lwq_init(&q);
+	printk(KERN_INFO " lwq: run some threads\n");
+	for (i = 0; i < ARRAY_SIZE(threads); i++)
+		threads[i] = kthread_run(lwq_exercise, &q, "lwq-test-%d", i);
+	for (i = 0; i < 100; i++) {
+		t = kmalloc(sizeof(*t), GFP_KERNEL);
+		t->i = i;
+		t->c = 0;
+		if (lwq_enqueue(&t->n, &q))
+			wake_up_var(&q);
+	}
+	/* wait for threads to exit */
+	for (i = 0; i < ARRAY_SIZE(threads); i++)
+		if (!IS_ERR_OR_NULL(threads[i]))
+			kthread_stop(threads[i]);
+	printk(KERN_INFO " lwq: dequeue first 50:");
+	for (i = 0; i < 50 ; i++) {
+		if (i && (i % 10) == 0) {
+			printk(KERN_CONT "\n");
+			printk(KERN_INFO " lwq: ... ");
+		}
+		t = lwq_dequeue(&q, struct tnode, n);
+		printk(KERN_CONT " %d(%d)", t->i, t->c);
+		kfree(t);
+	}
+	printk(KERN_CONT "\n");
+	l = lwq_dequeue_all(&q);
+	printk(KERN_INFO " lwq: delete the multiples of 3 (test lwq_for_each_safe())\n");
+	lwq_for_each_safe(t, t1, t2, &l, n) {
+		if ((t->i % 3) == 0) {
+			t->i = -1;
+			kfree(t);
+			t = NULL;
+		}
+	}
+	if (l)
+		lwq_enqueue_batch(l, &q);
+	printk(KERN_INFO " lwq: dequeue remaining:");
+	while ((t = lwq_dequeue(&q, struct tnode, n)) != NULL) {
+		printk(KERN_CONT " %d", t->i);
+		kfree(t);
+	}
+	printk(KERN_CONT "\n");
+	return 0;
+}
+
+module_init(lwq_test);
+#endif /* CONFIG_LWQ_TEST */
-- 
2.41.0



* [PATCH 06/10] SUNRPC: only have one thread waking up at a time
  2023-08-30  2:54 [PATCH 00/10] SUNRPC thread management changes NeilBrown
                   ` (4 preceding siblings ...)
  2023-08-30  2:54 ` [PATCH 05/10] lib: add light-weight queuing mechanism NeilBrown
@ 2023-08-30  2:54 ` NeilBrown
  2023-08-30 15:28   ` Chuck Lever
  2023-08-30  2:54 ` [PATCH 07/10] SUNRPC: use lwq for sp_sockets - renamed to sp_xprts NeilBrown
                   ` (3 subsequent siblings)
  9 siblings, 1 reply; 19+ messages in thread
From: NeilBrown @ 2023-08-30  2:54 UTC (permalink / raw)
  To: Chuck Lever, Jeff Layton; +Cc: linux-nfs

Currently if several items of work become available in quick succession,
that number of threads (if available) will be woken.  By the time some
of them wake up another thread that was already cache-warm might have
come along and completed the work.  Anecdotal evidence suggests as many
as 15% of wakes find nothing to do once they get to the point of
looking.

This patch changes svc_pool_wake_idle_thread() to wake the first thread
on the queue but NOT remove it.  Subsequent calls will wake the same
thread.  Once that thread starts it will dequeue itself and after
dequeueing some work to do, it will wake the next thread if there is more
work ready.  This results in a more orderly increase in the number of
busy threads.

As a bonus, this allows us to reduce locking around the idle queue.
svc_pool_wake_idle_thread() no longer needs to take a lock (beyond
rcu_read_lock()) as it doesn't manipulate the queue, it just looks at
the first item.

The thread itself can avoid locking by using the new
llist_del_first_this() interface.  This will safely remove the thread
itself if it is the head.  If it isn't the head, it will do nothing.
If multiple threads call this concurrently only one will succeed.  The
others will do nothing, so no corruption can result.

If a thread wakes up and finds that it cannot dequeue itself that means
either
- that it wasn't woken because it was the head of the queue.  Maybe the
  freezer woke it.  In that case it can go back to sleep (after trying
  to freeze of course).
- some other thread found there was nothing to do very recently, and
  placed itself on the head of the queue in front of this thread.
  It must check again after placing itself there, so it can be deemed to
  be responsible for any pending work, and this thread can go back to
  sleep until woken.

No code ever tests for busy threads any more.  Only each thread itself
cares if it is busy.  So svc_thread_busy() is no longer needed.
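
Schematically, the idle-side loop in svc_rqst_wait_for_work() below
becomes:

	set_current_state(TASK_IDLE | TASK_FREEZABLE);
	llist_add(&rqstp->rq_idle, &pool->sp_idle_threads);
	if (likely(rqst_should_sleep(rqstp)))
		schedule();

	/* only the head of the queue may remove itself */
	while (!llist_del_first_this(&pool->sp_idle_threads,
				     &rqstp->rq_idle)) {
		schedule();
		set_current_state(TASK_IDLE | TASK_FREEZABLE);
	}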

Signed-off-by: NeilBrown <neilb@suse.de>
---
 include/linux/sunrpc/svc.h | 11 -----------
 net/sunrpc/svc.c           | 14 ++++++--------
 net/sunrpc/svc_xprt.c      | 35 ++++++++++++++++++++++-------------
 3 files changed, 28 insertions(+), 32 deletions(-)

diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
index ad4572630335..dafa362b4fdd 100644
--- a/include/linux/sunrpc/svc.h
+++ b/include/linux/sunrpc/svc.h
@@ -266,17 +266,6 @@ enum {
 	RQ_DATA,		/* request has data */
 };
 
-/**
- * svc_thread_busy - check if a thread as busy
- * @rqstp: the thread which might be busy
- *
- * A thread is only busy when it is not an the idle list.
- */
-static inline bool svc_thread_busy(const struct svc_rqst *rqstp)
-{
-	return !llist_on_list(&rqstp->rq_idle);
-}
-
 #define SVC_NET(rqst) (rqst->rq_xprt ? rqst->rq_xprt->xpt_net : rqst->rq_bc_net)
 
 /*
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index 5673f30db295..3267d740235e 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -642,7 +642,6 @@ svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node)
 
 	folio_batch_init(&rqstp->rq_fbatch);
 
-	init_llist_node(&rqstp->rq_idle);
 	rqstp->rq_server = serv;
 	rqstp->rq_pool = pool;
 
@@ -704,17 +703,16 @@ void svc_pool_wake_idle_thread(struct svc_pool *pool)
 	struct llist_node *ln;
 
 	rcu_read_lock();
-	spin_lock_bh(&pool->sp_lock);
-	ln = llist_del_first_init(&pool->sp_idle_threads);
-	spin_unlock_bh(&pool->sp_lock);
+	ln = READ_ONCE(pool->sp_idle_threads.first);
 	if (ln) {
 		rqstp = llist_entry(ln, struct svc_rqst, rq_idle);
-
 		WRITE_ONCE(rqstp->rq_qtime, ktime_get());
-		wake_up_process(rqstp->rq_task);
+		if (!task_is_running(rqstp->rq_task)) {
+			wake_up_process(rqstp->rq_task);
+			trace_svc_wake_up(rqstp->rq_task->pid);
+			percpu_counter_inc(&pool->sp_threads_woken);
+		}
 		rcu_read_unlock();
-		percpu_counter_inc(&pool->sp_threads_woken);
-		trace_svc_wake_up(rqstp->rq_task->pid);
 		return;
 	}
 	rcu_read_unlock();
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index 17c43bde35c9..a51570a4cbf2 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -732,20 +732,16 @@ static void svc_rqst_wait_for_work(struct svc_rqst *rqstp)
 	if (rqst_should_sleep(rqstp)) {
 		set_current_state(TASK_IDLE | TASK_FREEZABLE);
 		llist_add(&rqstp->rq_idle, &pool->sp_idle_threads);
+		if (likely(rqst_should_sleep(rqstp)))
+			schedule();
 
-		if (unlikely(!rqst_should_sleep(rqstp)))
-			/* Work just became available.  This thread cannot simply
-			 * choose not to sleep as it *must* wait until removed.
-			 * So wake the first waiter - whether it is this
-			 * thread or some other, it will get the work done.
+		while (!llist_del_first_this(&pool->sp_idle_threads,
+					     &rqstp->rq_idle)) {
+			/* Cannot remove @rqstp from the idle list, so some
+			 * other thread must have queued itself after finding
+			 * no work to do, and has taken responsibility
+			 * for any outstanding work.
 			 */
-			svc_pool_wake_idle_thread(pool);
-
-		/* Since a thread cannot remove itself from an llist,
-		 * schedule until someone else removes @rqstp from
-		 * the idle list.
-		 */
-		while (!svc_thread_busy(rqstp)) {
 			schedule();
 			set_current_state(TASK_IDLE | TASK_FREEZABLE);
 		}
@@ -835,6 +831,15 @@ static void svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt)
 	svc_xprt_release(rqstp);
 }
 
+static void wake_next(struct svc_rqst *rqstp)
+{
+	if (!rqst_should_sleep(rqstp))
+		/* More work pending after I dequeued some,
+		 * wake another worker
+		 */
+		svc_pool_wake_idle_thread(rqstp->rq_pool);
+}
+
 /**
  * svc_recv - Receive and process the next request on any transport
  * @rqstp: an idle RPC service thread
@@ -854,13 +859,16 @@ void svc_recv(struct svc_rqst *rqstp)
 
 	clear_bit(SP_TASK_PENDING, &pool->sp_flags);
 
-	if (svc_thread_should_stop(rqstp))
+	if (svc_thread_should_stop(rqstp)) {
+		wake_next(rqstp);
 		return;
+	}
 
 	rqstp->rq_xprt = svc_xprt_dequeue(pool);
 	if (rqstp->rq_xprt) {
 		struct svc_xprt *xprt = rqstp->rq_xprt;
 
+		wake_next(rqstp);
 		/* Normally we will wait up to 5 seconds for any required
 		 * cache information to be provided.  When there are no
 		 * idle threads, we reduce the wait time.
@@ -885,6 +893,7 @@ void svc_recv(struct svc_rqst *rqstp)
 		if (req) {
 			list_del(&req->rq_bc_list);
 			spin_unlock_bh(&serv->sv_cb_lock);
+			wake_next(rqstp);
 
 			svc_process_bc(req, rqstp);
 			return;
-- 
2.41.0



* [PATCH 07/10] SUNRPC: use lwq for sp_sockets - renamed to sp_xprts
  2023-08-30  2:54 [PATCH 00/10] SUNRPC thread management changes NeilBrown
                   ` (5 preceding siblings ...)
  2023-08-30  2:54 ` [PATCH 06/10] SUNRPC: only have one thread waking up at a time NeilBrown
@ 2023-08-30  2:54 ` NeilBrown
  2023-08-30  2:54 ` [PATCH 08/10] SUNRPC: change sp_nrthreads to atomic_t NeilBrown
                   ` (2 subsequent siblings)
  9 siblings, 0 replies; 19+ messages in thread
From: NeilBrown @ 2023-08-30  2:54 UTC (permalink / raw)
  To: Chuck Lever, Jeff Layton; +Cc: linux-nfs

lwq avoids using back pointers in lists, and uses less locking.
This introduces a new spinlock, but the other one will be removed in a
future patch.

For svc_clean_up_xprts(), we now dequeue the entire queue, walk it to
remove and process the xprts that need cleaning up, then re-enqueue the
remaining queue.
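
Per pool, the clean-up now amounts to (condensed from the
svc_clean_up_xprts() hunk below):

	q = lwq_dequeue_all(&pool->sp_xprts);
	lwq_for_each_safe(xprt, t1, t2, &q, xpt_ready) {
		if (xprt->xpt_net == net) {
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_delete_xprt(xprt);
			xprt = NULL;	/* drop it from the detached list */
		}
	}
	if (q)
		lwq_enqueue_batch(q, &pool->sp_xprts);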

Signed-off-by: NeilBrown <neilb@suse.de>
---
 include/linux/sunrpc/svc.h      |  3 +-
 include/linux/sunrpc/svc_xprt.h |  2 +-
 net/sunrpc/svc.c                |  2 +-
 net/sunrpc/svc_xprt.c           | 57 ++++++++++-----------------------
 4 files changed, 21 insertions(+), 43 deletions(-)

diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
index dafa362b4fdd..7ff9fe785e49 100644
--- a/include/linux/sunrpc/svc.h
+++ b/include/linux/sunrpc/svc.h
@@ -17,6 +17,7 @@
 #include <linux/sunrpc/xdr.h>
 #include <linux/sunrpc/auth.h>
 #include <linux/sunrpc/svcauth.h>
+#include <linux/lwq.h>
 #include <linux/wait.h>
 #include <linux/mm.h>
 #include <linux/pagevec.h>
@@ -34,7 +35,7 @@
 struct svc_pool {
 	unsigned int		sp_id;	    	/* pool id; also node id on NUMA */
 	spinlock_t		sp_lock;	/* protects all fields */
-	struct list_head	sp_sockets;	/* pending sockets */
+	struct lwq		sp_xprts;	/* pending transports */
 	unsigned int		sp_nrthreads;	/* # of threads in pool */
 	struct list_head	sp_all_threads;	/* all server threads */
 	struct llist_head	sp_idle_threads; /* idle server threads */
diff --git a/include/linux/sunrpc/svc_xprt.h b/include/linux/sunrpc/svc_xprt.h
index fa55d12dc765..8e20cd60e2e7 100644
--- a/include/linux/sunrpc/svc_xprt.h
+++ b/include/linux/sunrpc/svc_xprt.h
@@ -54,7 +54,7 @@ struct svc_xprt {
 	const struct svc_xprt_ops *xpt_ops;
 	struct kref		xpt_ref;
 	struct list_head	xpt_list;
-	struct list_head	xpt_ready;
+	struct lwq_node		xpt_ready;
 	unsigned long		xpt_flags;
 
 	struct svc_serv		*xpt_server;	/* service for transport */
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index 3267d740235e..63cddb8cb08d 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -508,7 +508,7 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
 				i, serv->sv_name);
 
 		pool->sp_id = i;
-		INIT_LIST_HEAD(&pool->sp_sockets);
+		lwq_init(&pool->sp_xprts);
 		INIT_LIST_HEAD(&pool->sp_all_threads);
 		init_llist_head(&pool->sp_idle_threads);
 		spin_lock_init(&pool->sp_lock);
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index a51570a4cbf2..2399811582cc 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -201,7 +201,6 @@ void svc_xprt_init(struct net *net, struct svc_xprt_class *xcl,
 	kref_init(&xprt->xpt_ref);
 	xprt->xpt_server = serv;
 	INIT_LIST_HEAD(&xprt->xpt_list);
-	INIT_LIST_HEAD(&xprt->xpt_ready);
 	INIT_LIST_HEAD(&xprt->xpt_deferred);
 	INIT_LIST_HEAD(&xprt->xpt_users);
 	mutex_init(&xprt->xpt_mutex);
@@ -472,9 +471,7 @@ void svc_xprt_enqueue(struct svc_xprt *xprt)
 	pool = svc_pool_for_cpu(xprt->xpt_server);
 
 	percpu_counter_inc(&pool->sp_sockets_queued);
-	spin_lock_bh(&pool->sp_lock);
-	list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
-	spin_unlock_bh(&pool->sp_lock);
+	lwq_enqueue(&xprt->xpt_ready, &pool->sp_xprts);
 
 	svc_pool_wake_idle_thread(pool);
 }
@@ -487,18 +484,9 @@ static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
 {
 	struct svc_xprt	*xprt = NULL;
 
-	if (list_empty(&pool->sp_sockets))
-		goto out;
-
-	spin_lock_bh(&pool->sp_lock);
-	if (likely(!list_empty(&pool->sp_sockets))) {
-		xprt = list_first_entry(&pool->sp_sockets,
-					struct svc_xprt, xpt_ready);
-		list_del_init(&xprt->xpt_ready);
+	xprt = lwq_dequeue(&pool->sp_xprts, struct svc_xprt, xpt_ready);
+	if (xprt)
 		svc_xprt_get(xprt);
-	}
-	spin_unlock_bh(&pool->sp_lock);
-out:
 	return xprt;
 }
 
@@ -708,7 +696,7 @@ rqst_should_sleep(struct svc_rqst *rqstp)
 		return false;
 
 	/* was a socket queued? */
-	if (!list_empty(&pool->sp_sockets))
+	if (!lwq_empty(&pool->sp_xprts))
 		return false;
 
 	/* are we shutting down? */
@@ -1047,7 +1035,6 @@ static void svc_delete_xprt(struct svc_xprt *xprt)
 
 	spin_lock_bh(&serv->sv_lock);
 	list_del_init(&xprt->xpt_list);
-	WARN_ON_ONCE(!list_empty(&xprt->xpt_ready));
 	if (test_bit(XPT_TEMP, &xprt->xpt_flags))
 		serv->sv_tmpcnt--;
 	spin_unlock_bh(&serv->sv_lock);
@@ -1098,36 +1085,26 @@ static int svc_close_list(struct svc_serv *serv, struct list_head *xprt_list, st
 	return ret;
 }
 
-static struct svc_xprt *svc_dequeue_net(struct svc_serv *serv, struct net *net)
+static void svc_clean_up_xprts(struct svc_serv *serv, struct net *net)
 {
-	struct svc_pool *pool;
 	struct svc_xprt *xprt;
-	struct svc_xprt *tmp;
 	int i;
 
 	for (i = 0; i < serv->sv_nrpools; i++) {
-		pool = &serv->sv_pools[i];
-
-		spin_lock_bh(&pool->sp_lock);
-		list_for_each_entry_safe(xprt, tmp, &pool->sp_sockets, xpt_ready) {
-			if (xprt->xpt_net != net)
-				continue;
-			list_del_init(&xprt->xpt_ready);
-			spin_unlock_bh(&pool->sp_lock);
-			return xprt;
+		struct svc_pool *pool = &serv->sv_pools[i];
+		struct llist_node *q, **t1, *t2;
+
+		q = lwq_dequeue_all(&pool->sp_xprts);
+		lwq_for_each_safe(xprt, t1, t2, &q, xpt_ready) {
+			if (xprt->xpt_net == net) {
+				set_bit(XPT_CLOSE, &xprt->xpt_flags);
+				svc_delete_xprt(xprt);
+				xprt = NULL;
+			}
 		}
-		spin_unlock_bh(&pool->sp_lock);
-	}
-	return NULL;
-}
 
-static void svc_clean_up_xprts(struct svc_serv *serv, struct net *net)
-{
-	struct svc_xprt *xprt;
-
-	while ((xprt = svc_dequeue_net(serv, net))) {
-		set_bit(XPT_CLOSE, &xprt->xpt_flags);
-		svc_delete_xprt(xprt);
+		if (q)
+			lwq_enqueue_batch(q, &pool->sp_xprts);
 	}
 }
 
-- 
2.41.0



* [PATCH 08/10] SUNRPC: change sp_nrthreads to atomic_t
  2023-08-30  2:54 [PATCH 00/10] SUNRPC thread management changes NeilBrown
                   ` (6 preceding siblings ...)
  2023-08-30  2:54 ` [PATCH 07/10] SUNRPC: use lwq for sp_sockets - renamed to sp_xprts NeilBrown
@ 2023-08-30  2:54 ` NeilBrown
  2023-08-30  2:54 ` [PATCH 09/10] SUNRPC: discard sp_lock NeilBrown
  2023-08-30  2:54 ` [PATCH 10/10] SUNRPC: change the back-channel queue to lwq NeilBrown
  9 siblings, 0 replies; 19+ messages in thread
From: NeilBrown @ 2023-08-30  2:54 UTC (permalink / raw)
  To: Chuck Lever, Jeff Layton; +Cc: linux-nfs

Using an atomic_t avoids the need to take a spinlock (which can soon be
removed).

Choosing a thread to kill needs to be careful as we cannot set the "die
now" bit atomically with the test on the count.  Instead we temporarily
increase the count.
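
The selection step then looks like this (condensed from the
svc_pool_victim() hunk below):

	if (atomic_inc_not_zero(&pool->sp_nrthreads)) {
		/* the pool had at least one thread - mark a victim */
		set_bit(SP_VICTIM_REMAINS, &pool->sp_flags);
		set_bit(SP_NEED_VICTIM, &pool->sp_flags);
		if (!atomic_dec_and_test(&pool->sp_nrthreads))
			return pool;
		/* the last thread exited while we were choosing -
		 * undo the marks and try another pool
		 */
		clear_bit(SP_NEED_VICTIM, &pool->sp_flags);
		clear_bit(SP_VICTIM_REMAINS, &pool->sp_flags);
	}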

Signed-off-by: NeilBrown <neilb@suse.de>
---
 fs/nfsd/nfssvc.c           |  3 ++-
 include/linux/sunrpc/svc.h |  2 +-
 net/sunrpc/svc.c           | 37 ++++++++++++++++++++-----------------
 3 files changed, 23 insertions(+), 19 deletions(-)

diff --git a/fs/nfsd/nfssvc.c b/fs/nfsd/nfssvc.c
index 062f51fe4dfb..5e455ced0711 100644
--- a/fs/nfsd/nfssvc.c
+++ b/fs/nfsd/nfssvc.c
@@ -718,7 +718,8 @@ int nfsd_get_nrthreads(int n, int *nthreads, struct net *net)
 
 	if (nn->nfsd_serv != NULL) {
 		for (i = 0; i < nn->nfsd_serv->sv_nrpools && i < n; i++)
-			nthreads[i] = nn->nfsd_serv->sv_pools[i].sp_nrthreads;
+			nthreads[i] =
+				atomic_read(&nn->nfsd_serv->sv_pools[i].sp_nrthreads);
 	}
 
 	return 0;
diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
index 7ff9fe785e49..9d0fcd6148ae 100644
--- a/include/linux/sunrpc/svc.h
+++ b/include/linux/sunrpc/svc.h
@@ -36,7 +36,7 @@ struct svc_pool {
 	unsigned int		sp_id;	    	/* pool id; also node id on NUMA */
 	spinlock_t		sp_lock;	/* protects all fields */
 	struct lwq		sp_xprts;	/* pending transports */
-	unsigned int		sp_nrthreads;	/* # of threads in pool */
+	atomic_t		sp_nrthreads;	/* # of threads in pool */
 	struct list_head	sp_all_threads;	/* all server threads */
 	struct llist_head	sp_idle_threads; /* idle server threads */
 
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index 63cddb8cb08d..9524af33ace9 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -681,8 +681,8 @@ svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
 	serv->sv_nrthreads += 1;
 	spin_unlock_bh(&serv->sv_lock);
 
+	atomic_inc(&pool->sp_nrthreads);
 	spin_lock_bh(&pool->sp_lock);
-	pool->sp_nrthreads++;
 	list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads);
 	spin_unlock_bh(&pool->sp_lock);
 	return rqstp;
@@ -727,23 +727,24 @@ svc_pool_next(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
 }
 
 static struct svc_pool *
-svc_pool_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
+svc_pool_victim(struct svc_serv *serv, struct svc_pool *target_pool,
+		unsigned int *state)
 {
+	struct svc_pool *pool;
 	unsigned int i;
 
+retry:
+	pool = target_pool;
+
 	if (pool != NULL) {
-		spin_lock_bh(&pool->sp_lock);
-		if (pool->sp_nrthreads)
+		if (atomic_inc_not_zero(&pool->sp_nrthreads))
 			goto found_pool;
-		spin_unlock_bh(&pool->sp_lock);
 		return NULL;
 	} else {
 		for (i = 0; i < serv->sv_nrpools; i++) {
 			pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
-			spin_lock_bh(&pool->sp_lock);
-			if (pool->sp_nrthreads)
+			if (atomic_inc_not_zero(&pool->sp_nrthreads))
 				goto found_pool;
-			spin_unlock_bh(&pool->sp_lock);
 		}
 		return NULL;
 	}
@@ -751,8 +752,12 @@ svc_pool_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *stat
 found_pool:
 	set_bit(SP_VICTIM_REMAINS, &pool->sp_flags);
 	set_bit(SP_NEED_VICTIM, &pool->sp_flags);
-	spin_unlock_bh(&pool->sp_lock);
-	return pool;
+	if (!atomic_dec_and_test(&pool->sp_nrthreads))
+		return pool;
+	/* Nothing left in this pool any more */
+	clear_bit(SP_NEED_VICTIM, &pool->sp_flags);
+	clear_bit(SP_VICTIM_REMAINS, &pool->sp_flags);
+	goto retry;
 }
 
 static int
@@ -828,13 +833,10 @@ svc_stop_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
 int
 svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
 {
-	if (pool == NULL) {
+	if (!pool)
 		nrservs -= serv->sv_nrthreads;
-	} else {
-		spin_lock_bh(&pool->sp_lock);
-		nrservs -= pool->sp_nrthreads;
-		spin_unlock_bh(&pool->sp_lock);
-	}
+	else
+		nrservs -= atomic_read(&pool->sp_nrthreads);
 
 	if (nrservs > 0)
 		return svc_start_kthreads(serv, pool, nrservs);
@@ -921,10 +923,11 @@ svc_exit_thread(struct svc_rqst *rqstp)
 	struct svc_pool	*pool = rqstp->rq_pool;
 
 	spin_lock_bh(&pool->sp_lock);
-	pool->sp_nrthreads--;
 	list_del_rcu(&rqstp->rq_all);
 	spin_unlock_bh(&pool->sp_lock);
 
+	atomic_dec(&pool->sp_nrthreads);
+
 	spin_lock_bh(&serv->sv_lock);
 	serv->sv_nrthreads -= 1;
 	spin_unlock_bh(&serv->sv_lock);
-- 
2.41.0



* [PATCH 09/10] SUNRPC: discard sp_lock
  2023-08-30  2:54 [PATCH 00/10] SUNRPC thread management changes NeilBrown
                   ` (7 preceding siblings ...)
  2023-08-30  2:54 ` [PATCH 08/10] SUNRPC: change sp_nrthreads to atomic_t NeilBrown
@ 2023-08-30  2:54 ` NeilBrown
  2023-08-30  2:54 ` [PATCH 10/10] SUNRPC: change the back-channel queue to lwq NeilBrown
  9 siblings, 0 replies; 19+ messages in thread
From: NeilBrown @ 2023-08-30  2:54 UTC (permalink / raw)
  To: Chuck Lever, Jeff Layton; +Cc: linux-nfs

sp_lock is now only used to protect sp_all_threads.  This isn't needed
as sp_all_threads is only manipulated through svc_set_num_threads(),
which must be locked.  Read-access only requires rcu_read_lock().  So
no more locking is needed.
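
A reader walk then needs nothing more than (illustrative, not from
this patch):

	rcu_read_lock();
	list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
		/* inspect @rqstp without taking any pool lock */
	}
	rcu_read_unlock();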

Signed-off-by: NeilBrown <neilb@suse.de>
---
 include/linux/sunrpc/svc.h |  1 -
 net/sunrpc/svc.c           | 10 +++++-----
 2 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
index 9d0fcd6148ae..8ce1392c1a35 100644
--- a/include/linux/sunrpc/svc.h
+++ b/include/linux/sunrpc/svc.h
@@ -34,7 +34,6 @@
  */
 struct svc_pool {
 	unsigned int		sp_id;	    	/* pool id; also node id on NUMA */
-	spinlock_t		sp_lock;	/* protects all fields */
 	struct lwq		sp_xprts;	/* pending transports */
 	atomic_t		sp_nrthreads;	/* # of threads in pool */
 	struct list_head	sp_all_threads;	/* all server threads */
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index 9524af33ace9..61ea8ce7975f 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -511,7 +511,6 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
 		lwq_init(&pool->sp_xprts);
 		INIT_LIST_HEAD(&pool->sp_all_threads);
 		init_llist_head(&pool->sp_idle_threads);
-		spin_lock_init(&pool->sp_lock);
 
 		percpu_counter_init(&pool->sp_messages_arrived, 0, GFP_KERNEL);
 		percpu_counter_init(&pool->sp_sockets_queued, 0, GFP_KERNEL);
@@ -682,9 +681,12 @@ svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
 	spin_unlock_bh(&serv->sv_lock);
 
 	atomic_inc(&pool->sp_nrthreads);
-	spin_lock_bh(&pool->sp_lock);
+
+	/* Protected by whatever lock the service uses when calling
+	 * svc_set_num_threads()
+	 */
 	list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads);
-	spin_unlock_bh(&pool->sp_lock);
+
 	return rqstp;
 }
 
@@ -922,9 +924,7 @@ svc_exit_thread(struct svc_rqst *rqstp)
 	struct svc_serv	*serv = rqstp->rq_server;
 	struct svc_pool	*pool = rqstp->rq_pool;
 
-	spin_lock_bh(&pool->sp_lock);
 	list_del_rcu(&rqstp->rq_all);
-	spin_unlock_bh(&pool->sp_lock);
 
 	atomic_dec(&pool->sp_nrthreads);
 
-- 
2.41.0



* [PATCH 10/10] SUNRPC: change the back-channel queue to lwq
  2023-08-30  2:54 [PATCH 00/10] SUNRPC thread management changes NeilBrown
                   ` (8 preceding siblings ...)
  2023-08-30  2:54 ` [PATCH 09/10] SUNRPC: discard sp_lock NeilBrown
@ 2023-08-30  2:54 ` NeilBrown
  9 siblings, 0 replies; 19+ messages in thread
From: NeilBrown @ 2023-08-30  2:54 UTC (permalink / raw)
  To: Chuck Lever, Jeff Layton; +Cc: linux-nfs

This removes the need to store and update back-links in the list.
It also removes the need for the _bh version of spin_lock().

Signed-off-by: NeilBrown <neilb@suse.de>
---
 include/linux/sunrpc/svc.h        |  3 +--
 include/linux/sunrpc/xprt.h       |  3 ++-
 net/sunrpc/backchannel_rqst.c     |  5 +----
 net/sunrpc/svc.c                  |  3 +--
 net/sunrpc/svc_xprt.c             | 12 +++---------
 net/sunrpc/xprtrdma/backchannel.c |  4 +---
 6 files changed, 9 insertions(+), 21 deletions(-)

diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
index 8ce1392c1a35..c1feaf0d1542 100644
--- a/include/linux/sunrpc/svc.h
+++ b/include/linux/sunrpc/svc.h
@@ -90,10 +90,9 @@ struct svc_serv {
 	int			(*sv_threadfn)(void *data);
 
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
-	struct list_head	sv_cb_list;	/* queue for callback requests
+	struct lwq		sv_cb_list;	/* queue for callback requests
 						 * that arrive over the same
 						 * connection */
-	spinlock_t		sv_cb_lock;	/* protects the svc_cb_list */
 	bool			sv_bc_enabled;	/* service uses backchannel */
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
 };
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index b52411bcfe4e..5413bf694b18 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -57,6 +57,7 @@ struct xprt_class;
 struct seq_file;
 struct svc_serv;
 struct net;
+#include <linux/lwq.h>
 
 /*
  * This describes a complete RPC request
@@ -121,7 +122,7 @@ struct rpc_rqst {
 	int			rq_ntrans;
 
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
-	struct list_head	rq_bc_list;	/* Callback service list */
+	struct lwq_node		rq_bc_list;	/* Callback service list */
 	unsigned long		rq_bc_pa_state;	/* Backchannel prealloc state */
 	struct list_head	rq_bc_pa_list;	/* Backchannel prealloc list */
 #endif /* CONFIG_SUNRPC_BACKCHANEL */
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index 44b7c89a635f..caa94cf57123 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -83,7 +83,6 @@ static struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt)
 		return NULL;
 
 	req->rq_xprt = xprt;
-	INIT_LIST_HEAD(&req->rq_bc_list);
 
 	/* Preallocate one XDR receive buffer */
 	if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) {
@@ -367,8 +366,6 @@ void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
 
 	dprintk("RPC:       add callback request to list\n");
 	xprt_get(xprt);
-	spin_lock(&bc_serv->sv_cb_lock);
-	list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
-	spin_unlock(&bc_serv->sv_cb_lock);
+	lwq_enqueue(&req->rq_bc_list, &bc_serv->sv_cb_list);
 	svc_pool_wake_idle_thread(&bc_serv->sv_pools[0]);
 }
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index 61ea8ce7975f..3d3aaed8311c 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -438,8 +438,7 @@ EXPORT_SYMBOL_GPL(svc_bind);
 static void
 __svc_init_bc(struct svc_serv *serv)
 {
-	INIT_LIST_HEAD(&serv->sv_cb_list);
-	spin_lock_init(&serv->sv_cb_lock);
+	lwq_init(&serv->sv_cb_list);
 }
 #else
 static void
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index 2399811582cc..b4beb38152ab 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -705,7 +705,7 @@ rqst_should_sleep(struct svc_rqst *rqstp)
 
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 	if (svc_is_backchannel(rqstp)) {
-		if (!list_empty(&rqstp->rq_server->sv_cb_list))
+		if (!lwq_empty(&rqstp->rq_server->sv_cb_list))
 			return false;
 	}
 #endif
@@ -875,18 +875,12 @@ void svc_recv(struct svc_rqst *rqstp)
 		struct svc_serv *serv = rqstp->rq_server;
 		struct rpc_rqst *req;
 
-		spin_lock_bh(&serv->sv_cb_lock);
-		req = list_first_entry_or_null(&serv->sv_cb_list,
-					       struct rpc_rqst, rq_bc_list);
+		req = lwq_dequeue(&serv->sv_cb_list,
+				  struct rpc_rqst, rq_bc_list);
 		if (req) {
-			list_del(&req->rq_bc_list);
-			spin_unlock_bh(&serv->sv_cb_lock);
 			wake_next(rqstp);
-
 			svc_process_bc(req, rqstp);
-			return;
 		}
-		spin_unlock_bh(&serv->sv_cb_lock);
 	}
 #endif
 }
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index bfc434ec52a7..8c817e755262 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -263,9 +263,7 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
 	/* Queue rqst for ULP's callback service */
 	bc_serv = xprt->bc_serv;
 	xprt_get(xprt);
-	spin_lock(&bc_serv->sv_cb_lock);
-	list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
-	spin_unlock(&bc_serv->sv_cb_lock);
+	lwq_enqueue(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
 
 	svc_pool_wake_idle_thread(&bc_serv->sv_pools[0]);
 
-- 
2.41.0



* Re: [PATCH 05/10] lib: add light-weight queuing mechanism.
  2023-08-30  2:54 ` [PATCH 05/10] lib: add light-weight queuing mechanism NeilBrown
@ 2023-08-30 15:21   ` Chuck Lever
  2023-09-03 23:57     ` NeilBrown
  2023-08-30 15:35   ` Chuck Lever
  2023-08-30 16:03   ` Chuck Lever
  2 siblings, 1 reply; 19+ messages in thread
From: Chuck Lever @ 2023-08-30 15:21 UTC (permalink / raw)
  To: NeilBrown; +Cc: Jeff Layton, linux-nfs

On Wed, Aug 30, 2023 at 12:54:48PM +1000, NeilBrown wrote:
> lwq is a FIFO single-linked queue that only requires a spinlock
> for dequeueing, which happens in process context.  Enqueueing is atomic
> with no spinlock and can happen in any context.
> 
> Include a unit test for basic functionality - runs at boot time.  Does
> not use kunit framework.
> 
> Signed-off-by: NeilBrown <neilb@suse.de>
> ---
>  include/linux/lwq.h | 120 +++++++++++++++++++++++++++++++++++
>  lib/Kconfig         |   5 ++
>  lib/Makefile        |   2 +-
>  lib/lwq.c           | 149 ++++++++++++++++++++++++++++++++++++++++++++
>  4 files changed, 275 insertions(+), 1 deletion(-)
>  create mode 100644 include/linux/lwq.h
>  create mode 100644 lib/lwq.c

I've applied and/or squashed the previous four and pushed.

I don't have any specific complaints on this one, but checkpatch
throws about 20 warnings. Some of those you might want to deal with
or just ignore. Up to you, but I'll hold off on applying it until I
hear from you.

Also, I'm trying to collect a set of potential reviewers for it:

[cel@bazille even-releases]$ scripts/get_maintainer.pl lib/
Andrew Morton <akpm@linux-foundation.org> (commit_signer:206/523=39%)
"Liam R. Howlett" <Liam.Howlett@oracle.com> (commit_signer:89/523=17%,authored:61/523=12%)
Kees Cook <keescook@chromium.org> (commit_signer:48/523=9%)
Greg Kroah-Hartman <gregkh@linuxfoundation.org> (commit_signer:48/523=9%)
David Gow <davidgow@google.com> (commit_signer:43/523=8%)
linux-kernel@vger.kernel.org (open list)
[cel@bazille even-releases]$

Is that a reasonable set to add as Cc's?


> diff --git a/include/linux/lwq.h b/include/linux/lwq.h
> new file mode 100644
> index 000000000000..52b9c81b493a
> --- /dev/null
> +++ b/include/linux/lwq.h
> @@ -0,0 +1,120 @@
> +/* SPDX-License-Identifier: GPL-2.0-only */
> +
> +#ifndef LWQ_H
> +#define LWQ_H
> +/*
> + * light-weight single-linked queue built from llist
> + *
> + * Entries can be enqueued from any context with no locking.
> + * Entries can be dequeued from process context with integrated locking.
> + */
> +#include <linux/container_of.h>
> +#include <linux/spinlock.h>
> +#include <linux/llist.h>
> +
> +struct lwq_node {
> +	struct llist_node node;
> +};
> +
> +struct lwq {
> +	spinlock_t		lock;
> +	struct llist_node	*ready;		/* entries to be dequeued */
> +	struct llist_head	new;		/* entries being enqueued */
> +};
> +
> +/**
> + * lwq_init - initialise a lwq
> + * @q:	the lwq object
> + */
> +static inline void lwq_init(struct lwq *q)
> +{
> +	spin_lock_init(&q->lock);
> +	q->ready = NULL;
> +	init_llist_head(&q->new);
> +}
> +
> +/**
> + * lwq_empty - test if lwq contains any entry
> + * @q:	the lwq object
> + *
> + * This empty test contains an acquire barrier so that if a wakeup
> + * is sent when lwq_dequeue returns true, it is safe to go to sleep after
> + * a test on lwq_empty().
> + */
> +static inline bool lwq_empty(struct lwq *q)
> +{
> +	/* acquire ensures ordering wrt lwq_enqueue() */
> +	return smp_load_acquire(&q->ready) == NULL && llist_empty(&q->new);
> +}
> +
> +struct llist_node *__lwq_dequeue(struct lwq *q);
> +/**
> + * lwq_dequeue - dequeue first (oldest) entry from lwq
> + * @q:		the queue to dequeue from
> + * @type:	the type of object to return
> + * @member:	the member in the returned object which is an lwq_node.
> + *
> + * Remove a single object from the lwq and return it.  This will take
> + * a spinlock and so must always be called in the same context, typically
> + * process context.
> + */
> +#define lwq_dequeue(q, type, member)					\
> +	({ struct llist_node *_n = __lwq_dequeue(q);			\
> +	  _n ? container_of(_n, type, member.node) : NULL; })
> +
> +struct llist_node *lwq_dequeue_all(struct lwq *q);
> +
> +/**
> + * lwq_for_each_safe - iterate over detached queue allowing deletion
> + * @_n:		iterator variable
> + * @_t1:	temporary struct llist_node **
> + * @_t2:	temporary struct llist_node *
> + * @_l:		address of llist_node pointer from lwq_dequeue_all()
> + * @_member:	member in _n where lwq_node is found.
> + *
> + * Iterate over members in a dequeued list.  If the iterator variable
> + * is set to NULL, the iterator removes that entry from the queue.
> + */
> +#define lwq_for_each_safe(_n, _t1, _t2, _l, _member)			\
> +	for (_t1 = (_l);						\
> +	     *(_t1) ? (_n = container_of(*(_t1), typeof(*(_n)), _member.node),\
> +		       _t2 = ((*_t1)->next),				\
> +		       true)						\
> +	     : false;							\
> +	     (_n) ? (_t1 = &(_n)->_member.node.next, 0)			\
> +	     : ((*(_t1) = (_t2)),  0))
> +
> +/**
> + * lwq_enqueue - add a new item to the end of the queue
> + * @n:	the lwq_node embedded in the item to be added
> + * @q:	the lwq to append to.
> + *
> + * No locking is needed to append to the queue so this can
> + * be called from any context.
> + * Return %true if the list may have previously been empty.
> + */
> +static inline bool lwq_enqueue(struct lwq_node *n, struct lwq *q)
> +{
> +	/* acquire ensures ordering wrt lwq_dequeue */
> +	return llist_add(&n->node, &q->new) &&
> +		smp_load_acquire(&q->ready) == NULL;
> +}
> +
> +/**
> + * lwq_enqueue_batch - add a list of new items to the end of the queue
> + * @n:	the lwq_node embedded in the first item to be added
> + * @q:	the lwq to append to.
> + *
> + * No locking is needed to append to the queue so this can
> + * be called from any context.
> + * Return %true if the list may have previously been empty.
> + */
> +static inline bool lwq_enqueue_batch(struct llist_node *n, struct lwq *q)
> +{
> +	struct llist_node *e = n;
> +
> +	/* acquire ensures ordering wrt lwq_dequeue */
> +	return llist_add_batch(llist_reverse_order(n), e, &q->new) &&
> +		smp_load_acquire(&q->ready) == NULL;
> +}
> +#endif /* LWQ_H */
> diff --git a/lib/Kconfig b/lib/Kconfig
> index 5c2da561c516..6620bdba4f94 100644
> --- a/lib/Kconfig
> +++ b/lib/Kconfig
> @@ -763,3 +763,8 @@ config ASN1_ENCODER
>  
>  config POLYNOMIAL
>         tristate
> +
> +config LWQ_TEST
> +	bool "RPC: enable boot-time test for lwq queuing"
> +	help
> +          Enable boot-time test of lwq functionality.
> diff --git a/lib/Makefile b/lib/Makefile
> index 1ffae65bb7ee..4b67c2d6af62 100644
> --- a/lib/Makefile
> +++ b/lib/Makefile
> @@ -45,7 +45,7 @@ obj-y	+= lockref.o
>  obj-y += bcd.o sort.o parser.o debug_locks.o random32.o \
>  	 bust_spinlocks.o kasprintf.o bitmap.o scatterlist.o \
>  	 list_sort.o uuid.o iov_iter.o clz_ctz.o \
> -	 bsearch.o find_bit.o llist.o memweight.o kfifo.o \
> +	 bsearch.o find_bit.o llist.o lwq.o memweight.o kfifo.o \
>  	 percpu-refcount.o rhashtable.o base64.o \
>  	 once.o refcount.o rcuref.o usercopy.o errseq.o bucket_locks.o \
>  	 generic-radix-tree.o
> diff --git a/lib/lwq.c b/lib/lwq.c
> new file mode 100644
> index 000000000000..d6be6dda3867
> --- /dev/null
> +++ b/lib/lwq.c
> @@ -0,0 +1,149 @@
> +// SPDX-License-Identifier: GPL-2.0-only
> +/*
> + * Light weight single-linked queue.
> + *
> + * Entries are enqueued to the head of an llist, with no blocking.
> + * This can happen in any context.
> + *
> + * Entries are dequeued using a spinlock to protect against
> + * multiple access.  The llist is staged in reverse order, and refreshed
> + * from the llist when it exhausts.
> + */
> +#include <linux/rcupdate.h>
> +#include <linux/lwq.h>
> +
> +struct llist_node *__lwq_dequeue(struct lwq *q)
> +{
> +	struct llist_node *this;
> +
> +	if (lwq_empty(q))
> +		return NULL;
> +	spin_lock(&q->lock);
> +	this = q->ready;
> +	if (!this && !llist_empty(&q->new)) {
> +		/* ensure queue doesn't appear transiently lwq_empty */
> +		smp_store_release(&q->ready, (void *)1);
> +		this = llist_reverse_order(llist_del_all(&q->new));
> +		if (!this)
> +			q->ready = NULL;
> +	}
> +	if (this)
> +		q->ready = llist_next(this);
> +	spin_unlock(&q->lock);
> +	return this;
> +}
> +
> +/**
> + * lwq_dequeue_all - dequeue all currently enqueued objects
> + * @q:	the queue to dequeue from
> + *
> + * Remove and return a linked list of llist_nodes of all the objects that were
> + * in the queue. The first on the list will be the object that was least
> + * recently enqueued.
> + */
> +struct llist_node *lwq_dequeue_all(struct lwq *q)
> +{
> +	struct llist_node *r, *t, **ep;
> +
> +	if (lwq_empty(q))
> +		return NULL;
> +
> +	spin_lock(&q->lock);
> +	r = q->ready;
> +	q->ready = NULL;
> +	t = llist_del_all(&q->new);
> +	spin_unlock(&q->lock);
> +	ep = &r;
> +	while (*ep)
> +		ep = &(*ep)->next;
> +	*ep = llist_reverse_order(t);
> +	return r;
> +}
> +
> +#if IS_ENABLED(CONFIG_LWQ_TEST)
> +
> +#include <linux/module.h>
> +#include <linux/slab.h>
> +#include <linux/wait_bit.h>
> +#include <linux/kthread.h>
> +#include <linux/delay.h>
> +struct tnode {
> +	struct lwq_node n;
> +	int i;
> +	int c;
> +};
> +
> +static int lwq_exercise(void *qv)
> +{
> +	struct lwq *q = qv;
> +	int cnt;
> +	struct tnode *t;
> +
> +	for (cnt = 0; cnt < 10000; cnt++) {
> +		wait_var_event(q, (t = lwq_dequeue(q, struct tnode, n)) != NULL);
> +		t->c++;
> +		if (lwq_enqueue(&t->n, q))
> +			wake_up_var(q);
> +	}
> +	while (!kthread_should_stop())
> +		schedule_timeout_idle(1);
> +	return 0;
> +}
> +
> +static int lwq_test(void)
> +{
> +	int i;
> +	struct lwq q;
> +	struct llist_node *l, **t1, *t2;
> +	struct tnode *t;
> +	struct task_struct *threads[8];
> +
> +	printk(KERN_INFO "testing lwq....\n");
> +	lwq_init(&q);
> +	printk(KERN_INFO " lwq: run some threads\n");
> +	for (i = 0; i < ARRAY_SIZE(threads); i++)
> +		threads[i] = kthread_run(lwq_exercise, &q, "lwq-test-%d", i);
> +	for (i = 0; i < 100; i++) {
> +		t = kmalloc(sizeof(*t), GFP_KERNEL);
> +		t->i = i;
> +		t->c = 0;
> +		if (lwq_enqueue(&t->n, &q))
> +			wake_up_var(&q);
> +	};
> +	/* wait for threads to exit */
> +	for (i = 0; i < ARRAY_SIZE(threads); i++)
> +		if (!IS_ERR_OR_NULL(threads[i]))
> +			kthread_stop(threads[i]);
> +	printk(KERN_INFO " lwq: dequeue first 50:");
> +	for (i = 0; i < 50 ; i++) {
> +		if (i && (i % 10) == 0) {
> +			printk(KERN_CONT "\n");
> +			printk(KERN_INFO " lwq: ... ");
> +		}
> +		t = lwq_dequeue(&q, struct tnode, n);
> +		printk(KERN_CONT " %d(%d)", t->i, t->c);
> +		kfree(t);
> +	}
> +	printk(KERN_CONT "\n");
> +	l = lwq_dequeue_all(&q);
> +	printk(KERN_INFO " lwq: delete the multiples of 3 (test lwq_for_each_safe())\n");
> +	lwq_for_each_safe(t, t1, t2, &l, n) {
> +		if ((t->i % 3) == 0) {
> +			t->i = -1;
> +			kfree(t);
> +			t = NULL;
> +		}
> +	}
> +	if (l)
> +		lwq_enqueue_batch(l, &q);
> +	printk(KERN_INFO " lwq: dequeue remaining:");
> +	while ((t = lwq_dequeue(&q, struct tnode, n)) != NULL) {
> +		printk(KERN_CONT " %d", t->i);
> +		kfree(t);
> +	}
> +	printk(KERN_CONT "\n");
> +	return 0;
> +}
> +
> +module_init(lwq_test);
> +#endif /* CONFIG_LWQ_TEST*/
> -- 
> 2.41.0
> 

-- 
Chuck Lever

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH 06/10] SUNRPC: only have one thread waking up at a time
  2023-08-30  2:54 ` [PATCH 06/10] SUNRPC: only have one thread waking up at a time NeilBrown
@ 2023-08-30 15:28   ` Chuck Lever
  2023-09-04  0:35     ` NeilBrown
  0 siblings, 1 reply; 19+ messages in thread
From: Chuck Lever @ 2023-08-30 15:28 UTC (permalink / raw)
  To: NeilBrown; +Cc: Jeff Layton, linux-nfs

On Wed, Aug 30, 2023 at 12:54:49PM +1000, NeilBrown wrote:
> Currently if several items of work become available in quick succession,
> that number of threads (if available) will be woken.  By the time some
> of them wake up another thread that was already cache-warm might have
> come along and completed the work.  Anecdotal evidence suggests as many
> as 15% of wakes find nothing to do once they get to the point of
> looking.
> 
> This patch changes svc_pool_wake_idle_thread() to wake the first thread
> on the queue but NOT remove it.  Subsequent calls will wake the same
> thread.  Once that thread starts it will dequeue itself and after
> dequeueing some work to do, it will wake the next thread if there is more
> work ready.  This results in a more orderly increase in the number of
> busy threads.
> 
> As a bonus, this allows us to reduce locking around the idle queue.
> svc_pool_wake_idle_thread() no longer needs to take a lock (beyond
> rcu_read_lock()) as it doesn't manipulate the queue, it just looks at
> the first item.
> 
> The thread itself can avoid locking by using the new
> llist_del_first_this() interface.  This will safely remove the thread
> itself if it is the head.  If it isn't the head, it will do nothing.
> If multiple threads call this concurrently only one will succeed.  The
> others will do nothing, so no corruption can result.
> 
> If a thread wakes up and finds that it cannot dequeue itself that means
> either
> - that it wasn't woken because it was the head of the queue.  Maybe the
>   freezer woke it.  In that case it can go back to sleep (after trying
>   to freeze of course).
> - some other thread found there was nothing to do very recently, and
>   placed itself on the head of the queue in front of this thread.
>   It must check again after placing itself there, so it can be deemed to
>   be responsible for any pending work, and this thread can go back to
>   sleep until woken.
> 
> No code ever tests for busy threads any more.  Only each thread itself
> cares if it is busy.  So svc_thread_busy() is no longer needed.
> 
> Signed-off-by: NeilBrown <neilb@suse.de>
> ---
>  include/linux/sunrpc/svc.h | 11 -----------
>  net/sunrpc/svc.c           | 14 ++++++--------
>  net/sunrpc/svc_xprt.c      | 35 ++++++++++++++++++++++-------------
>  3 files changed, 28 insertions(+), 32 deletions(-)
> 
> diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
> index ad4572630335..dafa362b4fdd 100644
> --- a/include/linux/sunrpc/svc.h
> +++ b/include/linux/sunrpc/svc.h
> @@ -266,17 +266,6 @@ enum {
>  	RQ_DATA,		/* request has data */
>  };
>  
> -/**
> - * svc_thread_busy - check if a thread as busy
> - * @rqstp: the thread which might be busy
> - *
> - * A thread is only busy when it is not an the idle list.
> - */
> -static inline bool svc_thread_busy(const struct svc_rqst *rqstp)
> -{
> -	return !llist_on_list(&rqstp->rq_idle);
> -}
> -
>  #define SVC_NET(rqst) (rqst->rq_xprt ? rqst->rq_xprt->xpt_net : rqst->rq_bc_net)
>  
>  /*
> diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
> index 5673f30db295..3267d740235e 100644
> --- a/net/sunrpc/svc.c
> +++ b/net/sunrpc/svc.c
> @@ -642,7 +642,6 @@ svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node)
>  
>  	folio_batch_init(&rqstp->rq_fbatch);
>  
> -	init_llist_node(&rqstp->rq_idle);
>  	rqstp->rq_server = serv;
>  	rqstp->rq_pool = pool;
>  
> @@ -704,17 +703,16 @@ void svc_pool_wake_idle_thread(struct svc_pool *pool)
>  	struct llist_node *ln;
>  
>  	rcu_read_lock();
> -	spin_lock_bh(&pool->sp_lock);
> -	ln = llist_del_first_init(&pool->sp_idle_threads);
> -	spin_unlock_bh(&pool->sp_lock);
> +	ln = READ_ONCE(pool->sp_idle_threads.first);
>  	if (ln) {
>  		rqstp = llist_entry(ln, struct svc_rqst, rq_idle);
> -
>  		WRITE_ONCE(rqstp->rq_qtime, ktime_get());
> -		wake_up_process(rqstp->rq_task);
> +		if (!task_is_running(rqstp->rq_task)) {
> +			wake_up_process(rqstp->rq_task);
> +			trace_svc_wake_up(rqstp->rq_task->pid);
> +			percpu_counter_inc(&pool->sp_threads_woken);
> +		}
>  		rcu_read_unlock();
> -		percpu_counter_inc(&pool->sp_threads_woken);
> -		trace_svc_wake_up(rqstp->rq_task->pid);
>  		return;
>  	}
>  	rcu_read_unlock();
> diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
> index 17c43bde35c9..a51570a4cbf2 100644
> --- a/net/sunrpc/svc_xprt.c
> +++ b/net/sunrpc/svc_xprt.c
> @@ -732,20 +732,16 @@ static void svc_rqst_wait_for_work(struct svc_rqst *rqstp)
>  	if (rqst_should_sleep(rqstp)) {
>  		set_current_state(TASK_IDLE | TASK_FREEZABLE);
>  		llist_add(&rqstp->rq_idle, &pool->sp_idle_threads);
> +		if (likely(rqst_should_sleep(rqstp)))
> +			schedule();
>  
> -		if (unlikely(!rqst_should_sleep(rqstp)))
> -			/* Work just became available.  This thread cannot simply
> -			 * choose not to sleep as it *must* wait until removed.
> -			 * So wake the first waiter - whether it is this
> -			 * thread or some other, it will get the work done.
> +		while (!llist_del_first_this(&pool->sp_idle_threads,
> +					     &rqstp->rq_idle)) {
> +			/* Cannot @rqstp from idle list, so some other thread

I was not aware that "@rqstp" was a verb.  ;-)

Maybe the nice new comment that you are deleting just above here
would be appropriate to move here.
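
For anyone reading along: patch 04/10 isn't quoted in this thread, so
here is a rough sketch of the llist_del_first_this() behaviour the
changelog describes, assuming a try_cmpxchg()-based implementation
(the real patch may differ):

/* Sketch only: remove @this from @head iff it is currently the
 * first entry.  Concurrent callers race on the cmpxchg, so at most
 * one succeeds; the others return NULL and stay on the list.
 */
static struct llist_node *llist_del_first_this(struct llist_head *head,
					       struct llist_node *this)
{
	struct llist_node *entry, *next;

	entry = smp_load_acquire(&head->first);
	do {
		if (entry != this)
			return NULL;	/* not the head: do nothing */
		next = READ_ONCE(entry->next);
	} while (!try_cmpxchg(&head->first, &entry, next));

	return entry;
}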


> +			 * must have queued itself after finding
> +			 * no work to do, so they have taken responsibility
> +			 * for any outstanding work.
>  			 */
> -			svc_pool_wake_idle_thread(pool);
> -
> -		/* Since a thread cannot remove itself from an llist,
> -		 * schedule until someone else removes @rqstp from
> -		 * the idle list.
> -		 */
> -		while (!svc_thread_busy(rqstp)) {
>  			schedule();
>  			set_current_state(TASK_IDLE | TASK_FREEZABLE);
>  		}
> @@ -835,6 +831,15 @@ static void svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt)
>  	svc_xprt_release(rqstp);
>  }
>  
> +static void wake_next(struct svc_rqst *rqstp)

Nit: I would prefer a subsystem-specific name for this little guy.
Makes it a little easier to distinguish from generic scheduler
functions when looking at perf output.

How about "svc_thread_wake_next" ?
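
That is, keeping the body as-is:

	static void svc_thread_wake_next(struct svc_rqst *rqstp)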


> +{
> +	if (!rqst_should_sleep(rqstp))

rqst_should_sleep() should also get a better name IMO, but that
helper was added many patches ago. If you agree to a change, I can
do that surgery.


> +		/* More work pending after I dequeued some,
> +		 * wake another worker
> +		 */
> +		svc_pool_wake_idle_thread(rqstp->rq_pool);
> +}
> +
>  /**
>   * svc_recv - Receive and process the next request on any transport
>   * @rqstp: an idle RPC service thread
> @@ -854,13 +859,16 @@ void svc_recv(struct svc_rqst *rqstp)
>  
>  	clear_bit(SP_TASK_PENDING, &pool->sp_flags);
>  
> -	if (svc_thread_should_stop(rqstp))
> +	if (svc_thread_should_stop(rqstp)) {
> +		wake_next(rqstp);
>  		return;
> +	}
>  
>  	rqstp->rq_xprt = svc_xprt_dequeue(pool);
>  	if (rqstp->rq_xprt) {
>  		struct svc_xprt *xprt = rqstp->rq_xprt;
>  
> +		wake_next(rqstp);
>  		/* Normally we will wait up to 5 seconds for any required
>  		 * cache information to be provided.  When there are no
>  		 * idle threads, we reduce the wait time.
> @@ -885,6 +893,7 @@ void svc_recv(struct svc_rqst *rqstp)
>  		if (req) {
>  			list_del(&req->rq_bc_list);
>  			spin_unlock_bh(&serv->sv_cb_lock);
> +			wake_next(rqstp);
>  
>  			svc_process_bc(req, rqstp);
>  			return;
> -- 
> 2.41.0
> 

-- 
Chuck Lever

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH 05/10] lib: add light-weight queuing mechanism.
  2023-08-30  2:54 ` [PATCH 05/10] lib: add light-weight queuing mechanism NeilBrown
  2023-08-30 15:21   ` Chuck Lever
@ 2023-08-30 15:35   ` Chuck Lever
  2023-09-03 23:59     ` NeilBrown
  2023-08-30 16:03   ` Chuck Lever
  2 siblings, 1 reply; 19+ messages in thread
From: Chuck Lever @ 2023-08-30 15:35 UTC (permalink / raw)
  To: NeilBrown; +Cc: Jeff Layton, linux-nfs

On Wed, Aug 30, 2023 at 12:54:48PM +1000, NeilBrown wrote:
> lwq is a FIFO single-linked queue that only requires a spinlock
> for dequeueing, which happens in process context.  Enqueueing is atomic
> with no spinlock and can happen in any context.
> 
> Include a unit test for basic functionality - runs at boot time.  Does
> not use kunit framework.
> 
> Signed-off-by: NeilBrown <neilb@suse.de>
> ---
>  include/linux/lwq.h | 120 +++++++++++++++++++++++++++++++++++
>  lib/Kconfig         |   5 ++
>  lib/Makefile        |   2 +-
>  lib/lwq.c           | 149 ++++++++++++++++++++++++++++++++++++++++++++
>  4 files changed, 275 insertions(+), 1 deletion(-)
>  create mode 100644 include/linux/lwq.h
>  create mode 100644 lib/lwq.c
> 
> diff --git a/include/linux/lwq.h b/include/linux/lwq.h
> new file mode 100644
> index 000000000000..52b9c81b493a
> --- /dev/null
> +++ b/include/linux/lwq.h
> @@ -0,0 +1,120 @@
> +/* SPDX-License-Identifier: GPL-2.0-only */
> +
> +#ifndef LWQ_H
> +#define LWQ_H
> +/*
> + * light-weight single-linked queue built from llist
> + *
> + * Entries can be enqueued from any context with no locking.
> + * Entries can be dequeued from process context with integrated locking.
> + */
> +#include <linux/container_of.h>
> +#include <linux/spinlock.h>
> +#include <linux/llist.h>
> +
> +struct lwq_node {
> +	struct llist_node node;
> +};
> +
> +struct lwq {
> +	spinlock_t		lock;
> +	struct llist_node	*ready;		/* entries to be dequeued */
> +	struct llist_head	new;		/* entries being enqueued */
> +};
> +
> +/**
> + * lwq_init - initialise a lwq
> + * @q:	the lwq object
> + */
> +static inline void lwq_init(struct lwq *q)
> +{
> +	spin_lock_init(&q->lock);
> +	q->ready = NULL;
> +	init_llist_head(&q->new);
> +}
> +
> +/**
> + * lwq_empty - test if lwq contains any entry
> + * @q:	the lwq object
> + *
> + * This empty test contains an acquire barrier so that if a wakeup
> + * is sent after lwq_enqueue() returns true, it is safe to go to sleep after
> + * a test on lwq_empty().
> + */
> +static inline bool lwq_empty(struct lwq *q)
> +{
> +	/* acquire ensures ordering wrt lwq_enqueue() */
> +	return smp_load_acquire(&q->ready) == NULL && llist_empty(&q->new);
> +}
> +
> +struct llist_node *__lwq_dequeue(struct lwq *q);
> +/**
> + * lwq_dequeue - dequeue first (oldest) entry from lwq
> + * @q:		the queue to dequeue from
> + * @type:	the type of object to return
> + * @member:	the member in the returned object which is an lwq_node.
> + *
> + * Remove a single object from the lwq and return it.  This will take
> + * a spinlock and so must always be called in the same context, typically
> + * process context.
> + */
> +#define lwq_dequeue(q, type, member)					\
> +	({ struct llist_node *_n = __lwq_dequeue(q);			\
> +	  _n ? container_of(_n, type, member.node) : NULL; })
> +
> +struct llist_node *lwq_dequeue_all(struct lwq *q);
> +
> +/**
> + * lwq_for_each_safe - iterate over detached queue allowing deletion
> + * @_n:		iterator variable
> + * @_t1:	temporary struct llist_node **
> + * @_t2:	temporary struct llist_node *
> + * @_l:		address of llist_node pointer from lwq_dequeue_all()
> + * @_member:	member in _n where lwq_node is found.
> + *
> + * Iterate over members in a dequeued list.  If the iterator variable
> + * is set to NULL, the iterator removes that entry from the queue.
> + */
> +#define lwq_for_each_safe(_n, _t1, _t2, _l, _member)			\
> +	for (_t1 = (_l);						\
> +	     *(_t1) ? (_n = container_of(*(_t1), typeof(*(_n)), _member.node),\
> +		       _t2 = ((*_t1)->next),				\
> +		       true)						\
> +	     : false;							\
> +	     (_n) ? (_t1 = &(_n)->_member.node.next, 0)			\
> +	     : ((*(_t1) = (_t2)),  0))
> +
> +/**
> + * lwq_enqueue - add a new item to the end of the queue
> + * @n	- the lwq_node embedded in the item to be added
> + * @q	- the lwq to append to.
> + *
> + * No locking is needed to append to the queue so this can
> + * be called from any context.
> + * Return %true if the list may have previously been empty.
> + */
> +static inline bool lwq_enqueue(struct lwq_node *n, struct lwq *q)
> +{
> +	/* acquire ensures ordering wrt lwq_dequeue */
> +	return llist_add(&n->node, &q->new) &&
> +		smp_load_acquire(&q->ready) == NULL;
> +}
> +
> +/**
> + * lwq_enqueue_batch - add a list of new items to the end of the queue
> + * @n	- the lwq_node embedded in the first item to be added
> + * @q	- the lwq to append to.
> + *
> + * No locking is needed to append to the queue so this can
> + * be called from any context.
> + * Return %true if the list may have previously been empty.
> + */
> +static inline bool lwq_enqueue_batch(struct llist_node *n, struct lwq *q)
> +{
> +	struct llist_node *e = n;
> +
> +	/* acquire ensures ordering wrt lwq_dequeue */
> +	return llist_add_batch(llist_reverse_order(n), e, &q->new) &&
> +		smp_load_acquire(&q->ready) == NULL;
> +}
> +#endif /* LWQ_H */
> diff --git a/lib/Kconfig b/lib/Kconfig
> index 5c2da561c516..6620bdba4f94 100644
> --- a/lib/Kconfig
> +++ b/lib/Kconfig
> @@ -763,3 +763,8 @@ config ASN1_ENCODER
>  
>  config POLYNOMIAL
>         tristate
> +
> +config LWQ_TEST
> +	bool "RPC: enable boot-time test for lwq queuing"

Since LWQ is no longer RPC specific, you can drop the "RPC: " from
the option's short description.
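
Something like

	config LWQ_TEST
		bool "lib: enable boot-time test for lwq queuing"

(or no subsystem prefix at all) would read better now.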


> +	help
> +          Enable boot-time test of lwq functionality.
> diff --git a/lib/Makefile b/lib/Makefile
> index 1ffae65bb7ee..4b67c2d6af62 100644
> --- a/lib/Makefile
> +++ b/lib/Makefile
> @@ -45,7 +45,7 @@ obj-y	+= lockref.o
>  obj-y += bcd.o sort.o parser.o debug_locks.o random32.o \
>  	 bust_spinlocks.o kasprintf.o bitmap.o scatterlist.o \
>  	 list_sort.o uuid.o iov_iter.o clz_ctz.o \
> -	 bsearch.o find_bit.o llist.o memweight.o kfifo.o \
> +	 bsearch.o find_bit.o llist.o lwq.o memweight.o kfifo.o \
>  	 percpu-refcount.o rhashtable.o base64.o \
>  	 once.o refcount.o rcuref.o usercopy.o errseq.o bucket_locks.o \
>  	 generic-radix-tree.o
> diff --git a/lib/lwq.c b/lib/lwq.c
> new file mode 100644
> index 000000000000..d6be6dda3867
> --- /dev/null
> +++ b/lib/lwq.c
> @@ -0,0 +1,149 @@
> +// SPDX-License-Identifier: GPL-2.0-only
> +/*
> + * Light weight single-linked queue.
> + *
> + * Entries are enqueued to the head of an llist, with no blocking.
> + * This can happen in any context.
> + *
> + * Entries are dequeued using a spinlock to protect against
> + * multiple access.  The llist is staged in reverse order, and refreshed
> + * from the llist when it exhausts.
> + */
> +#include <linux/rcupdate.h>
> +#include <linux/lwq.h>
> +
> +struct llist_node *__lwq_dequeue(struct lwq *q)
> +{
> +	struct llist_node *this;
> +
> +	if (lwq_empty(q))
> +		return NULL;
> +	spin_lock(&q->lock);
> +	this = q->ready;
> +	if (!this && !llist_empty(&q->new)) {
> +		/* ensure queue doesn't appear transiently lwq_empty */
> +		smp_store_release(&q->ready, (void *)1);
> +		this = llist_reverse_order(llist_del_all(&q->new));
> +		if (!this)
> +			q->ready = NULL;
> +	}
> +	if (this)
> +		q->ready = llist_next(this);
> +	spin_unlock(&q->lock);
> +	return this;
> +}
> +
> +/**
> + * lwq_dequeue_all - dequeue all currently enqueued objects
> + * @q:	the queue to dequeue from
> + *
> + * Remove and return a linked list of llist_nodes of all the objects that were
> + * in the queue. The first on the list will be the object that was least
> + * recently enqueued.
> + */
> +struct llist_node *lwq_dequeue_all(struct lwq *q)
> +{
> +	struct llist_node *r, *t, **ep;
> +
> +	if (lwq_empty(q))
> +		return NULL;
> +
> +	spin_lock(&q->lock);
> +	r = q->ready;
> +	q->ready = NULL;
> +	t = llist_del_all(&q->new);
> +	spin_unlock(&q->lock);
> +	ep = &r;
> +	while (*ep)
> +		ep = &(*ep)->next;
> +	*ep = llist_reverse_order(t);
> +	return r;
> +}
> +
> +#if IS_ENABLED(CONFIG_LWQ_TEST)
> +
> +#include <linux/module.h>
> +#include <linux/slab.h>
> +#include <linux/wait_bit.h>
> +#include <linux/kthread.h>
> +#include <linux/delay.h>
> +struct tnode {
> +	struct lwq_node n;
> +	int i;
> +	int c;
> +};
> +
> +static int lwq_exercise(void *qv)
> +{
> +	struct lwq *q = qv;
> +	int cnt;
> +	struct tnode *t;
> +
> +	for (cnt = 0; cnt < 10000; cnt++) {
> +		wait_var_event(q, (t = lwq_dequeue(q, struct tnode, n)) != NULL);
> +		t->c++;
> +		if (lwq_enqueue(&t->n, q))
> +			wake_up_var(q);
> +	}
> +	while (!kthread_should_stop())
> +		schedule_timeout_idle(1);
> +	return 0;
> +}
> +
> +static int lwq_test(void)
> +{
> +	int i;
> +	struct lwq q;
> +	struct llist_node *l, **t1, *t2;
> +	struct tnode *t;
> +	struct task_struct *threads[8];
> +
> +	printk(KERN_INFO "testing lwq....\n");
> +	lwq_init(&q);
> +	printk(KERN_INFO " lwq: run some threads\n");
> +	for (i = 0; i < ARRAY_SIZE(threads); i++)
> +		threads[i] = kthread_run(lwq_exercise, &q, "lwq-test-%d", i);
> +	for (i = 0; i < 100; i++) {
> +		t = kmalloc(sizeof(*t), GFP_KERNEL);
> +		t->i = i;
> +		t->c = 0;
> +		if (lwq_enqueue(&t->n, &q))
> +			wake_up_var(&q);
> +	};
> +	/* wait for threads to exit */
> +	for (i = 0; i < ARRAY_SIZE(threads); i++)
> +		if (!IS_ERR_OR_NULL(threads[i]))
> +			kthread_stop(threads[i]);
> +	printk(KERN_INFO " lwq: dequeue first 50:");
> +	for (i = 0; i < 50 ; i++) {
> +		if (i && (i % 10) == 0) {
> +			printk(KERN_CONT "\n");
> +			printk(KERN_INFO " lwq: ... ");
> +		}
> +		t = lwq_dequeue(&q, struct tnode, n);
> +		printk(KERN_CONT " %d(%d)", t->i, t->c);
> +		kfree(t);
> +	}
> +	printk(KERN_CONT "\n");
> +	l = lwq_dequeue_all(&q);
> +	printk(KERN_INFO " lwq: delete the multiples of 3 (test lwq_for_each_safe())\n");
> +	lwq_for_each_safe(t, t1, t2, &l, n) {
> +		if ((t->i % 3) == 0) {
> +			t->i = -1;
> +			kfree(t);
> +			t = NULL;
> +		}
> +	}
> +	if (l)
> +		lwq_enqueue_batch(l, &q);
> +	printk(KERN_INFO " lwq: dequeue remaining:");
> +	while ((t = lwq_dequeue(&q, struct tnode, n)) != NULL) {
> +		printk(KERN_CONT " %d", t->i);
> +		kfree(t);
> +	}
> +	printk(KERN_CONT "\n");
> +	return 0;
> +}
> +
> +module_init(lwq_test);
> +#endif /* CONFIG_LWQ_TEST*/
> -- 
> 2.41.0
> 

-- 
Chuck Lever

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH 05/10] lib: add light-weight queuing mechanism.
  2023-08-30  2:54 ` [PATCH 05/10] lib: add light-weight queuing mechanism NeilBrown
  2023-08-30 15:21   ` Chuck Lever
  2023-08-30 15:35   ` Chuck Lever
@ 2023-08-30 16:03   ` Chuck Lever
  2023-09-04  0:02     ` NeilBrown
  2 siblings, 1 reply; 19+ messages in thread
From: Chuck Lever @ 2023-08-30 16:03 UTC (permalink / raw)
  To: NeilBrown; +Cc: Jeff Layton, linux-nfs

On Wed, Aug 30, 2023 at 12:54:48PM +1000, NeilBrown wrote:
> lwq is a FIFO single-linked queue that only requires a spinlock
> for dequeueing, which happens in process context.  Enqueueing is atomic
> with no spinlock and can happen in any context.
> 
> Include a unit test for basic functionality - runs at boot time.  Does
> not use kunit framework.
> 
> Signed-off-by: NeilBrown <neilb@suse.de>
> ---
>  include/linux/lwq.h | 120 +++++++++++++++++++++++++++++++++++
>  lib/Kconfig         |   5 ++
>  lib/Makefile        |   2 +-
>  lib/lwq.c           | 149 ++++++++++++++++++++++++++++++++++++++++++++
>  4 files changed, 275 insertions(+), 1 deletion(-)
>  create mode 100644 include/linux/lwq.h
>  create mode 100644 lib/lwq.c
> 
> diff --git a/include/linux/lwq.h b/include/linux/lwq.h
> new file mode 100644
> index 000000000000..52b9c81b493a
> --- /dev/null
> +++ b/include/linux/lwq.h
> @@ -0,0 +1,120 @@
> +/* SPDX-License-Identifier: GPL-2.0-only */
> +
> +#ifndef LWQ_H
> +#define LWQ_H
> +/*
> + * light-weight single-linked queue built from llist
> + *
> + * Entries can be enqueued from any context with no locking.
> + * Entries can be dequeued from process context with integrated locking.
> + */
> +#include <linux/container_of.h>
> +#include <linux/spinlock.h>
> +#include <linux/llist.h>
> +
> +struct lwq_node {
> +	struct llist_node node;
> +};
> +
> +struct lwq {
> +	spinlock_t		lock;
> +	struct llist_node	*ready;		/* entries to be dequeued */
> +	struct llist_head	new;		/* entries being enqueued */
> +};
> +
> +/**
> + * lwq_init - initialise a lwq
> + * @q:	the lwq object
> + */
> +static inline void lwq_init(struct lwq *q)
> +{
> +	spin_lock_init(&q->lock);
> +	q->ready = NULL;
> +	init_llist_head(&q->new);
> +}
> +
> +/**
> + * lwq_empty - test if lwq contains any entry
> + * @q:	the lwq object
> + *
> + * This empty test contains an acquire barrier so that if a wakeup
> + * is sent after lwq_enqueue() returns true, it is safe to go to sleep after
> + * a test on lwq_empty().
> + */
> +static inline bool lwq_empty(struct lwq *q)
> +{
> +	/* acquire ensures ordering wrt lwq_enqueue() */
> +	return smp_load_acquire(&q->ready) == NULL && llist_empty(&q->new);
> +}
> +
> +struct llist_node *__lwq_dequeue(struct lwq *q);
> +/**
> + * lwq_dequeue - dequeue first (oldest) entry from lwq
> + * @q:		the queue to dequeue from
> + * @type:	the type of object to return
> + * @member:	the member in the returned object which is an lwq_node.
> + *
> + * Remove a single object from the lwq and return it.  This will take
> + * a spinlock and so must always be called in the same context, typically
> + * process context.
> + */
> +#define lwq_dequeue(q, type, member)					\
> +	({ struct llist_node *_n = __lwq_dequeue(q);			\
> +	  _n ? container_of(_n, type, member.node) : NULL; })
> +
> +struct llist_node *lwq_dequeue_all(struct lwq *q);
> +
> +/**
> + * lwq_for_each_safe - iterate over detached queue allowing deletion
> + * @_n:		iterator variable
> + * @_t1:	temporary struct llist_node **
> + * @_t2:	temporary struct llist_node *
> + * @_l:		address of llist_node pointer from lwq_dequeue_all()
> + * @_member:	member in _n where lwq_node is found.
> + *
> + * Iterate over members in a dequeued list.  If the iterator variable
> + * is set to NULL, the iterator removes that entry from the queue.
> + */
> +#define lwq_for_each_safe(_n, _t1, _t2, _l, _member)			\
> +	for (_t1 = (_l);						\
> +	     *(_t1) ? (_n = container_of(*(_t1), typeof(*(_n)), _member.node),\
> +		       _t2 = ((*_t1)->next),				\
> +		       true)						\
> +	     : false;							\
> +	     (_n) ? (_t1 = &(_n)->_member.node.next, 0)			\
> +	     : ((*(_t1) = (_t2)),  0))
> +
> +/**
> + * lwq_enqueue - add a new item to the end of the queue
> + * @n	- the lwq_node embedded in the item to be added
> + * @q	- the lwq to append to.
> + *
> + * No locking is needed to append to the queue so this can
> + * be called from any context.
> + * Return %true if the list may have previously been empty.
> + */
> +static inline bool lwq_enqueue(struct lwq_node *n, struct lwq *q)
> +{
> +	/* acquire ensures ordering wrt lwq_dequeue */
> +	return llist_add(&n->node, &q->new) &&
> +		smp_load_acquire(&q->ready) == NULL;
> +}
> +
> +/**
> + * lwq_enqueue_batch - add a list of new items to the end of the queue
> + * @n	- the lwq_node embedded in the first item to be added
> + * @q	- the lwq to append to.
> + *
> + * No locking is needed to append to the queue so this can
> + * be called from any context.
> + * Return %true if the list may have previously been empty.
> + */
> +static inline bool lwq_enqueue_batch(struct llist_node *n, struct lwq *q)
> +{
> +	struct llist_node *e = n;
> +
> +	/* acquire ensures ordering wrt lwq_dequeue */
> +	return llist_add_batch(llist_reverse_order(n), e, &q->new) &&
> +		smp_load_acquire(&q->ready) == NULL;
> +}
> +#endif /* LWQ_H */
> diff --git a/lib/Kconfig b/lib/Kconfig
> index 5c2da561c516..6620bdba4f94 100644
> --- a/lib/Kconfig
> +++ b/lib/Kconfig
> @@ -763,3 +763,8 @@ config ASN1_ENCODER
>  
>  config POLYNOMIAL
>         tristate
> +
> +config LWQ_TEST
> +	bool "RPC: enable boot-time test for lwq queuing"
> +	help
> +          Enable boot-time test of lwq functionality.
> diff --git a/lib/Makefile b/lib/Makefile
> index 1ffae65bb7ee..4b67c2d6af62 100644
> --- a/lib/Makefile
> +++ b/lib/Makefile
> @@ -45,7 +45,7 @@ obj-y	+= lockref.o
>  obj-y += bcd.o sort.o parser.o debug_locks.o random32.o \
>  	 bust_spinlocks.o kasprintf.o bitmap.o scatterlist.o \
>  	 list_sort.o uuid.o iov_iter.o clz_ctz.o \
> -	 bsearch.o find_bit.o llist.o memweight.o kfifo.o \
> +	 bsearch.o find_bit.o llist.o lwq.o memweight.o kfifo.o \
>  	 percpu-refcount.o rhashtable.o base64.o \
>  	 once.o refcount.o rcuref.o usercopy.o errseq.o bucket_locks.o \
>  	 generic-radix-tree.o
> diff --git a/lib/lwq.c b/lib/lwq.c
> new file mode 100644
> index 000000000000..d6be6dda3867
> --- /dev/null
> +++ b/lib/lwq.c
> @@ -0,0 +1,149 @@
> +// SPDX-License-Identifier: GPL-2.0-only
> +/*
> + * Light weight single-linked queue.
> + *
> + * Entries are enqueued to the head of an llist, with no blocking.
> + * This can happen in any context.
> + *
> + * Entries are dequeued using a spinlock to protect against
> + * multiple access.  The llist is staged in reverse order, and refreshed
> + * from the llist when it exhausts.
> + */
> +#include <linux/rcupdate.h>
> +#include <linux/lwq.h>
> +
> +struct llist_node *__lwq_dequeue(struct lwq *q)
> +{
> +	struct llist_node *this;
> +
> +	if (lwq_empty(q))
> +		return NULL;
> +	spin_lock(&q->lock);
> +	this = q->ready;
> +	if (!this && !llist_empty(&q->new)) {
> +		/* ensure queue doesn't appear transiently lwq_empty */
> +		smp_store_release(&q->ready, (void *)1);
> +		this = llist_reverse_order(llist_del_all(&q->new));
> +		if (!this)
> +			q->ready = NULL;
> +	}
> +	if (this)
> +		q->ready = llist_next(this);
> +	spin_unlock(&q->lock);
> +	return this;
> +}
> +
> +/**
> + * lwq_dequeue_all - dequeue all currently enqueued objects
> + * @q:	the queue to dequeue from
> + *
> + * Remove and return a linked list of llist_nodes of all the objects that were
> + * in the queue. The first on the list will be the object that was least
> + * recently enqueued.
> + */
> +struct llist_node *lwq_dequeue_all(struct lwq *q)
> +{
> +	struct llist_node *r, *t, **ep;
> +
> +	if (lwq_empty(q))
> +		return NULL;
> +
> +	spin_lock(&q->lock);
> +	r = q->ready;
> +	q->ready = NULL;
> +	t = llist_del_all(&q->new);
> +	spin_unlock(&q->lock);
> +	ep = &r;
> +	while (*ep)
> +		ep = &(*ep)->next;
> +	*ep = llist_reverse_order(t);
> +	return r;
> +}

ERROR: modpost: "lwq_dequeue_all" [net/sunrpc/sunrpc.ko] undefined!
ERROR: modpost: "__lwq_dequeue" [net/sunrpc/sunrpc.ko] undefined!
make[3]: *** [/home/cel/src/linux/even-releases/scripts/Makefile.modpost:144: Module.symvers] Error 1
make[2]: *** [/home/cel/src/linux/even-releases/Makefile:1984: modpost] Error 2
make[1]: *** [/home/cel/src/linux/even-releases/Makefile:234: __sub-make] Error 2
make: *** [Makefile:234: __sub-make] Error 2

You might need an EXPORT_SYMBOL_GPL or two now.
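
Presumably just

	EXPORT_SYMBOL_GPL(__lwq_dequeue);
	...
	EXPORT_SYMBOL_GPL(lwq_dequeue_all);

after the respective function bodies in lib/lwq.c.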


> +
> +#if IS_ENABLED(CONFIG_LWQ_TEST)
> +
> +#include <linux/module.h>
> +#include <linux/slab.h>
> +#include <linux/wait_bit.h>
> +#include <linux/kthread.h>
> +#include <linux/delay.h>
> +struct tnode {
> +	struct lwq_node n;
> +	int i;
> +	int c;
> +};
> +
> +static int lwq_exercise(void *qv)
> +{
> +	struct lwq *q = qv;
> +	int cnt;
> +	struct tnode *t;
> +
> +	for (cnt = 0; cnt < 10000; cnt++) {
> +		wait_var_event(q, (t = lwq_dequeue(q, struct tnode, n)) != NULL);
> +		t->c++;
> +		if (lwq_enqueue(&t->n, q))
> +			wake_up_var(q);
> +	}
> +	while (!kthread_should_stop())
> +		schedule_timeout_idle(1);
> +	return 0;
> +}
> +
> +static int lwq_test(void)
> +{
> +	int i;
> +	struct lwq q;
> +	struct llist_node *l, **t1, *t2;
> +	struct tnode *t;
> +	struct task_struct *threads[8];
> +
> +	printk(KERN_INFO "testing lwq....\n");
> +	lwq_init(&q);
> +	printk(KERN_INFO " lwq: run some threads\n");
> +	for (i = 0; i < ARRAY_SIZE(threads); i++)
> +		threads[i] = kthread_run(lwq_exercise, &q, "lwq-test-%d", i);
> +	for (i = 0; i < 100; i++) {
> +		t = kmalloc(sizeof(*t), GFP_KERNEL);
> +		t->i = i;
> +		t->c = 0;
> +		if (lwq_enqueue(&t->n, &q))
> +			wake_up_var(&q);
> +	};
> +	/* wait for threads to exit */
> +	for (i = 0; i < ARRAY_SIZE(threads); i++)
> +		if (!IS_ERR_OR_NULL(threads[i]))
> +			kthread_stop(threads[i]);
> +	printk(KERN_INFO " lwq: dequeue first 50:");
> +	for (i = 0; i < 50 ; i++) {
> +		if (i && (i % 10) == 0) {
> +			printk(KERN_CONT "\n");
> +			printk(KERN_INFO " lwq: ... ");
> +		}
> +		t = lwq_dequeue(&q, struct tnode, n);
> +		printk(KERN_CONT " %d(%d)", t->i, t->c);
> +		kfree(t);
> +	}
> +	printk(KERN_CONT "\n");
> +	l = lwq_dequeue_all(&q);
> +	printk(KERN_INFO " lwq: delete the multiples of 3 (test lwq_for_each_safe())\n");
> +	lwq_for_each_safe(t, t1, t2, &l, n) {
> +		if ((t->i % 3) == 0) {
> +			t->i = -1;
> +			kfree(t);
> +			t = NULL;
> +		}
> +	}
> +	if (l)
> +		lwq_enqueue_batch(l, &q);
> +	printk(KERN_INFO " lwq: dequeue remaining:");
> +	while ((t = lwq_dequeue(&q, struct tnode, n)) != NULL) {
> +		printk(KERN_CONT " %d", t->i);
> +		kfree(t);
> +	}
> +	printk(KERN_CONT "\n");
> +	return 0;
> +}
> +
> +module_init(lwq_test);
> +#endif /* CONFIG_LWQ_TEST*/
> -- 
> 2.41.0
> 

-- 
Chuck Lever

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH 05/10] lib: add light-weight queuing mechanism.
  2023-08-30 15:21   ` Chuck Lever
@ 2023-09-03 23:57     ` NeilBrown
  0 siblings, 0 replies; 19+ messages in thread
From: NeilBrown @ 2023-09-03 23:57 UTC (permalink / raw)
  To: Chuck Lever; +Cc: Jeff Layton, linux-nfs

On Thu, 31 Aug 2023, Chuck Lever wrote:
> On Wed, Aug 30, 2023 at 12:54:48PM +1000, NeilBrown wrote:
> > lwq is a FIFO single-linked queue that only requires a spinlock
> > for dequeueing, which happens in process context.  Enqueueing is atomic
> > with no spinlock and can happen in any context.
> > 
> > Include a unit test for basic functionality - runs at boot time.  Does
> > not use kunit framework.
> > 
> > Signed-off-by: NeilBrown <neilb@suse.de>
> > ---
> >  include/linux/lwq.h | 120 +++++++++++++++++++++++++++++++++++
> >  lib/Kconfig         |   5 ++
> >  lib/Makefile        |   2 +-
> >  lib/lwq.c           | 149 ++++++++++++++++++++++++++++++++++++++++++++
> >  4 files changed, 275 insertions(+), 1 deletion(-)
> >  create mode 100644 include/linux/lwq.h
> >  create mode 100644 lib/lwq.c
> 
> I've applied and/or squashed the previous four and pushed.

Thanks.

> 
> I don't have any specific complaints on this one, but checkpatch
> throws about 20 warnings. Some of those you might want to deal with
> or just ignore. Up to you, but I'll hold off on applying it until I
> hear from you.

There are 5 "Avoid logging continuation" warnings that I cannot avoid.
11 "Prefer FOO_{cont,info}(..) to printk" warnings that I don't think
are relevant.  There is no "FOO" that is appropriate, and other testing
code just uses printk.
There is one "added file - does MAINTAINERS need updating?" warning.
I don't know that we need a MAINTAINER for each little lib file (??)
There is one "write a better help paragraph" warning, but I cannot
think of anything useful to add.
And 2 "memory barrier without comment" warnings where there *is* a
comment, but it is one line too far away.

So I don't want to fix any of those warnings. - thanks.

> 
> Also, I'm trying to collect a set of potential reviewers for it:
> 
> [cel@bazille even-releases]$ scripts/get_maintainer.pl lib/
> Andrew Morton <akpm@linux-foundation.org> (commit_signer:206/523=39%)
> "Liam R. Howlett" <Liam.Howlett@oracle.com> (commit_signer:89/523=17%,authored:61/523=12%)
> Kees Cook <keescook@chromium.org> (commit_signer:48/523=9%)
> Greg Kroah-Hartman <gregkh@linuxfoundation.org> (commit_signer:48/523=9%)
> David Gow <davidgow@google.com> (commit_signer:43/523=8%)
> linux-kernel@vger.kernel.org (open list)
> [cel@bazille even-releases]$
> 
> Is that a reasonable set to add as Cc's?

It would be hard to do better.  I had a look at history and it is mostly
drive-by stuff.  A few have been funnelled through Andrew Morton because
he is willing to take most things that don't have any other home.
I doubt we'll get good review - but I've been surprised before.

Thanks,
NeilBrown

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH 05/10] lib: add light-weight queuing mechanism.
  2023-08-30 15:35   ` Chuck Lever
@ 2023-09-03 23:59     ` NeilBrown
  0 siblings, 0 replies; 19+ messages in thread
From: NeilBrown @ 2023-09-03 23:59 UTC (permalink / raw)
  To: Chuck Lever; +Cc: Jeff Layton, linux-nfs

On Thu, 31 Aug 2023, Chuck Lever wrote:
> On Wed, Aug 30, 2023 at 12:54:48PM +1000, NeilBrown wrote:
> > lwq is a FIFO single-linked queue that only requires a spinlock
> > for dequeueing, which happens in process context.  Enqueueing is atomic
> > with no spinlock and can happen in any context.
> > 
> > Include a unit test for basic functionality - runs at boot time.  Does
> > not use kunit framework.
> > 
> > Signed-off-by: NeilBrown <neilb@suse.de>
> > ---
> >  include/linux/lwq.h | 120 +++++++++++++++++++++++++++++++++++
> >  lib/Kconfig         |   5 ++
> >  lib/Makefile        |   2 +-
> >  lib/lwq.c           | 149 ++++++++++++++++++++++++++++++++++++++++++++
> >  4 files changed, 275 insertions(+), 1 deletion(-)
> >  create mode 100644 include/linux/lwq.h
> >  create mode 100644 lib/lwq.c
> > 
> > diff --git a/include/linux/lwq.h b/include/linux/lwq.h
> > new file mode 100644
> > index 000000000000..52b9c81b493a
> > --- /dev/null
> > +++ b/include/linux/lwq.h
> > @@ -0,0 +1,120 @@
> > +/* SPDX-License-Identifier: GPL-2.0-only */
> > +
> > +#ifndef LWQ_H
> > +#define LWQ_H
> > +/*
> > + * light-weight single-linked queue built from llist
> > + *
> > + * Entries can be enqueued from any context with no locking.
> > + * Entries can be dequeued from process context with integrated locking.
> > + */
> > +#include <linux/container_of.h>
> > +#include <linux/spinlock.h>
> > +#include <linux/llist.h>
> > +
> > +struct lwq_node {
> > +	struct llist_node node;
> > +};
> > +
> > +struct lwq {
> > +	spinlock_t		lock;
> > +	struct llist_node	*ready;		/* entries to be dequeued */
> > +	struct llist_head	new;		/* entries being enqueued */
> > +};
> > +
> > +/**
> > + * lwq_init - initialise a lwq
> > + * @q:	the lwq object
> > + */
> > +static inline void lwq_init(struct lwq *q)
> > +{
> > +	spin_lock_init(&q->lock);
> > +	q->ready = NULL;
> > +	init_llist_head(&q->new);
> > +}
> > +
> > +/**
> > + * lwq_empty - test if lwq contains any entry
> > + * @q:	the lwq object
> > + *
> > + * This empty test contains an acquire barrier so that if a wakeup
> > + * is sent after lwq_enqueue() returns true, it is safe to go to sleep after
> > + * a test on lwq_empty().
> > + */
> > +static inline bool lwq_empty(struct lwq *q)
> > +{
> > +	/* acquire ensures ordering wrt lwq_enqueue() */
> > +	return smp_load_acquire(&q->ready) == NULL && llist_empty(&q->new);
> > +}
> > +
> > +struct llist_node *__lwq_dequeue(struct lwq *q);
> > +/**
> > + * lwq_dequeue - dequeue first (oldest) entry from lwq
> > + * @q:		the queue to dequeue from
> > + * @type:	the type of object to return
> > + * @member:	the member in the returned object which is an lwq_node.
> > + *
> > + * Remove a single object from the lwq and return it.  This will take
> > + * a spinlock and so must always be called in the same context, typically
> > + * process context.
> > + */
> > +#define lwq_dequeue(q, type, member)					\
> > +	({ struct llist_node *_n = __lwq_dequeue(q);			\
> > +	  _n ? container_of(_n, type, member.node) : NULL; })
> > +
> > +struct llist_node *lwq_dequeue_all(struct lwq *q);
> > +
> > +/**
> > + * lwq_for_each_safe - iterate over detached queue allowing deletion
> > + * @_n:		iterator variable
> > + * @_t1:	temporary struct llist_node **
> > + * @_t2:	temporary struct llist_node *
> > + * @_l:		address of llist_node pointer from lwq_dequeue_all()
> > + * @_member:	member in _n where lwq_node is found.
> > + *
> > + * Iterate over members in a dequeued list.  If the iterator variable
> > + * is set to NULL, the iterator removes that entry from the queue.
> > + */
> > +#define lwq_for_each_safe(_n, _t1, _t2, _l, _member)			\
> > +	for (_t1 = (_l);						\
> > +	     *(_t1) ? (_n = container_of(*(_t1), typeof(*(_n)), _member.node),\
> > +		       _t2 = ((*_t1)->next),				\
> > +		       true)						\
> > +	     : false;							\
> > +	     (_n) ? (_t1 = &(_n)->_member.node.next, 0)			\
> > +	     : ((*(_t1) = (_t2)),  0))
> > +
> > +/**
> > + * lwq_enqueue - add a new item to the end of the queue
> > + * @n	- the lwq_node embedded in the item to be added
> > + * @q	- the lwq to append to.
> > + *
> > + * No locking is needed to append to the queue so this can
> > + * be called from any context.
> > + * Return %true if the list may have previously been empty.
> > + */
> > +static inline bool lwq_enqueue(struct lwq_node *n, struct lwq *q)
> > +{
> > +	/* acquire ensures ordering wrt lwq_dequeue */
> > +	return llist_add(&n->node, &q->new) &&
> > +		smp_load_acquire(&q->ready) == NULL;
> > +}
> > +
> > +/**
> > + * lwq_enqueue_batch - add a list of new items to the end of the queue
> > + * @n	- the lwq_node embedded in the first item to be added
> > + * @q	- the lwq to append to.
> > + *
> > + * No locking is needed to append to the queue so this can
> > + * be called from any context.
> > + * Return %true if the list may have previously been empty.
> > + */
> > +static inline bool lwq_enqueue_batch(struct llist_node *n, struct lwq *q)
> > +{
> > +	struct llist_node *e = n;
> > +
> > +	/* acquire ensures ordering wrt lwq_dequeue */
> > +	return llist_add_batch(llist_reverse_order(n), e, &q->new) &&
> > +		smp_load_acquire(&q->ready) == NULL;
> > +}
> > +#endif /* LWQ_H */
> > diff --git a/lib/Kconfig b/lib/Kconfig
> > index 5c2da561c516..6620bdba4f94 100644
> > --- a/lib/Kconfig
> > +++ b/lib/Kconfig
> > @@ -763,3 +763,8 @@ config ASN1_ENCODER
> >  
> >  config POLYNOMIAL
> >         tristate
> > +
> > +config LWQ_TEST
> > +	bool "RPC: enable boot-time test for lwq queuing"
> 
> Since LWQ is no longer RPC specific, you can drop the "RPC: " from
> the option's short description.

Thanks.  I changed "RPC" to "lib" locally.  If I need to resend that
will be included, or you could just make the change if nothing else
turns up.

NeilBrown

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH 05/10] lib: add light-weight queuing mechanism.
  2023-08-30 16:03   ` Chuck Lever
@ 2023-09-04  0:02     ` NeilBrown
  0 siblings, 0 replies; 19+ messages in thread
From: NeilBrown @ 2023-09-04  0:02 UTC (permalink / raw)
  To: Chuck Lever; +Cc: Jeff Layton, linux-nfs

On Thu, 31 Aug 2023, Chuck Lever wrote:
> On Wed, Aug 30, 2023 at 12:54:48PM +1000, NeilBrown wrote:
> > lwq is a FIFO single-linked queue that only requires a spinlock
> > for dequeueing, which happens in process context.  Enqueueing is atomic
> > with no spinlock and can happen in any context.
> > 
> > Include a unit test for basic functionality - runs at boot time.  Does
> > not use kunit framework.
> > 
> > Signed-off-by: NeilBrown <neilb@suse.de>
> > ---
> >  include/linux/lwq.h | 120 +++++++++++++++++++++++++++++++++++
> >  lib/Kconfig         |   5 ++
> >  lib/Makefile        |   2 +-
> >  lib/lwq.c           | 149 ++++++++++++++++++++++++++++++++++++++++++++
> >  4 files changed, 275 insertions(+), 1 deletion(-)
> >  create mode 100644 include/linux/lwq.h
> >  create mode 100644 lib/lwq.c
> > 
> > diff --git a/include/linux/lwq.h b/include/linux/lwq.h
> > new file mode 100644
> > index 000000000000..52b9c81b493a
> > --- /dev/null
> > +++ b/include/linux/lwq.h
> > @@ -0,0 +1,120 @@
> > +/* SPDX-License-Identifier: GPL-2.0-only */
> > +
> > +#ifndef LWQ_H
> > +#define LWQ_H
> > +/*
> > + * light-weight single-linked queue built from llist
> > + *
> > + * Entries can be enqueued from any context with no locking.
> > + * Entries can be dequeued from process context with integrated locking.
> > + */
> > +#include <linux/container_of.h>
> > +#include <linux/spinlock.h>
> > +#include <linux/llist.h>
> > +
> > +struct lwq_node {
> > +	struct llist_node node;
> > +};
> > +
> > +struct lwq {
> > +	spinlock_t		lock;
> > +	struct llist_node	*ready;		/* entries to be dequeued */
> > +	struct llist_head	new;		/* entries being enqueued */
> > +};
> > +
> > +/**
> > + * lwq_init - initialise a lwq
> > + * @q:	the lwq object
> > + */
> > +static inline void lwq_init(struct lwq *q)
> > +{
> > +	spin_lock_init(&q->lock);
> > +	q->ready = NULL;
> > +	init_llist_head(&q->new);
> > +}
> > +
> > +/**
> > + * lwq_empty - test if lwq contains any entry
> > + * @q:	the lwq object
> > + *
> > + * This empty test contains an acquire barrier so that if a wakeup
> > + * is sent after lwq_enqueue() returns true, it is safe to go to sleep after
> > + * a test on lwq_empty().
> > + */
> > +static inline bool lwq_empty(struct lwq *q)
> > +{
> > +	/* acquire ensures ordering wrt lwq_enqueue() */
> > +	return smp_load_acquire(&q->ready) == NULL && llist_empty(&q->new);
> > +}
> > +
> > +struct llist_node *__lwq_dequeue(struct lwq *q);
> > +/**
> > + * lwq_dequeue - dequeue first (oldest) entry from lwq
> > + * @q:		the queue to dequeue from
> > + * @type:	the type of object to return
> > + * @member:	the member in the returned object which is an lwq_node.
> > + *
> > + * Remove a single object from the lwq and return it.  This will take
> > + * a spinlock and so must always be called in the same context, typically
> > + * process context.
> > + */
> > +#define lwq_dequeue(q, type, member)					\
> > +	({ struct llist_node *_n = __lwq_dequeue(q);			\
> > +	  _n ? container_of(_n, type, member.node) : NULL; })
> > +
> > +struct llist_node *lwq_dequeue_all(struct lwq *q);
> > +
> > +/**
> > + * lwq_for_each_safe - iterate over detached queue allowing deletion
> > + * @_n:		iterator variable
> > + * @_t1:	temporary struct llist_node **
> > + * @_t2:	temporary struct llist_node *
> > + * @_l:		address of llist_node pointer from lwq_dequeue_all()
> > + * @_member:	member in _n where lwq_node is found.
> > + *
> > + * Iterate over members in a dequeued list.  If the iterator variable
> > + * is set to NULL, the iterator removes that entry from the queue.
> > + */
> > +#define lwq_for_each_safe(_n, _t1, _t2, _l, _member)			\
> > +	for (_t1 = (_l);						\
> > +	     *(_t1) ? (_n = container_of(*(_t1), typeof(*(_n)), _member.node),\
> > +		       _t2 = ((*_t1)->next),				\
> > +		       true)						\
> > +	     : false;							\
> > +	     (_n) ? (_t1 = &(_n)->_member.node.next, 0)			\
> > +	     : ((*(_t1) = (_t2)),  0))
> > +
> > +/**
> > + * lwq_enqueue - add a new item to the end of the queue
> > + * @n	- the lwq_node embedded in the item to be added
> > + * @q	- the lwq to append to.
> > + *
> > + * No locking is needed to append to the queue so this can
> > + * be called from any context.
> > + * Return %true if the list may have previously been empty.
> > + */
> > +static inline bool lwq_enqueue(struct lwq_node *n, struct lwq *q)
> > +{
> > +	/* acquire ensures ordering wrt lwq_dequeue */
> > +	return llist_add(&n->node, &q->new) &&
> > +		smp_load_acquire(&q->ready) == NULL;
> > +}
> > +
> > +/**
> > + * lwq_enqueue_batch - add a list of new items to the end of the queue
> > + * @n	- the lwq_node embedded in the first item to be added
> > + * @q	- the lwq to append to.
> > + *
> > + * No locking is needed to append to the queue so this can
> > + * be called from any context.
> > + * Return %true if the list may have previously been empty.
> > + */
> > +static inline bool lwq_enqueue_batch(struct llist_node *n, struct lwq *q)
> > +{
> > +	struct llist_node *e = n;
> > +
> > +	/* acquire ensures ordering wrt lwq_dequeue */
> > +	return llist_add_batch(llist_reverse_order(n), e, &q->new) &&
> > +		smp_load_acquire(&q->ready) == NULL;
> > +}
> > +#endif /* LWQ_H */
> > diff --git a/lib/Kconfig b/lib/Kconfig
> > index 5c2da561c516..6620bdba4f94 100644
> > --- a/lib/Kconfig
> > +++ b/lib/Kconfig
> > @@ -763,3 +763,8 @@ config ASN1_ENCODER
> >  
> >  config POLYNOMIAL
> >         tristate
> > +
> > +config LWQ_TEST
> > +	bool "RPC: enable boot-time test for lwq queuing"
> > +	help
> > +          Enable boot-time test of lwq functionality.
> > diff --git a/lib/Makefile b/lib/Makefile
> > index 1ffae65bb7ee..4b67c2d6af62 100644
> > --- a/lib/Makefile
> > +++ b/lib/Makefile
> > @@ -45,7 +45,7 @@ obj-y	+= lockref.o
> >  obj-y += bcd.o sort.o parser.o debug_locks.o random32.o \
> >  	 bust_spinlocks.o kasprintf.o bitmap.o scatterlist.o \
> >  	 list_sort.o uuid.o iov_iter.o clz_ctz.o \
> > -	 bsearch.o find_bit.o llist.o memweight.o kfifo.o \
> > +	 bsearch.o find_bit.o llist.o lwq.o memweight.o kfifo.o \
> >  	 percpu-refcount.o rhashtable.o base64.o \
> >  	 once.o refcount.o rcuref.o usercopy.o errseq.o bucket_locks.o \
> >  	 generic-radix-tree.o
> > diff --git a/lib/lwq.c b/lib/lwq.c
> > new file mode 100644
> > index 000000000000..d6be6dda3867
> > --- /dev/null
> > +++ b/lib/lwq.c
> > @@ -0,0 +1,149 @@
> > +// SPDX-License-Identifier: GPL-2.0-only
> > +/*
> > + * Light weight single-linked queue.
> > + *
> > + * Entries are enqueued to the head of an llist, with no blocking.
> > + * This can happen in any context.
> > + *
> > + * Entries are dequeued using a spinlock to protect against
> > + * multiple access.  The llist is staged in reverse order, and refreshed
> > + * from the llist when it exhausts.
> > + */
> > +#include <linux/rcupdate.h>
> > +#include <linux/lwq.h>
> > +
> > +struct llist_node *__lwq_dequeue(struct lwq *q)
> > +{
> > +	struct llist_node *this;
> > +
> > +	if (lwq_empty(q))
> > +		return NULL;
> > +	spin_lock(&q->lock);
> > +	this = q->ready;
> > +	if (!this && !llist_empty(&q->new)) {
> > +		/* ensure queue doesn't appear transiently lwq_empty */
> > +		smp_store_release(&q->ready, (void *)1);
> > +		this = llist_reverse_order(llist_del_all(&q->new));
> > +		if (!this)
> > +			q->ready = NULL;
> > +	}
> > +	if (this)
> > +		q->ready = llist_next(this);
> > +	spin_unlock(&q->lock);
> > +	return this;
> > +}
> > +
> > +/**
> > + * lwq_dequeue_all - dequeue all currently enqueued objects
> > + * @q:	the queue to dequeue from
> > + *
> > + * Remove and return a linked list of llist_nodes of all the objects that were
> > + * in the queue. The first on the list will be the object that was least
> > + * recently enqueued.
> > + */
> > +struct llist_node *lwq_dequeue_all(struct lwq *q)
> > +{
> > +	struct llist_node *r, *t, **ep;
> > +
> > +	if (lwq_empty(q))
> > +		return NULL;
> > +
> > +	spin_lock(&q->lock);
> > +	r = q->ready;
> > +	q->ready = NULL;
> > +	t = llist_del_all(&q->new);
> > +	spin_unlock(&q->lock);
> > +	ep = &r;
> > +	while (*ep)
> > +		ep = &(*ep)->next;
> > +	*ep = llist_reverse_order(t);
> > +	return r;
> > +}
> 
> ERROR: modpost: "lwq_dequeue_all" [net/sunrpc/sunrpc.ko] undefined!
> ERROR: modpost: "__lwq_dequeue" [net/sunrpc/sunrpc.ko] undefined!
> make[3]: *** [/home/cel/src/linux/even-releases/scripts/Makefile.modpost:144: Module.symvers] Error 1
> make[2]: *** [/home/cel/src/linux/even-releases/Makefile:1984: modpost] Error 2
> make[1]: *** [/home/cel/src/linux/even-releases/Makefile:234: __sub-make] Error 2
> make: *** [Makefile:234: __sub-make] Error 2
> 
> You might need an EXPORT_SYMBOL_GPL or two now.

:-)  It seems something else did turn up...

Yes,
+EXPORT_SYMBOL_GPL(__lwq_dequeue);
...
+EXPORT_SYMBOL_GPL(lwq_dequeue_all);

should be enough.

Thanks,
NeilBrown

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH 06/10] SUNRPC: only have one thread waking up at a time
  2023-08-30 15:28   ` Chuck Lever
@ 2023-09-04  0:35     ` NeilBrown
  0 siblings, 0 replies; 19+ messages in thread
From: NeilBrown @ 2023-09-04  0:35 UTC (permalink / raw)
  To: Chuck Lever; +Cc: Jeff Layton, linux-nfs

On Thu, 31 Aug 2023, Chuck Lever wrote:
> On Wed, Aug 30, 2023 at 12:54:49PM +1000, NeilBrown wrote:
> > Currently if several items of work become available in quick succession,
> > that number of threads (if available) will be woken.  By the time some
> > of them wake up another thread that was already cache-warm might have
> > come along and completed the work.  Anecdotal evidence suggests as many
> > as 15% of wakes find nothing to do once they get to the point of
> > looking.
> > 
> > This patch changes svc_pool_wake_idle_thread() to wake the first thread
> > on the queue but NOT remove it.  Subsequent calls will wake the same
> > thread.  Once that thread starts it will dequeue itself and after
> > dequeueing some work to do, it will wake the next thread if there is more
> > work ready.  This results in a more orderly increase in the number of
> > busy threads.
> > 
> > As a bonus, this allows us to reduce locking around the idle queue.
> > svc_pool_wake_idle_thread() no longer needs to take a lock (beyond
> > rcu_read_lock()) as it doesn't manipulate the queue, it just looks at
> > the first item.
> > 
> > The thread itself can avoid locking by using the new
> > llist_del_first_this() interface.  This will safely remove the thread
> > itself if it is the head.  If it isn't the head, it will do nothing.
> > If multiple threads call this concurrently only one will succeed.  The
> > others will do nothing, so no corruption can result.
> > 
> > If a thread wakes up and finds that it cannot dequeue itself that means
> > either
> > - that it wasn't woken because it was the head of the queue.  Maybe the
> >   freezer woke it.  In that case it can go back to sleep (after trying
> >   to freeze of course).
> > - some other thread found there was nothing to do very recently, and
> >   placed itself on the head of the queue in front of this thread.
> >   It must check again after placing itself there, so it can be deemed to
> >   be responsible for any pending work, and this thread can go back to
> >   sleep until woken.
> > 
> > No code ever tests for busy threads any more.  Only each thread itself
> > cares if it is busy.  So svc_thread_busy() is no longer needed.
> > 
> > Signed-off-by: NeilBrown <neilb@suse.de>
> > ---
> >  include/linux/sunrpc/svc.h | 11 -----------
> >  net/sunrpc/svc.c           | 14 ++++++--------
> >  net/sunrpc/svc_xprt.c      | 35 ++++++++++++++++++++++-------------
> >  3 files changed, 28 insertions(+), 32 deletions(-)
> > 
> > diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
> > index ad4572630335..dafa362b4fdd 100644
> > --- a/include/linux/sunrpc/svc.h
> > +++ b/include/linux/sunrpc/svc.h
> > @@ -266,17 +266,6 @@ enum {
> >  	RQ_DATA,		/* request has data */
> >  };
> >  
> > -/**
> > - * svc_thread_busy - check if a thread as busy
> > - * @rqstp: the thread which might be busy
> > - *
> > - * A thread is only busy when it is not an the idle list.
> > - */
> > -static inline bool svc_thread_busy(const struct svc_rqst *rqstp)
> > -{
> > -	return !llist_on_list(&rqstp->rq_idle);
> > -}
> > -
> >  #define SVC_NET(rqst) (rqst->rq_xprt ? rqst->rq_xprt->xpt_net : rqst->rq_bc_net)
> >  
> >  /*
> > diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
> > index 5673f30db295..3267d740235e 100644
> > --- a/net/sunrpc/svc.c
> > +++ b/net/sunrpc/svc.c
> > @@ -642,7 +642,6 @@ svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node)
> >  
> >  	folio_batch_init(&rqstp->rq_fbatch);
> >  
> > -	init_llist_node(&rqstp->rq_idle);
> >  	rqstp->rq_server = serv;
> >  	rqstp->rq_pool = pool;
> >  
> > @@ -704,17 +703,16 @@ void svc_pool_wake_idle_thread(struct svc_pool *pool)
> >  	struct llist_node *ln;
> >  
> >  	rcu_read_lock();
> > -	spin_lock_bh(&pool->sp_lock);
> > -	ln = llist_del_first_init(&pool->sp_idle_threads);
> > -	spin_unlock_bh(&pool->sp_lock);
> > +	ln = READ_ONCE(pool->sp_idle_threads.first);
> >  	if (ln) {
> >  		rqstp = llist_entry(ln, struct svc_rqst, rq_idle);
> > -
> >  		WRITE_ONCE(rqstp->rq_qtime, ktime_get());
> > -		wake_up_process(rqstp->rq_task);
> > +		if (!task_is_running(rqstp->rq_task)) {
> > +			wake_up_process(rqstp->rq_task);
> > +			trace_svc_wake_up(rqstp->rq_task->pid);
> > +			percpu_counter_inc(&pool->sp_threads_woken);
> > +		}
> >  		rcu_read_unlock();
> > -		percpu_counter_inc(&pool->sp_threads_woken);
> > -		trace_svc_wake_up(rqstp->rq_task->pid);
> >  		return;
> >  	}
> >  	rcu_read_unlock();
> > diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
> > index 17c43bde35c9..a51570a4cbf2 100644
> > --- a/net/sunrpc/svc_xprt.c
> > +++ b/net/sunrpc/svc_xprt.c
> > @@ -732,20 +732,16 @@ static void svc_rqst_wait_for_work(struct svc_rqst *rqstp)
> >  	if (rqst_should_sleep(rqstp)) {
> >  		set_current_state(TASK_IDLE | TASK_FREEZABLE);
> >  		llist_add(&rqstp->rq_idle, &pool->sp_idle_threads);
> > +		if (likely(rqst_should_sleep(rqstp)))
> > +			schedule();
> >  
> > -		if (unlikely(!rqst_should_sleep(rqstp)))
> > -			/* Work just became available.  This thread cannot simply
> > -			 * choose not to sleep as it *must* wait until removed.
> > -			 * So wake the first waiter - whether it is this
> > -			 * thread or some other, it will get the work done.
> > +		while (!llist_del_first_this(&pool->sp_idle_threads,
> > +					     &rqstp->rq_idle)) {
> > +			/* Cannot @rqstp from idle list, so some other thread
> 
> I was not aware that "@rqstp" was a verb.  ;-)
> 
> Maybe the nice new comment that you are deleting just above here
> would be appropriate to move here.
> 
> 
> > +			 * must have queued itself after finding
> > +			 * no work to do, so they have taken responsibility
> > +			 * for any outstanding work.
> >  			 */
> > -			svc_pool_wake_idle_thread(pool);
> > -
> > -		/* Since a thread cannot remove itself from an llist,
> > -		 * schedule until someone else removes @rqstp from
> > -		 * the idle list.
> > -		 */
> > -		while (!svc_thread_busy(rqstp)) {
> >  			schedule();
> >  			set_current_state(TASK_IDLE | TASK_FREEZABLE);
> >  		}
> > @@ -835,6 +831,15 @@ static void svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt)
> >  	svc_xprt_release(rqstp);
> >  }
> >  
> > +static void wake_next(struct svc_rqst *rqstp)
> 
> Nit: I would prefer a subsystem-specific name for this little guy.
> Makes it a little easier to distinguish from generic scheduler
> functions when looking at perf output.
> 
> How about "svc_thread_wake_next" ?
> 
> 
> > +{
> > +	if (!rqst_should_sleep(rqstp))
> 
> rqst_should_sleep() should also get a better name IMO, but that
> helper was added many patches ago. If you agree to a change, I can
> do that surgery.

What new name are you suggesting?  svc_rqst_should_sleep()?
I'm happy for you to change it to anything that you think is an
improvement, and to do the surgery.
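
With both renames the helper would end up looking something like this
(hypothetical names, assuming we settle on the ones above):

static bool svc_rqst_should_sleep(struct svc_rqst *rqstp);

static void svc_thread_wake_next(struct svc_rqst *rqstp)
{
	if (!svc_rqst_should_sleep(rqstp))
		/* More work is pending after this thread dequeued
		 * some, so wake another worker.
		 */
		svc_pool_wake_idle_thread(rqstp->rq_pool);
}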

I'll address the earlier comments and resend at least this patch.

Thanks,
NeilBrown


> 
> 
> > +		/* More work pending after I dequeued some,
> > +		 * wake another worker
> > +		 */
> > +		svc_pool_wake_idle_thread(rqstp->rq_pool);
> > +}
> > +
> >  /**
> >   * svc_recv - Receive and process the next request on any transport
> >   * @rqstp: an idle RPC service thread
> > @@ -854,13 +859,16 @@ void svc_recv(struct svc_rqst *rqstp)
> >  
> >  	clear_bit(SP_TASK_PENDING, &pool->sp_flags);
> >  
> > -	if (svc_thread_should_stop(rqstp))
> > +	if (svc_thread_should_stop(rqstp)) {
> > +		wake_next(rqstp);
> >  		return;
> > +	}
> >  
> >  	rqstp->rq_xprt = svc_xprt_dequeue(pool);
> >  	if (rqstp->rq_xprt) {
> >  		struct svc_xprt *xprt = rqstp->rq_xprt;
> >  
> > +		wake_next(rqstp);
> >  		/* Normally we will wait up to 5 seconds for any required
> >  		 * cache information to be provided.  When there are no
> >  		 * idle threads, we reduce the wait time.
> > @@ -885,6 +893,7 @@ void svc_recv(struct svc_rqst *rqstp)
> >  		if (req) {
> >  			list_del(&req->rq_bc_list);
> >  			spin_unlock_bh(&serv->sv_cb_lock);
> > +			wake_next(rqstp);
> >  
> >  			svc_process_bc(req, rqstp);
> >  			return;
> > -- 
> > 2.41.0
> > 
> 
> -- 
> Chuck Lever
> 


^ permalink raw reply	[flat|nested] 19+ messages in thread

end of thread

Thread overview: 19+ messages
2023-08-30  2:54 [PATCH 00/10] SUNRPC thread management changes NeilBrown
2023-08-30  2:54 ` [PATCH 01/10] SQUASH: revise comments in SUNRPC: change service idle list to be an llist NeilBrown
2023-08-30  2:54 ` [PATCH 02/10] llist: add interface to check if a node is on a list NeilBrown
2023-08-30  2:54 ` [PATCH 03/10] SQUASH use new llist interfaces in SUNRPC: change service idle list to be an llist NeilBrown
2023-08-30  2:54 ` [PATCH 04/10] llist: add llist_del_first_this() NeilBrown
2023-08-30  2:54 ` [PATCH 05/10] lib: add light-weight queuing mechanism NeilBrown
2023-08-30 15:21   ` Chuck Lever
2023-09-03 23:57     ` NeilBrown
2023-08-30 15:35   ` Chuck Lever
2023-09-03 23:59     ` NeilBrown
2023-08-30 16:03   ` Chuck Lever
2023-09-04  0:02     ` NeilBrown
2023-08-30  2:54 ` [PATCH 06/10] SUNRPC: only have one thread waking up at a time NeilBrown
2023-08-30 15:28   ` Chuck Lever
2023-09-04  0:35     ` NeilBrown
2023-08-30  2:54 ` [PATCH 07/10] SUNRPC: use lwq for sp_sockets - renamed to sp_xprts NeilBrown
2023-08-30  2:54 ` [PATCH 08/10] SUNRPC: change sp_nrthreads to atomic_t NeilBrown
2023-08-30  2:54 ` [PATCH 09/10] SUNRPC: discard sp_lock NeilBrown
2023-08-30  2:54 ` [PATCH 10/10] SUNRPC: change the back-channel queue to lwq NeilBrown
