All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH 0/6] loop: cleanup charging io to mem/blkcg
@ 2021-07-05 10:26 Ming Lei
  2021-07-05 10:26 ` [PATCH 1/6] loop: clean up blkcg association Ming Lei
                   ` (5 more replies)
  0 siblings, 6 replies; 22+ messages in thread
From: Ming Lei @ 2021-07-05 10:26 UTC (permalink / raw)
  To: Jens Axboe
  Cc: linux-block, Christoph Hellwig, Ming Lei, Michal Koutný,
	Dan Schatzberg

Hello Guys,

Clean up charging of IO to mem/blkcg a bit:

- avoid storing blkcg_css/memcg_css in loop_cmd, and store blkcg_css in
loop_worker instead

- avoid acquiring ->lo_work_lock in the IO path

- simplify blkcg_css query via xarray

- other misc cleanup



Ming Lei (6):
  loop: clean up blkcg association
  loop: conver timer for monitoring idle worker into dwork
  loop: add __loop_free_idle_workers() for covering freeing workers in
    clearing FD
  loop: improve loop_process_work
  loop: use xarray to store workers
  loop: don't add worker into idle list

 drivers/block/loop.c | 284 +++++++++++++++++++++++--------------------
 drivers/block/loop.h |   7 +-
 2 files changed, 153 insertions(+), 138 deletions(-)

Cc: Michal Koutný <mkoutny@suse.com>
Cc: Dan Schatzberg <schatzberg.dan@gmail.com>


-- 
2.31.1


^ permalink raw reply	[flat|nested] 22+ messages in thread

* [PATCH 1/6] loop: clean up blkcg association
  2021-07-05 10:26 [PATCH 0/6] loop: cleanup charging io to mem/blkcg Ming Lei
@ 2021-07-05 10:26 ` Ming Lei
  2021-07-06  5:51   ` Christoph Hellwig
  2021-07-05 10:26 ` [PATCH 2/6] loop: conver timer for monitoring idle worker into dwork Ming Lei
                   ` (4 subsequent siblings)
  5 siblings, 1 reply; 22+ messages in thread
From: Ming Lei @ 2021-07-05 10:26 UTC (permalink / raw)
  To: Jens Axboe
  Cc: linux-block, Christoph Hellwig, Ming Lei, Michal Koutný,
	Dan Schatzberg

Each loop_worker is responsible for running requests originating from
the same blkcg, so:

1) associate the kthread with the blkcg in loop_workfn() before calling
loop_process_work(), and disassociate after it returns, so that we avoid
doing both for each request.

2) remove ->blkcg_css and ->memcg_css from 'loop_cmd' since both are
per loop_worker.

Cc: Michal Koutný <mkoutny@suse.com>
Cc: Dan Schatzberg <schatzberg.dan@gmail.com>
Signed-off-by: Ming Lei <ming.lei@redhat.com>
---
 drivers/block/loop.c | 67 ++++++++++++++++++++------------------------
 drivers/block/loop.h |  2 --
 2 files changed, 30 insertions(+), 39 deletions(-)

diff --git a/drivers/block/loop.c b/drivers/block/loop.c
index 02509bc54242..8378b8455f7c 100644
--- a/drivers/block/loop.c
+++ b/drivers/block/loop.c
@@ -949,10 +949,17 @@ static void loop_queue_work(struct loop_device *lo, struct loop_cmd *cmd)
 	struct loop_worker *cur_worker, *worker = NULL;
 	struct work_struct *work;
 	struct list_head *cmd_list;
+	struct cgroup_subsys_state *blkcg_css = NULL;
+#ifdef CONFIG_BLK_CGROUP
+	struct request *rq = blk_mq_rq_from_pdu(cmd);
+
+	if (rq->bio && rq->bio->bi_blkg)
+		blkcg_css = &bio_blkcg(rq->bio)->css;
+#endif
 
 	spin_lock_irq(&lo->lo_work_lock);
 
-	if (queue_on_root_worker(cmd->blkcg_css))
+	if (queue_on_root_worker(blkcg_css))
 		goto queue_work;
 
 	node = &lo->worker_tree.rb_node;
@@ -960,10 +967,10 @@ static void loop_queue_work(struct loop_device *lo, struct loop_cmd *cmd)
 	while (*node) {
 		parent = *node;
 		cur_worker = container_of(*node, struct loop_worker, rb_node);
-		if (cur_worker->blkcg_css == cmd->blkcg_css) {
+		if (cur_worker->blkcg_css == blkcg_css) {
 			worker = cur_worker;
 			break;
-		} else if ((long)cur_worker->blkcg_css < (long)cmd->blkcg_css) {
+		} else if ((long)cur_worker->blkcg_css < (long)blkcg_css) {
 			node = &(*node)->rb_left;
 		} else {
 			node = &(*node)->rb_right;
@@ -977,15 +984,10 @@ static void loop_queue_work(struct loop_device *lo, struct loop_cmd *cmd)
 	 * In the event we cannot allocate a worker, just queue on the
 	 * rootcg worker and issue the I/O as the rootcg
 	 */
-	if (!worker) {
-		cmd->blkcg_css = NULL;
-		if (cmd->memcg_css)
-			css_put(cmd->memcg_css);
-		cmd->memcg_css = NULL;
+	if (!worker)
 		goto queue_work;
-	}
 
-	worker->blkcg_css = cmd->blkcg_css;
+	worker->blkcg_css = blkcg_css;
 	css_get(worker->blkcg_css);
 	INIT_WORK(&worker->work, loop_workfn);
 	INIT_LIST_HEAD(&worker->cmd_list);
@@ -2100,19 +2102,6 @@ static blk_status_t loop_queue_rq(struct blk_mq_hw_ctx *hctx,
 		break;
 	}
 
-	/* always use the first bio's css */
-	cmd->blkcg_css = NULL;
-	cmd->memcg_css = NULL;
-#ifdef CONFIG_BLK_CGROUP
-	if (rq->bio && rq->bio->bi_blkg) {
-		cmd->blkcg_css = &bio_blkcg(rq->bio)->css;
-#ifdef CONFIG_MEMCG
-		cmd->memcg_css =
-			cgroup_get_e_css(cmd->blkcg_css->cgroup,
-					&memory_cgrp_subsys);
-#endif
-	}
-#endif
 	loop_queue_work(lo, cmd);
 
 	return BLK_STS_OK;
@@ -2124,28 +2113,14 @@ static void loop_handle_cmd(struct loop_cmd *cmd)
 	const bool write = op_is_write(req_op(rq));
 	struct loop_device *lo = rq->q->queuedata;
 	int ret = 0;
-	struct mem_cgroup *old_memcg = NULL;
 
 	if (write && (lo->lo_flags & LO_FLAGS_READ_ONLY)) {
 		ret = -EIO;
 		goto failed;
 	}
 
-	if (cmd->blkcg_css)
-		kthread_associate_blkcg(cmd->blkcg_css);
-	if (cmd->memcg_css)
-		old_memcg = set_active_memcg(
-			mem_cgroup_from_css(cmd->memcg_css));
-
 	ret = do_req_filebacked(lo, rq);
 
-	if (cmd->blkcg_css)
-		kthread_associate_blkcg(NULL);
-
-	if (cmd->memcg_css) {
-		set_active_memcg(old_memcg);
-		css_put(cmd->memcg_css);
-	}
  failed:
 	/* complete non-aio request */
 	if (!cmd->use_aio || ret) {
@@ -2201,7 +2176,25 @@ static void loop_workfn(struct work_struct *work)
 {
 	struct loop_worker *worker =
 		container_of(work, struct loop_worker, work);
+	struct mem_cgroup *old_memcg = NULL;
+	struct cgroup_subsys_state *memcg_css = NULL;
+
+	kthread_associate_blkcg(worker->blkcg_css);
+#ifdef CONFIG_MEMCG
+	memcg_css = cgroup_get_e_css(worker->blkcg_css->cgroup,
+			&memory_cgrp_subsys);
+#endif
+	if (memcg_css)
+		old_memcg = set_active_memcg(
+				mem_cgroup_from_css(memcg_css));
+
 	loop_process_work(worker, &worker->cmd_list, worker->lo);
+
+	kthread_associate_blkcg(NULL);
+	if (memcg_css) {
+		set_active_memcg(old_memcg);
+		css_put(memcg_css);
+	}
 }
 
 static void loop_rootcg_workfn(struct work_struct *work)
diff --git a/drivers/block/loop.h b/drivers/block/loop.h
index 1988899db63a..a52a3fd89457 100644
--- a/drivers/block/loop.h
+++ b/drivers/block/loop.h
@@ -77,8 +77,6 @@ struct loop_cmd {
 	long ret;
 	struct kiocb iocb;
 	struct bio_vec *bvec;
-	struct cgroup_subsys_state *blkcg_css;
-	struct cgroup_subsys_state *memcg_css;
 };
 
 /* Support for loadable transfer modules */
-- 
2.31.1


^ permalink raw reply	[flat|nested] 22+ messages in thread

* [PATCH 2/6] loop: conver timer for monitoring idle worker into dwork
  2021-07-05 10:26 [PATCH 0/6] loop: cleanup charging io to mem/blkcg Ming Lei
  2021-07-05 10:26 ` [PATCH 1/6] loop: clean up blkcg association Ming Lei
@ 2021-07-05 10:26 ` Ming Lei
  2021-07-06  5:52   ` Christoph Hellwig
  2021-07-05 10:26 ` [PATCH 3/6] loop: add __loop_free_idle_workers() for covering freeing workers in clearing FD Ming Lei
                   ` (3 subsequent siblings)
  5 siblings, 1 reply; 22+ messages in thread
From: Ming Lei @ 2021-07-05 10:26 UTC (permalink / raw)
  To: Jens Axboe
  Cc: linux-block, Christoph Hellwig, Ming Lei, Michal Koutný,
	Dan Schatzberg

It is not necessary to use a timer for that; a delayed work (dwork) is
just fine, and with it we no longer need to disable interrupts every
time we acquire ->lo_work_lock.

Cc: Michal Koutný <mkoutny@suse.com>
Cc: Dan Schatzberg <schatzberg.dan@gmail.com>
Signed-off-by: Ming Lei <ming.lei@redhat.com>
---
 drivers/block/loop.c | 76 ++++++++++++++++++++++----------------------
 drivers/block/loop.h |  2 +-
 2 files changed, 39 insertions(+), 39 deletions(-)

diff --git a/drivers/block/loop.c b/drivers/block/loop.c
index 8378b8455f7c..7fa0c70a3ea6 100644
--- a/drivers/block/loop.c
+++ b/drivers/block/loop.c
@@ -929,7 +929,6 @@ struct loop_worker {
 
 static void loop_workfn(struct work_struct *work);
 static void loop_rootcg_workfn(struct work_struct *work);
-static void loop_free_idle_workers(struct timer_list *timer);
 
 #ifdef CONFIG_BLK_CGROUP
 static inline int queue_on_root_worker(struct cgroup_subsys_state *css)
@@ -957,7 +956,7 @@ static void loop_queue_work(struct loop_device *lo, struct loop_cmd *cmd)
 		blkcg_css = &bio_blkcg(rq->bio)->css;
 #endif
 
-	spin_lock_irq(&lo->lo_work_lock);
+	spin_lock(&lo->lo_work_lock);
 
 	if (queue_on_root_worker(blkcg_css))
 		goto queue_work;
@@ -1012,7 +1011,7 @@ static void loop_queue_work(struct loop_device *lo, struct loop_cmd *cmd)
 	}
 	list_add_tail(&cmd->list_entry, cmd_list);
 	queue_work(lo->workqueue, work);
-	spin_unlock_irq(&lo->lo_work_lock);
+	spin_unlock(&lo->lo_work_lock);
 }
 
 static void loop_update_rotational(struct loop_device *lo)
@@ -1134,6 +1133,34 @@ loop_set_status_from_info(struct loop_device *lo,
 	return 0;
 }
 
+static void loop_set_timer(struct loop_device *lo)
+{
+	schedule_delayed_work(&lo->idle_work, LOOP_IDLE_WORKER_TIMEOUT);
+}
+
+static void loop_free_idle_workers(struct work_struct *work)
+{
+	struct loop_device *lo = container_of(work, struct loop_device,
+			idle_work.work);
+	struct loop_worker *pos, *worker;
+
+	spin_lock(&lo->lo_work_lock);
+	list_for_each_entry_safe(worker, pos, &lo->idle_worker_list,
+				idle_list) {
+		if (time_is_after_jiffies(worker->last_ran_at +
+						LOOP_IDLE_WORKER_TIMEOUT))
+			break;
+		list_del(&worker->idle_list);
+		rb_erase(&worker->rb_node, &lo->worker_tree);
+		css_put(worker->blkcg_css);
+		kfree(worker);
+	}
+	if (!list_empty(&lo->idle_worker_list))
+		loop_set_timer(lo);
+	spin_unlock(&lo->lo_work_lock);
+}
+
+
 static int loop_configure(struct loop_device *lo, fmode_t mode,
 			  struct block_device *bdev,
 			  const struct loop_config *config)
@@ -1213,8 +1240,7 @@ static int loop_configure(struct loop_device *lo, fmode_t mode,
 	INIT_LIST_HEAD(&lo->rootcg_cmd_list);
 	INIT_LIST_HEAD(&lo->idle_worker_list);
 	lo->worker_tree = RB_ROOT;
-	timer_setup(&lo->timer, loop_free_idle_workers,
-		TIMER_DEFERRABLE);
+	INIT_DELAYED_WORK(&lo->idle_work, loop_free_idle_workers);
 	lo->use_dio = lo->lo_flags & LO_FLAGS_DIRECT_IO;
 	lo->lo_device = bdev;
 	lo->lo_backing_file = file;
@@ -1304,7 +1330,7 @@ static int __loop_clr_fd(struct loop_device *lo, bool release)
 	blk_mq_freeze_queue(lo->lo_queue);
 
 	destroy_workqueue(lo->workqueue);
-	spin_lock_irq(&lo->lo_work_lock);
+	spin_lock(&lo->lo_work_lock);
 	list_for_each_entry_safe(worker, pos, &lo->idle_worker_list,
 				idle_list) {
 		list_del(&worker->idle_list);
@@ -1312,8 +1338,8 @@ static int __loop_clr_fd(struct loop_device *lo, bool release)
 		css_put(worker->blkcg_css);
 		kfree(worker);
 	}
-	spin_unlock_irq(&lo->lo_work_lock);
-	del_timer_sync(&lo->timer);
+	spin_unlock(&lo->lo_work_lock);
+	cancel_delayed_work_sync(&lo->idle_work);
 
 	spin_lock_irq(&lo->lo_lock);
 	lo->lo_backing_file = NULL;
@@ -2133,11 +2159,6 @@ static void loop_handle_cmd(struct loop_cmd *cmd)
 	}
 }
 
-static void loop_set_timer(struct loop_device *lo)
-{
-	timer_reduce(&lo->timer, jiffies + LOOP_IDLE_WORKER_TIMEOUT);
-}
-
 static void loop_process_work(struct loop_worker *worker,
 			struct list_head *cmd_list, struct loop_device *lo)
 {
@@ -2145,17 +2166,17 @@ static void loop_process_work(struct loop_worker *worker,
 	struct loop_cmd *cmd;
 
 	current->flags |= PF_LOCAL_THROTTLE | PF_MEMALLOC_NOIO;
-	spin_lock_irq(&lo->lo_work_lock);
+	spin_lock(&lo->lo_work_lock);
 	while (!list_empty(cmd_list)) {
 		cmd = container_of(
 			cmd_list->next, struct loop_cmd, list_entry);
 		list_del(cmd_list->next);
-		spin_unlock_irq(&lo->lo_work_lock);
+		spin_unlock(&lo->lo_work_lock);
 
 		loop_handle_cmd(cmd);
 		cond_resched();
 
-		spin_lock_irq(&lo->lo_work_lock);
+		spin_lock(&lo->lo_work_lock);
 	}
 
 	/*
@@ -2168,7 +2189,7 @@ static void loop_process_work(struct loop_worker *worker,
 		list_add_tail(&worker->idle_list, &lo->idle_worker_list);
 		loop_set_timer(lo);
 	}
-	spin_unlock_irq(&lo->lo_work_lock);
+	spin_unlock(&lo->lo_work_lock);
 	current->flags = orig_flags;
 }
 
@@ -2204,27 +2225,6 @@ static void loop_rootcg_workfn(struct work_struct *work)
 	loop_process_work(NULL, &lo->rootcg_cmd_list, lo);
 }
 
-static void loop_free_idle_workers(struct timer_list *timer)
-{
-	struct loop_device *lo = container_of(timer, struct loop_device, timer);
-	struct loop_worker *pos, *worker;
-
-	spin_lock_irq(&lo->lo_work_lock);
-	list_for_each_entry_safe(worker, pos, &lo->idle_worker_list,
-				idle_list) {
-		if (time_is_after_jiffies(worker->last_ran_at +
-						LOOP_IDLE_WORKER_TIMEOUT))
-			break;
-		list_del(&worker->idle_list);
-		rb_erase(&worker->rb_node, &lo->worker_tree);
-		css_put(worker->blkcg_css);
-		kfree(worker);
-	}
-	if (!list_empty(&lo->idle_worker_list))
-		loop_set_timer(lo);
-	spin_unlock_irq(&lo->lo_work_lock);
-}
-
 static const struct blk_mq_ops loop_mq_ops = {
 	.queue_rq       = loop_queue_rq,
 	.complete	= lo_complete_rq,
diff --git a/drivers/block/loop.h b/drivers/block/loop.h
index a52a3fd89457..9df889af1bcf 100644
--- a/drivers/block/loop.h
+++ b/drivers/block/loop.h
@@ -60,7 +60,7 @@ struct loop_device {
 	struct list_head        rootcg_cmd_list;
 	struct list_head        idle_worker_list;
 	struct rb_root          worker_tree;
-	struct timer_list       timer;
+	struct delayed_work	idle_work;
 	bool			use_dio;
 	bool			sysfs_inited;
 
-- 
2.31.1


^ permalink raw reply	[flat|nested] 22+ messages in thread

* [PATCH 3/6] loop: add __loop_free_idle_workers() for covering freeing workers in clearing FD
  2021-07-05 10:26 [PATCH 0/6] loop: cleanup charging io to mem/blkcg Ming Lei
  2021-07-05 10:26 ` [PATCH 1/6] loop: clean up blkcg association Ming Lei
  2021-07-05 10:26 ` [PATCH 2/6] loop: conver timer for monitoring idle worker into dwork Ming Lei
@ 2021-07-05 10:26 ` Ming Lei
  2021-07-05 10:26 ` [PATCH 4/6] loop: improve loop_process_work Ming Lei
                   ` (2 subsequent siblings)
  5 siblings, 0 replies; 22+ messages in thread
From: Ming Lei @ 2021-07-05 10:26 UTC (permalink / raw)
  To: Jens Axboe
  Cc: linux-block, Christoph Hellwig, Ming Lei, Michal Koutný,
	Dan Schatzberg

Add the helper so that we can remove the duplicated code that frees all
workers when clearing the FD.

Cc: Michal Koutný <mkoutny@suse.com>
Cc: Dan Schatzberg <schatzberg.dan@gmail.com>
Signed-off-by: Ming Lei <ming.lei@redhat.com>
---
 drivers/block/loop.c | 24 ++++++++++--------------
 1 file changed, 10 insertions(+), 14 deletions(-)

diff --git a/drivers/block/loop.c b/drivers/block/loop.c
index 7fa0c70a3ea6..b71c8659f140 100644
--- a/drivers/block/loop.c
+++ b/drivers/block/loop.c
@@ -1138,16 +1138,14 @@ static void loop_set_timer(struct loop_device *lo)
 	schedule_delayed_work(&lo->idle_work, LOOP_IDLE_WORKER_TIMEOUT);
 }
 
-static void loop_free_idle_workers(struct work_struct *work)
+static void __loop_free_idle_workers(struct loop_device *lo, bool force)
 {
-	struct loop_device *lo = container_of(work, struct loop_device,
-			idle_work.work);
 	struct loop_worker *pos, *worker;
 
 	spin_lock(&lo->lo_work_lock);
 	list_for_each_entry_safe(worker, pos, &lo->idle_worker_list,
 				idle_list) {
-		if (time_is_after_jiffies(worker->last_ran_at +
+		if (!force && time_is_after_jiffies(worker->last_ran_at +
 						LOOP_IDLE_WORKER_TIMEOUT))
 			break;
 		list_del(&worker->idle_list);
@@ -1160,6 +1158,13 @@ static void loop_free_idle_workers(struct work_struct *work)
 	spin_unlock(&lo->lo_work_lock);
 }
 
+static void loop_free_idle_workers(struct work_struct *work)
+{
+	struct loop_device *lo = container_of(work, struct loop_device,
+			idle_work.work);
+
+	__loop_free_idle_workers(lo, false);
+}
 
 static int loop_configure(struct loop_device *lo, fmode_t mode,
 			  struct block_device *bdev,
@@ -1309,7 +1314,6 @@ static int __loop_clr_fd(struct loop_device *lo, bool release)
 	int err = 0;
 	bool partscan = false;
 	int lo_number;
-	struct loop_worker *pos, *worker;
 
 	mutex_lock(&lo->lo_mutex);
 	if (WARN_ON_ONCE(lo->lo_state != Lo_rundown)) {
@@ -1330,15 +1334,7 @@ static int __loop_clr_fd(struct loop_device *lo, bool release)
 	blk_mq_freeze_queue(lo->lo_queue);
 
 	destroy_workqueue(lo->workqueue);
-	spin_lock(&lo->lo_work_lock);
-	list_for_each_entry_safe(worker, pos, &lo->idle_worker_list,
-				idle_list) {
-		list_del(&worker->idle_list);
-		rb_erase(&worker->rb_node, &lo->worker_tree);
-		css_put(worker->blkcg_css);
-		kfree(worker);
-	}
-	spin_unlock(&lo->lo_work_lock);
+	__loop_free_idle_workers(lo, true);
 	cancel_delayed_work_sync(&lo->idle_work);
 
 	spin_lock_irq(&lo->lo_lock);
-- 
2.31.1


^ permalink raw reply	[flat|nested] 22+ messages in thread

* [PATCH 4/6] loop: improve loop_process_work
  2021-07-05 10:26 [PATCH 0/6] loop: cleanup charging io to mem/blkcg Ming Lei
                   ` (2 preceding siblings ...)
  2021-07-05 10:26 ` [PATCH 3/6] loop: add __loop_free_idle_workers() for covering freeing workers in clearing FD Ming Lei
@ 2021-07-05 10:26 ` Ming Lei
  2021-07-06  5:54   ` Christoph Hellwig
  2021-07-05 10:26 ` [PATCH 5/6] loop: use xarray to store workers Ming Lei
  2021-07-05 10:26 ` [PATCH 6/6] loop: don't add worker into idle list Ming Lei
  5 siblings, 1 reply; 22+ messages in thread
From: Ming Lei @ 2021-07-05 10:26 UTC (permalink / raw)
  To: Jens Axboe
  Cc: linux-block, Christoph Hellwig, Ming Lei, Michal Koutný,
	Dan Schatzberg

Avoid acquiring the spinlock for every loop command handled; instead,
take the lock once and grab all pending commands in one go.

Cc: Michal Koutný <mkoutny@suse.com>
Cc: Dan Schatzberg <schatzberg.dan@gmail.com>
Signed-off-by: Ming Lei <ming.lei@redhat.com>
---
 drivers/block/loop.c | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/drivers/block/loop.c b/drivers/block/loop.c
index b71c8659f140..1234e89f9e37 100644
--- a/drivers/block/loop.c
+++ b/drivers/block/loop.c
@@ -2160,21 +2160,26 @@ static void loop_process_work(struct loop_worker *worker,
 {
 	int orig_flags = current->flags;
 	struct loop_cmd *cmd;
+	LIST_HEAD(list);
 
 	current->flags |= PF_LOCAL_THROTTLE | PF_MEMALLOC_NOIO;
+
 	spin_lock(&lo->lo_work_lock);
-	while (!list_empty(cmd_list)) {
-		cmd = container_of(
-			cmd_list->next, struct loop_cmd, list_entry);
-		list_del(cmd_list->next);
-		spin_unlock(&lo->lo_work_lock);
+ again:
+	list_splice_init(cmd_list, &list);
+	spin_unlock(&lo->lo_work_lock);
 
-		loop_handle_cmd(cmd);
-		cond_resched();
+	while (!list_empty(&list)) {
+		cmd = list_first_entry(&list, struct loop_cmd, list_entry);
+		list_del_init(&cmd->list_entry);
 
-		spin_lock(&lo->lo_work_lock);
+		loop_handle_cmd(cmd);
 	}
 
+	spin_lock(&lo->lo_work_lock);
+	if (!list_empty(cmd_list))
+		goto again;
+
 	/*
 	 * We only add to the idle list if there are no pending cmds
 	 * *and* the worker will not run again which ensures that it
-- 
2.31.1


^ permalink raw reply	[flat|nested] 22+ messages in thread

* [PATCH 5/6] loop: use xarray to store workers
  2021-07-05 10:26 [PATCH 0/6] loop: cleanup charging io to mem/blkcg Ming Lei
                   ` (3 preceding siblings ...)
  2021-07-05 10:26 ` [PATCH 4/6] loop: improve loop_process_work Ming Lei
@ 2021-07-05 10:26 ` Ming Lei
  2021-07-05 10:26 ` [PATCH 6/6] loop: don't add worker into idle list Ming Lei
  5 siblings, 0 replies; 22+ messages in thread
From: Ming Lei @ 2021-07-05 10:26 UTC (permalink / raw)
  To: Jens Axboe
  Cc: linux-block, Christoph Hellwig, Ming Lei, Michal Koutný,
	Dan Schatzberg

css->id is unique within the io controller, so replace the rbtree with
an xarray for querying/storing 'blkcg_css', using css->id as the key;
this simplifies the code a lot.

Cc: Michal Koutný <mkoutny@suse.com>
Cc: Dan Schatzberg <schatzberg.dan@gmail.com>
Signed-off-by: Ming Lei <ming.lei@redhat.com>
---
 drivers/block/loop.c | 36 ++++++++++++++----------------------
 drivers/block/loop.h |  3 ++-
 2 files changed, 16 insertions(+), 23 deletions(-)

diff --git a/drivers/block/loop.c b/drivers/block/loop.c
index 1234e89f9e37..6e9725521330 100644
--- a/drivers/block/loop.c
+++ b/drivers/block/loop.c
@@ -918,7 +918,6 @@ static void loop_config_discard(struct loop_device *lo)
 }
 
 struct loop_worker {
-	struct rb_node rb_node;
 	struct work_struct work;
 	struct list_head cmd_list;
 	struct list_head idle_list;
@@ -944,11 +943,11 @@ static inline int queue_on_root_worker(struct cgroup_subsys_state *css)
 
 static void loop_queue_work(struct loop_device *lo, struct loop_cmd *cmd)
 {
-	struct rb_node **node = &(lo->worker_tree.rb_node), *parent = NULL;
-	struct loop_worker *cur_worker, *worker = NULL;
+	struct loop_worker *worker = NULL;
 	struct work_struct *work;
 	struct list_head *cmd_list;
 	struct cgroup_subsys_state *blkcg_css = NULL;
+	gfp_t gfp = GFP_NOWAIT | __GFP_NOWARN;
 #ifdef CONFIG_BLK_CGROUP
 	struct request *rq = blk_mq_rq_from_pdu(cmd);
 
@@ -961,24 +960,12 @@ static void loop_queue_work(struct loop_device *lo, struct loop_cmd *cmd)
 	if (queue_on_root_worker(blkcg_css))
 		goto queue_work;
 
-	node = &lo->worker_tree.rb_node;
-
-	while (*node) {
-		parent = *node;
-		cur_worker = container_of(*node, struct loop_worker, rb_node);
-		if (cur_worker->blkcg_css == blkcg_css) {
-			worker = cur_worker;
-			break;
-		} else if ((long)cur_worker->blkcg_css < (long)blkcg_css) {
-			node = &(*node)->rb_left;
-		} else {
-			node = &(*node)->rb_right;
-		}
-	}
+	/* css->id is unique in each cgroup subsystem */
+	worker = xa_load(&lo->workers, blkcg_css->id);
 	if (worker)
 		goto queue_work;
 
-	worker = kzalloc(sizeof(struct loop_worker), GFP_NOWAIT | __GFP_NOWARN);
+	worker = kzalloc(sizeof(*worker), gfp);
 	/*
 	 * In the event we cannot allocate a worker, just queue on the
 	 * rootcg worker and issue the I/O as the rootcg
@@ -992,8 +979,12 @@ static void loop_queue_work(struct loop_device *lo, struct loop_cmd *cmd)
 	INIT_LIST_HEAD(&worker->cmd_list);
 	INIT_LIST_HEAD(&worker->idle_list);
 	worker->lo = lo;
-	rb_link_node(&worker->rb_node, parent, node);
-	rb_insert_color(&worker->rb_node, &lo->worker_tree);
+
+	if (xa_err(xa_store(&lo->workers, blkcg_css->id, worker, gfp))) {
+		kfree(worker);
+		worker = NULL;
+	}
+
 queue_work:
 	if (worker) {
 		/*
@@ -1149,7 +1140,7 @@ static void __loop_free_idle_workers(struct loop_device *lo, bool force)
 						LOOP_IDLE_WORKER_TIMEOUT))
 			break;
 		list_del(&worker->idle_list);
-		rb_erase(&worker->rb_node, &lo->worker_tree);
+		xa_erase(&lo->workers, worker->blkcg_css->id);
 		css_put(worker->blkcg_css);
 		kfree(worker);
 	}
@@ -1244,7 +1235,7 @@ static int loop_configure(struct loop_device *lo, fmode_t mode,
 	INIT_WORK(&lo->rootcg_work, loop_rootcg_workfn);
 	INIT_LIST_HEAD(&lo->rootcg_cmd_list);
 	INIT_LIST_HEAD(&lo->idle_worker_list);
-	lo->worker_tree = RB_ROOT;
+	xa_init(&lo->workers);
 	INIT_DELAYED_WORK(&lo->idle_work, loop_free_idle_workers);
 	lo->use_dio = lo->lo_flags & LO_FLAGS_DIRECT_IO;
 	lo->lo_device = bdev;
@@ -1336,6 +1327,7 @@ static int __loop_clr_fd(struct loop_device *lo, bool release)
 	destroy_workqueue(lo->workqueue);
 	__loop_free_idle_workers(lo, true);
 	cancel_delayed_work_sync(&lo->idle_work);
+	xa_destroy(&lo->workers);
 
 	spin_lock_irq(&lo->lo_lock);
 	lo->lo_backing_file = NULL;
diff --git a/drivers/block/loop.h b/drivers/block/loop.h
index 9df889af1bcf..cab34da1e1bb 100644
--- a/drivers/block/loop.h
+++ b/drivers/block/loop.h
@@ -14,6 +14,7 @@
 #include <linux/blk-mq.h>
 #include <linux/spinlock.h>
 #include <linux/mutex.h>
+#include <linux/xarray.h>
 #include <uapi/linux/loop.h>
 
 /* Possible states of device */
@@ -59,7 +60,7 @@ struct loop_device {
 	struct work_struct      rootcg_work;
 	struct list_head        rootcg_cmd_list;
 	struct list_head        idle_worker_list;
-	struct rb_root          worker_tree;
+	struct xarray		workers;
 	struct delayed_work	idle_work;
 	bool			use_dio;
 	bool			sysfs_inited;
-- 
2.31.1


^ permalink raw reply	[flat|nested] 22+ messages in thread

* [PATCH 6/6] loop: don't add worker into idle list
  2021-07-05 10:26 [PATCH 0/6] loop: cleanup charging io to mem/blkcg Ming Lei
                   ` (4 preceding siblings ...)
  2021-07-05 10:26 ` [PATCH 5/6] loop: use xarray to store workers Ming Lei
@ 2021-07-05 10:26 ` Ming Lei
  2021-07-06 13:55   ` Dan Schatzberg
  5 siblings, 1 reply; 22+ messages in thread
From: Ming Lei @ 2021-07-05 10:26 UTC (permalink / raw)
  To: Jens Axboe
  Cc: linux-block, Christoph Hellwig, Ming Lei, Michal Koutný,
	Dan Schatzberg

We can retrieve any worker via the xarray, so don't add workers to the
idle list. Meanwhile, reduce the coverage of .lo_work_lock; in particular,
we don't need it in the IO path except when adding/deleting a worker
to/from the xarray.

Also simplify code a bit.

Cc: Michal Koutný <mkoutny@suse.com>
Cc: Dan Schatzberg <schatzberg.dan@gmail.com>
Signed-off-by: Ming Lei <ming.lei@redhat.com>
---
 drivers/block/loop.c | 138 ++++++++++++++++++++++++++-----------------
 1 file changed, 84 insertions(+), 54 deletions(-)

diff --git a/drivers/block/loop.c b/drivers/block/loop.c
index 6e9725521330..146eaa03629b 100644
--- a/drivers/block/loop.c
+++ b/drivers/block/loop.c
@@ -920,10 +920,11 @@ static void loop_config_discard(struct loop_device *lo)
 struct loop_worker {
 	struct work_struct work;
 	struct list_head cmd_list;
-	struct list_head idle_list;
 	struct loop_device *lo;
 	struct cgroup_subsys_state *blkcg_css;
 	unsigned long last_ran_at;
+	spinlock_t lock;
+	refcount_t refcnt;
 };
 
 static void loop_workfn(struct work_struct *work);
@@ -941,13 +942,56 @@ static inline int queue_on_root_worker(struct cgroup_subsys_state *css)
 }
 #endif
 
+static struct loop_worker *loop_alloc_or_get_worker(struct loop_device *lo,
+		struct cgroup_subsys_state *blkcg_css)
+{
+	gfp_t gfp = GFP_NOWAIT | __GFP_NOWARN;
+	struct loop_worker *worker = kzalloc(sizeof(*worker), gfp);
+	struct loop_worker *worker_old;
+
+	if (!worker)
+		return NULL;
+
+	worker->blkcg_css = blkcg_css;
+	INIT_WORK(&worker->work, loop_workfn);
+	INIT_LIST_HEAD(&worker->cmd_list);
+	worker->lo = lo;
+	spin_lock_init(&worker->lock);
+	refcount_set(&worker->refcnt, 2);	/* INIT + INC */
+
+	spin_lock(&lo->lo_work_lock);
+	/* maybe someone is storing a new worker */
+	worker_old = xa_load(&lo->workers, blkcg_css->id);
+	if (!worker_old || !refcount_inc_not_zero(&worker_old->refcnt)) {
+		if (xa_err(xa_store(&lo->workers, blkcg_css->id, worker, gfp))) {
+			kfree(worker);
+			worker = NULL;
+		} else {
+			css_get(worker->blkcg_css);
+		}
+	} else {
+		kfree(worker);
+		worker = worker_old;
+	}
+	spin_unlock(&lo->lo_work_lock);
+
+	return worker;
+}
+
+static void loop_release_worker(struct loop_worker *worker)
+{
+	xa_erase(&worker->lo->workers, worker->blkcg_css->id);
+	css_put(worker->blkcg_css);
+	kfree(worker);
+}
+
 static void loop_queue_work(struct loop_device *lo, struct loop_cmd *cmd)
 {
 	struct loop_worker *worker = NULL;
 	struct work_struct *work;
 	struct list_head *cmd_list;
 	struct cgroup_subsys_state *blkcg_css = NULL;
-	gfp_t gfp = GFP_NOWAIT | __GFP_NOWARN;
+	spinlock_t	*lock;
 #ifdef CONFIG_BLK_CGROUP
 	struct request *rq = blk_mq_rq_from_pdu(cmd);
 
@@ -955,54 +999,38 @@ static void loop_queue_work(struct loop_device *lo, struct loop_cmd *cmd)
 		blkcg_css = &bio_blkcg(rq->bio)->css;
 #endif
 
-	spin_lock(&lo->lo_work_lock);
-
-	if (queue_on_root_worker(blkcg_css))
-		goto queue_work;
-
-	/* css->id is unique in each cgroup subsystem */
-	worker = xa_load(&lo->workers, blkcg_css->id);
-	if (worker)
-		goto queue_work;
-
-	worker = kzalloc(sizeof(*worker), gfp);
-	/*
-	 * In the event we cannot allocate a worker, just queue on the
-	 * rootcg worker and issue the I/O as the rootcg
-	 */
-	if (!worker)
-		goto queue_work;
+	if (!queue_on_root_worker(blkcg_css)) {
+		int ret = 0;
 
-	worker->blkcg_css = blkcg_css;
-	css_get(worker->blkcg_css);
-	INIT_WORK(&worker->work, loop_workfn);
-	INIT_LIST_HEAD(&worker->cmd_list);
-	INIT_LIST_HEAD(&worker->idle_list);
-	worker->lo = lo;
+		rcu_read_lock();
+		/* css->id is unique in each cgroup subsystem */
+		worker = xa_load(&lo->workers, blkcg_css->id);
+		if (worker)
+			ret = refcount_inc_not_zero(&worker->refcnt);
+		rcu_read_unlock();
 
-	if (xa_err(xa_store(&lo->workers, blkcg_css->id, worker, gfp))) {
-		kfree(worker);
-		worker = NULL;
+		if (!worker || !ret)
+			worker = loop_alloc_or_get_worker(lo, blkcg_css);
+		/*
+		 * In the event we cannot allocate a worker, just queue on the
+		 * rootcg worker and issue the I/O as the rootcg
+		 */
 	}
 
-queue_work:
 	if (worker) {
-		/*
-		 * We need to remove from the idle list here while
-		 * holding the lock so that the idle timer doesn't
-		 * free the worker
-		 */
-		if (!list_empty(&worker->idle_list))
-			list_del_init(&worker->idle_list);
 		work = &worker->work;
 		cmd_list = &worker->cmd_list;
+		lock = &worker->lock;
 	} else {
 		work = &lo->rootcg_work;
 		cmd_list = &lo->rootcg_cmd_list;
+		lock = &lo->lo_work_lock;
 	}
+
+	spin_lock(lock);
 	list_add_tail(&cmd->list_entry, cmd_list);
+	spin_unlock(lock);
 	queue_work(lo->workqueue, work);
-	spin_unlock(&lo->lo_work_lock);
 }
 
 static void loop_update_rotational(struct loop_device *lo)
@@ -1131,20 +1159,18 @@ static void loop_set_timer(struct loop_device *lo)
 
 static void __loop_free_idle_workers(struct loop_device *lo, bool force)
 {
-	struct loop_worker *pos, *worker;
+	struct loop_worker *worker;
+	unsigned long id;
 
 	spin_lock(&lo->lo_work_lock);
-	list_for_each_entry_safe(worker, pos, &lo->idle_worker_list,
-				idle_list) {
+	xa_for_each(&lo->workers, id, worker) {
 		if (!force && time_is_after_jiffies(worker->last_ran_at +
 						LOOP_IDLE_WORKER_TIMEOUT))
 			break;
-		list_del(&worker->idle_list);
-		xa_erase(&lo->workers, worker->blkcg_css->id);
-		css_put(worker->blkcg_css);
-		kfree(worker);
+		if (refcount_dec_and_test(&worker->refcnt))
+			loop_release_worker(worker);
 	}
-	if (!list_empty(&lo->idle_worker_list))
+	if (!xa_empty(&lo->workers))
 		loop_set_timer(lo);
 	spin_unlock(&lo->lo_work_lock);
 }
@@ -2148,27 +2174,29 @@ static void loop_handle_cmd(struct loop_cmd *cmd)
 }
 
 static void loop_process_work(struct loop_worker *worker,
-			struct list_head *cmd_list, struct loop_device *lo)
+			struct list_head *cmd_list, spinlock_t *lock)
 {
 	int orig_flags = current->flags;
 	struct loop_cmd *cmd;
 	LIST_HEAD(list);
+	int cnt = 0;
 
 	current->flags |= PF_LOCAL_THROTTLE | PF_MEMALLOC_NOIO;
 
-	spin_lock(&lo->lo_work_lock);
+	spin_lock(lock);
  again:
 	list_splice_init(cmd_list, &list);
-	spin_unlock(&lo->lo_work_lock);
+	spin_unlock(lock);
 
 	while (!list_empty(&list)) {
 		cmd = list_first_entry(&list, struct loop_cmd, list_entry);
 		list_del_init(&cmd->list_entry);
 
 		loop_handle_cmd(cmd);
+		cnt++;
 	}
 
-	spin_lock(&lo->lo_work_lock);
+	spin_lock(lock);
 	if (!list_empty(cmd_list))
 		goto again;
 
@@ -2179,11 +2207,13 @@ static void loop_process_work(struct loop_worker *worker,
 	 */
 	if (worker && !work_pending(&worker->work)) {
 		worker->last_ran_at = jiffies;
-		list_add_tail(&worker->idle_list, &lo->idle_worker_list);
-		loop_set_timer(lo);
+		loop_set_timer(worker->lo);
 	}
-	spin_unlock(&lo->lo_work_lock);
+	spin_unlock(lock);
 	current->flags = orig_flags;
+
+	if (worker && refcount_sub_and_test(cnt, &worker->refcnt))
+		loop_release_worker(worker);
 }
 
 static void loop_workfn(struct work_struct *work)
@@ -2202,7 +2232,7 @@ static void loop_workfn(struct work_struct *work)
 		old_memcg = set_active_memcg(
 				mem_cgroup_from_css(memcg_css));
 
-	loop_process_work(worker, &worker->cmd_list, worker->lo);
+	loop_process_work(worker, &worker->cmd_list, &worker->lock);
 
 	kthread_associate_blkcg(NULL);
 	if (memcg_css) {
@@ -2215,7 +2245,7 @@ static void loop_rootcg_workfn(struct work_struct *work)
 {
 	struct loop_device *lo =
 		container_of(work, struct loop_device, rootcg_work);
-	loop_process_work(NULL, &lo->rootcg_cmd_list, lo);
+	loop_process_work(NULL, &lo->rootcg_cmd_list, &lo->lo_work_lock);
 }
 
 static const struct blk_mq_ops loop_mq_ops = {
-- 
2.31.1


^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [PATCH 1/6] loop: clean up blkcg association
  2021-07-05 10:26 ` [PATCH 1/6] loop: clean up blkcg association Ming Lei
@ 2021-07-06  5:51   ` Christoph Hellwig
  2021-07-08  7:20     ` Ming Lei
  0 siblings, 1 reply; 22+ messages in thread
From: Christoph Hellwig @ 2021-07-06  5:51 UTC (permalink / raw)
  To: Ming Lei
  Cc: Jens Axboe, linux-block, Christoph Hellwig, Michal Koutný,
	Dan Schatzberg

Did I mention that all this loop blkcg integration is complete and utter
garbage and should have never been merged?


>  	struct list_head *cmd_list;
> +	struct cgroup_subsys_state *blkcg_css = NULL;
> +#ifdef CONFIG_BLK_CGROUP
> +	struct request *rq = blk_mq_rq_from_pdu(cmd);
> +
> +	if (rq->bio && rq->bio->bi_blkg)
> +		blkcg_css = &bio_blkcg(rq->bio)->css;
> +#endif

This kind of junk has no business in a driver.  The blkcg code
needs to provide a helper for retrieving the blkcg_css from a request,
including a stub for the !CONFIG_BLK_CGROUP case.

>  		cur_worker = container_of(*node, struct loop_worker, rb_node);
> -		if (cur_worker->blkcg_css == cmd->blkcg_css) {
> +		if (cur_worker->blkcg_css == blkcg_css) {
>  			worker = cur_worker;
>  			break;
> -		} else if ((long)cur_worker->blkcg_css < (long)cmd->blkcg_css) {
> +		} else if ((long)cur_worker->blkcg_css < (long)blkcg_css) {

No need for an else after a break.  And more importantly no need to cast
a pointer to compare it to another pointer of the same type.

> +	struct mem_cgroup *old_memcg = NULL;
> +	struct cgroup_subsys_state *memcg_css = NULL;
> +
> +	kthread_associate_blkcg(worker->blkcg_css);
> +#ifdef CONFIG_MEMCG
> +	memcg_css = cgroup_get_e_css(worker->blkcg_css->cgroup,
> +			&memory_cgrp_subsys);
> +#endif
> +	if (memcg_css)
> +		old_memcg = set_active_memcg(
> +				mem_cgroup_from_css(memcg_css));
> +

This kind of crap also has absolutely no business in a block driver
and the memcg code should provide a proper helper.

^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [PATCH 2/6] loop: conver timer for monitoring idle worker into dwork
  2021-07-05 10:26 ` [PATCH 2/6] loop: conver timer for monitoring idle worker into dwork Ming Lei
@ 2021-07-06  5:52   ` Christoph Hellwig
  2021-07-08  7:23     ` Ming Lei
  0 siblings, 1 reply; 22+ messages in thread
From: Christoph Hellwig @ 2021-07-06  5:52 UTC (permalink / raw)
  To: Ming Lei
  Cc: Jens Axboe, linux-block, Christoph Hellwig, Michal Koutný,
	Dan Schatzberg

On Mon, Jul 05, 2021 at 06:26:03PM +0800, Ming Lei wrote:
> +static void loop_free_idle_workers(struct work_struct *work)
> +{
> +	struct loop_device *lo = container_of(work, struct loop_device,
> +			idle_work.work);
> +	struct loop_worker *pos, *worker;
> +
> +	spin_lock(&lo->lo_work_lock);
> +	list_for_each_entry_safe(worker, pos, &lo->idle_worker_list,
> +				idle_list) {
> +		if (time_is_after_jiffies(worker->last_ran_at +
> +						LOOP_IDLE_WORKER_TIMEOUT))
> +			break;
> +		list_del(&worker->idle_list);
> +		rb_erase(&worker->rb_node, &lo->worker_tree);
> +		css_put(worker->blkcg_css);
> +		kfree(worker);
> +	}
> +	if (!list_empty(&lo->idle_worker_list))
> +		loop_set_timer(lo);
> +	spin_unlock(&lo->lo_work_lock);
> +}
> +
> +

Double empty line.  But the whole fact that the loop driver badly
reimplements work queues is just fucked up.  We need to revert this
shit.

^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [PATCH 4/6] loop: improve loop_process_work
  2021-07-05 10:26 ` [PATCH 4/6] loop: improve loop_process_work Ming Lei
@ 2021-07-06  5:54   ` Christoph Hellwig
  0 siblings, 0 replies; 22+ messages in thread
From: Christoph Hellwig @ 2021-07-06  5:54 UTC (permalink / raw)
  To: Ming Lei
  Cc: Jens Axboe, linux-block, Christoph Hellwig, Michal Koutný,
	Dan Schatzberg

On Mon, Jul 05, 2021 at 06:26:05PM +0800, Ming Lei wrote:
> Avoid to acquire the spinlock for handling every loop command, and hold
> lock once for taking all commands.

I think the proper answer is to stop using this badly reimplemented
workqueue that can't even get basic things right.

^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [PATCH 6/6] loop: don't add worker into idle list
  2021-07-05 10:26 ` [PATCH 6/6] loop: don't add worker into idle list Ming Lei
@ 2021-07-06 13:55   ` Dan Schatzberg
  2021-07-07  3:19     ` Ming Lei
  0 siblings, 1 reply; 22+ messages in thread
From: Dan Schatzberg @ 2021-07-06 13:55 UTC (permalink / raw)
  To: Ming Lei; +Cc: Jens Axboe, linux-block, Christoph Hellwig, Michal Koutný

On Mon, Jul 05, 2021 at 06:26:07PM +0800, Ming Lei wrote:
> We can retrieve any workers via xarray, so not add it into idle list.
> Meantime reduce .lo_work_lock coverage, especially we don't need that
> in IO path except for adding/deleting worker into xarray.
> 
> Also simplify code a bit.
> 
> Cc: Michal Koutný <mkoutny@suse.com>
> Cc: Dan Schatzberg <schatzberg.dan@gmail.com>
> Signed-off-by: Ming Lei <ming.lei@redhat.com>
> ---
>  drivers/block/loop.c | 138 ++++++++++++++++++++++++++-----------------
>  1 file changed, 84 insertions(+), 54 deletions(-)
> 
> diff --git a/drivers/block/loop.c b/drivers/block/loop.c
> index 6e9725521330..146eaa03629b 100644
> --- a/drivers/block/loop.c
> +++ b/drivers/block/loop.c
> @@ -920,10 +920,11 @@ static void loop_config_discard(struct loop_device *lo)
>  struct loop_worker {
>  	struct work_struct work;
>  	struct list_head cmd_list;
> -	struct list_head idle_list;
>  	struct loop_device *lo;
>  	struct cgroup_subsys_state *blkcg_css;
>  	unsigned long last_ran_at;
> +	spinlock_t lock;
> +	refcount_t refcnt;
>  };
>  
>  static void loop_workfn(struct work_struct *work);
> @@ -941,13 +942,56 @@ static inline int queue_on_root_worker(struct cgroup_subsys_state *css)
>  }
>  #endif
>  
> +static struct loop_worker *loop_alloc_or_get_worker(struct loop_device *lo,
> +		struct cgroup_subsys_state *blkcg_css)
> +{
> +	gfp_t gfp = GFP_NOWAIT | __GFP_NOWARN;
> +	struct loop_worker *worker = kzalloc(sizeof(*worker), gfp);
> +	struct loop_worker *worker_old;
> +
> +	if (!worker)
> +		return NULL;
> +
> +	worker->blkcg_css = blkcg_css;
> +	INIT_WORK(&worker->work, loop_workfn);
> +	INIT_LIST_HEAD(&worker->cmd_list);
> +	worker->lo = lo;
> +	spin_lock_init(&worker->lock);
> +	refcount_set(&worker->refcnt, 2);	/* INIT + INC */

Can you elaborate on the reference counting a bit more here? I notice
you have a reference per loop_cmd, but there are a couple extra
refcounts that aren't obvious to me. Having a comment describing it
might be useful.

> +
> +	spin_lock(&lo->lo_work_lock);
> +	/* maybe someone is storing a new worker */
> +	worker_old = xa_load(&lo->workers, blkcg_css->id);
> +	if (!worker_old || !refcount_inc_not_zero(&worker_old->refcnt)) {

I gather you increment the refcount here under lo_work_lock to ensure
the worker isn't destroyed before queueing the cmd on it.

> +		if (xa_err(xa_store(&lo->workers, blkcg_css->id, worker, gfp))) {
> +			kfree(worker);
> +			worker = NULL;
> +		} else {
> +			css_get(worker->blkcg_css);
> +		}
> +	} else {
> +		kfree(worker);
> +		worker = worker_old;
> +	}
> +	spin_unlock(&lo->lo_work_lock);
> +
> +	return worker;
> +}
> +
> +static void loop_release_worker(struct loop_worker *worker)
> +{
> +	xa_erase(&worker->lo->workers, worker->blkcg_css->id);
> +	css_put(worker->blkcg_css);
> +	kfree(worker);
> +}
> +
>  static void loop_queue_work(struct loop_device *lo, struct loop_cmd *cmd)
>  {
>  	struct loop_worker *worker = NULL;
>  	struct work_struct *work;
>  	struct list_head *cmd_list;
>  	struct cgroup_subsys_state *blkcg_css = NULL;
> -	gfp_t gfp = GFP_NOWAIT | __GFP_NOWARN;
> +	spinlock_t	*lock;
>  #ifdef CONFIG_BLK_CGROUP
>  	struct request *rq = blk_mq_rq_from_pdu(cmd);
>  
> @@ -955,54 +999,38 @@ static void loop_queue_work(struct loop_device *lo, struct loop_cmd *cmd)
>  		blkcg_css = &bio_blkcg(rq->bio)->css;
>  #endif
>  
> -	spin_lock(&lo->lo_work_lock);
> -
> -	if (queue_on_root_worker(blkcg_css))
> -		goto queue_work;
> -
> -	/* css->id is unique in each cgroup subsystem */
> -	worker = xa_load(&lo->workers, blkcg_css->id);
> -	if (worker)
> -		goto queue_work;
> -
> -	worker = kzalloc(sizeof(*worker), gfp);
> -	/*
> -	 * In the event we cannot allocate a worker, just queue on the
> -	 * rootcg worker and issue the I/O as the rootcg
> -	 */
> -	if (!worker)
> -		goto queue_work;
> +	if (!queue_on_root_worker(blkcg_css)) {
> +		int ret = 0;
>  
> -	worker->blkcg_css = blkcg_css;
> -	css_get(worker->blkcg_css);
> -	INIT_WORK(&worker->work, loop_workfn);
> -	INIT_LIST_HEAD(&worker->cmd_list);
> -	INIT_LIST_HEAD(&worker->idle_list);
> -	worker->lo = lo;
> +		rcu_read_lock();
> +		/* css->id is unique in each cgroup subsystem */
> +		worker = xa_load(&lo->workers, blkcg_css->id);
> +		if (worker)
> +			ret = refcount_inc_not_zero(&worker->refcnt);
> +		rcu_read_unlock();

I don't follow the refcount decrement here. Also, what's the purpose
of the rcu critical section here?

>  
> -	if (xa_err(xa_store(&lo->workers, blkcg_css->id, worker, gfp))) {
> -		kfree(worker);
> -		worker = NULL;
> +		if (!worker || !ret)
> +			worker = loop_alloc_or_get_worker(lo, blkcg_css);
> +		/*
> +		 * In the event we cannot allocate a worker, just queue on the
> +		 * rootcg worker and issue the I/O as the rootcg
> +		 */
>  	}
>  
> -queue_work:
>  	if (worker) {
> -		/*
> -		 * We need to remove from the idle list here while
> -		 * holding the lock so that the idle timer doesn't
> -		 * free the worker
> -		 */
> -		if (!list_empty(&worker->idle_list))
> -			list_del_init(&worker->idle_list);
>  		work = &worker->work;
>  		cmd_list = &worker->cmd_list;
> +		lock = &worker->lock;
>  	} else {
>  		work = &lo->rootcg_work;
>  		cmd_list = &lo->rootcg_cmd_list;
> +		lock = &lo->lo_work_lock;

Is lo_work_lock special here or is it just because the root "worker"
lacks a lock itself? I wonder if a separate spinlock is more clear.

>  	}
> +
> +	spin_lock(lock);
>  	list_add_tail(&cmd->list_entry, cmd_list);
> +	spin_unlock(lock);
>  	queue_work(lo->workqueue, work);
> -	spin_unlock(&lo->lo_work_lock);
>  }
>  
>  static void loop_update_rotational(struct loop_device *lo)
> @@ -1131,20 +1159,18 @@ static void loop_set_timer(struct loop_device *lo)
>  
>  static void __loop_free_idle_workers(struct loop_device *lo, bool force)
>  {
> -	struct loop_worker *pos, *worker;
> +	struct loop_worker *worker;
> +	unsigned long id;
>  
>  	spin_lock(&lo->lo_work_lock);
> -	list_for_each_entry_safe(worker, pos, &lo->idle_worker_list,
> -				idle_list) {
> +	xa_for_each(&lo->workers, id, worker) {
>  		if (!force && time_is_after_jiffies(worker->last_ran_at +
>  						LOOP_IDLE_WORKER_TIMEOUT))
>  			break;
> -		list_del(&worker->idle_list);
> -		xa_erase(&lo->workers, worker->blkcg_css->id);
> -		css_put(worker->blkcg_css);
> -		kfree(worker);
> +		if (refcount_dec_and_test(&worker->refcnt))
> +			loop_release_worker(worker);

This one is puzzling to me. Can't you hit this refcount decrement
superfluously each time the loop timer fires?

>  	}
> -	if (!list_empty(&lo->idle_worker_list))
> +	if (!xa_empty(&lo->workers))
>  		loop_set_timer(lo);
>  	spin_unlock(&lo->lo_work_lock);
>  }
> @@ -2148,27 +2174,29 @@ static void loop_handle_cmd(struct loop_cmd *cmd)
>  }
>  
>  static void loop_process_work(struct loop_worker *worker,
> -			struct list_head *cmd_list, struct loop_device *lo)
> +			struct list_head *cmd_list, spinlock_t *lock)
>  {
>  	int orig_flags = current->flags;
>  	struct loop_cmd *cmd;
>  	LIST_HEAD(list);
> +	int cnt = 0;
>  
>  	current->flags |= PF_LOCAL_THROTTLE | PF_MEMALLOC_NOIO;
>  
> -	spin_lock(&lo->lo_work_lock);
> +	spin_lock(lock);
>   again:
>  	list_splice_init(cmd_list, &list);
> -	spin_unlock(&lo->lo_work_lock);
> +	spin_unlock(lock);
>  
>  	while (!list_empty(&list)) {
>  		cmd = list_first_entry(&list, struct loop_cmd, list_entry);
>  		list_del_init(&cmd->list_entry);
>  
>  		loop_handle_cmd(cmd);
> +		cnt++;
>  	}
>  
> -	spin_lock(&lo->lo_work_lock);
> +	spin_lock(lock);
>  	if (!list_empty(cmd_list))
>  		goto again;
>  
> @@ -2179,11 +2207,13 @@ static void loop_process_work(struct loop_worker *worker,
>  	 */
>  	if (worker && !work_pending(&worker->work)) {
>  		worker->last_ran_at = jiffies;
> -		list_add_tail(&worker->idle_list, &lo->idle_worker_list);
> -		loop_set_timer(lo);
> +		loop_set_timer(worker->lo);
>  	}
> -	spin_unlock(&lo->lo_work_lock);
> +	spin_unlock(lock);
>  	current->flags = orig_flags;
> +
> +	if (worker && refcount_sub_and_test(cnt, &worker->refcnt))
> +		loop_release_worker(worker);
>  }
>  
>  static void loop_workfn(struct work_struct *work)
> @@ -2202,7 +2232,7 @@ static void loop_workfn(struct work_struct *work)
>  		old_memcg = set_active_memcg(
>  				mem_cgroup_from_css(memcg_css));
>  
> -	loop_process_work(worker, &worker->cmd_list, worker->lo);
> +	loop_process_work(worker, &worker->cmd_list, &worker->lock);
>  
>  	kthread_associate_blkcg(NULL);
>  	if (memcg_css) {
> @@ -2215,7 +2245,7 @@ static void loop_rootcg_workfn(struct work_struct *work)
>  {
>  	struct loop_device *lo =
>  		container_of(work, struct loop_device, rootcg_work);
> -	loop_process_work(NULL, &lo->rootcg_cmd_list, lo);
> +	loop_process_work(NULL, &lo->rootcg_cmd_list, &lo->lo_work_lock);
>  }
>  
>  static const struct blk_mq_ops loop_mq_ops = {
> -- 
> 2.31.1
> 

The rest of this patch series looks great. Feel free to add my
Acked-by to the others.

^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [PATCH 6/6] loop: don't add worker into idle list
  2021-07-06 13:55   ` Dan Schatzberg
@ 2021-07-07  3:19     ` Ming Lei
  2021-07-07 13:55       ` Dan Schatzberg
  0 siblings, 1 reply; 22+ messages in thread
From: Ming Lei @ 2021-07-07  3:19 UTC (permalink / raw)
  To: Dan Schatzberg
  Cc: Jens Axboe, linux-block, Christoph Hellwig, Michal Koutný

On Tue, Jul 06, 2021 at 09:55:36AM -0400, Dan Schatzberg wrote:
> On Mon, Jul 05, 2021 at 06:26:07PM +0800, Ming Lei wrote:
> > We can retrieve any workers via xarray, so not add it into idle list.
> > Meantime reduce .lo_work_lock coverage, especially we don't need that
> > in IO path except for adding/deleting worker into xarray.
> > 
> > Also simplify code a bit.
> > 
> > Cc: Michal Koutný <mkoutny@suse.com>
> > Cc: Dan Schatzberg <schatzberg.dan@gmail.com>
> > Signed-off-by: Ming Lei <ming.lei@redhat.com>
> > ---
> >  drivers/block/loop.c | 138 ++++++++++++++++++++++++++-----------------
> >  1 file changed, 84 insertions(+), 54 deletions(-)
> > 
> > diff --git a/drivers/block/loop.c b/drivers/block/loop.c
> > index 6e9725521330..146eaa03629b 100644
> > --- a/drivers/block/loop.c
> > +++ b/drivers/block/loop.c
> > @@ -920,10 +920,11 @@ static void loop_config_discard(struct loop_device *lo)
> >  struct loop_worker {
> >  	struct work_struct work;
> >  	struct list_head cmd_list;
> > -	struct list_head idle_list;
> >  	struct loop_device *lo;
> >  	struct cgroup_subsys_state *blkcg_css;
> >  	unsigned long last_ran_at;
> > +	spinlock_t lock;
> > +	refcount_t refcnt;
> >  };
> >  
> >  static void loop_workfn(struct work_struct *work);
> > @@ -941,13 +942,56 @@ static inline int queue_on_root_worker(struct cgroup_subsys_state *css)
> >  }
> >  #endif
> >  
> > +static struct loop_worker *loop_alloc_or_get_worker(struct loop_device *lo,
> > +		struct cgroup_subsys_state *blkcg_css)
> > +{
> > +	gfp_t gfp = GFP_NOWAIT | __GFP_NOWARN;
> > +	struct loop_worker *worker = kzalloc(sizeof(*worker), gfp);
> > +	struct loop_worker *worker_old;
> > +
> > +	if (!worker)
> > +		return NULL;
> > +
> > +	worker->blkcg_css = blkcg_css;
> > +	INIT_WORK(&worker->work, loop_workfn);
> > +	INIT_LIST_HEAD(&worker->cmd_list);
> > +	worker->lo = lo;
> > +	spin_lock_init(&worker->lock);
> > +	refcount_set(&worker->refcnt, 2);	/* INIT + INC */
> 
> Can you elaborate on the reference counting a bit more here? I notice
> you have a reference per loop_cmd, but there are a couple extra
> refcounts that aren't obvious to me. Having a comment describing it
> might be useful.

One reference is for INIT, another is for INC, so it is initialized as 2.

The counterpart of the INIT reference is refcount_dec_and_test()
in __loop_free_idle_workers(), and it is per-worker.

The counterpart of the INC reference is refcount_sub_and_test() in
loop_process_work(), and it is per-cmd.

> 
> > +
> > +	spin_lock(&lo->lo_work_lock);
> > +	/* maybe someone is storing a new worker */
> > +	worker_old = xa_load(&lo->workers, blkcg_css->id);
> > +	if (!worker_old || !refcount_inc_not_zero(&worker_old->refcnt)) {
> 
> I gather you increment the refcount here under lo_work_lock to ensure
> the worker isn't destroyed before queueing the cmd on it.

Right.

> 
> > +		if (xa_err(xa_store(&lo->workers, blkcg_css->id, worker, gfp))) {
> > +			kfree(worker);
> > +			worker = NULL;
> > +		} else {
> > +			css_get(worker->blkcg_css);
> > +		}
> > +	} else {
> > +		kfree(worker);
> > +		worker = worker_old;
> > +	}
> > +	spin_unlock(&lo->lo_work_lock);
> > +
> > +	return worker;
> > +}
> > +
> > +static void loop_release_worker(struct loop_worker *worker)
> > +{
> > +	xa_erase(&worker->lo->workers, worker->blkcg_css->id);
> > +	css_put(worker->blkcg_css);
> > +	kfree(worker);
> > +}
> > +
> >  static void loop_queue_work(struct loop_device *lo, struct loop_cmd *cmd)
> >  {
> >  	struct loop_worker *worker = NULL;
> >  	struct work_struct *work;
> >  	struct list_head *cmd_list;
> >  	struct cgroup_subsys_state *blkcg_css = NULL;
> > -	gfp_t gfp = GFP_NOWAIT | __GFP_NOWARN;
> > +	spinlock_t	*lock;
> >  #ifdef CONFIG_BLK_CGROUP
> >  	struct request *rq = blk_mq_rq_from_pdu(cmd);
> >  
> > @@ -955,54 +999,38 @@ static void loop_queue_work(struct loop_device *lo, struct loop_cmd *cmd)
> >  		blkcg_css = &bio_blkcg(rq->bio)->css;
> >  #endif
> >  
> > -	spin_lock(&lo->lo_work_lock);
> > -
> > -	if (queue_on_root_worker(blkcg_css))
> > -		goto queue_work;
> > -
> > -	/* css->id is unique in each cgroup subsystem */
> > -	worker = xa_load(&lo->workers, blkcg_css->id);
> > -	if (worker)
> > -		goto queue_work;
> > -
> > -	worker = kzalloc(sizeof(*worker), gfp);
> > -	/*
> > -	 * In the event we cannot allocate a worker, just queue on the
> > -	 * rootcg worker and issue the I/O as the rootcg
> > -	 */
> > -	if (!worker)
> > -		goto queue_work;
> > +	if (!queue_on_root_worker(blkcg_css)) {
> > +		int ret = 0;
> >  
> > -	worker->blkcg_css = blkcg_css;
> > -	css_get(worker->blkcg_css);
> > -	INIT_WORK(&worker->work, loop_workfn);
> > -	INIT_LIST_HEAD(&worker->cmd_list);
> > -	INIT_LIST_HEAD(&worker->idle_list);
> > -	worker->lo = lo;
> > +		rcu_read_lock();
> > +		/* css->id is unique in each cgroup subsystem */
> > +		worker = xa_load(&lo->workers, blkcg_css->id);
> > +		if (worker)
> > +			ret = refcount_inc_not_zero(&worker->refcnt);
> > +		rcu_read_unlock();
> 
> I don't follow the refcount decrement here. Also, what's the purpose
> of the rcu critical section here?

xa_load() requires the RCU read lock, and RCU can guarantee that the
'worker' won't be released while we are grabbing its reference.

> 
> >  
> > -	if (xa_err(xa_store(&lo->workers, blkcg_css->id, worker, gfp))) {
> > -		kfree(worker);
> > -		worker = NULL;
> > +		if (!worker || !ret)
> > +			worker = loop_alloc_or_get_worker(lo, blkcg_css);
> > +		/*
> > +		 * In the event we cannot allocate a worker, just queue on the
> > +		 * rootcg worker and issue the I/O as the rootcg
> > +		 */
> >  	}
> >  
> > -queue_work:
> >  	if (worker) {
> > -		/*
> > -		 * We need to remove from the idle list here while
> > -		 * holding the lock so that the idle timer doesn't
> > -		 * free the worker
> > -		 */
> > -		if (!list_empty(&worker->idle_list))
> > -			list_del_init(&worker->idle_list);
> >  		work = &worker->work;
> >  		cmd_list = &worker->cmd_list;
> > +		lock = &worker->lock;
> >  	} else {
> >  		work = &lo->rootcg_work;
> >  		cmd_list = &lo->rootcg_cmd_list;
> > +		lock = &lo->lo_work_lock;
> 
> Is lo_work_lock special here or is it just because the root "worker"
> lacks a lock itself? I wonder if a separate spinlock is more clear.

'->rootcg_cmd_list' is one per-loop-device list, so it needs one
per-loop-device lock. Yeah, in theory, we can add one lock just for
protecting the list, but it is fine to reuse lo_work_lock since its only
other use is to reclaim workers, which is definitely in the slow path,
so it is not necessary to add a new one, IMO.

> 
> >  	}
> > +
> > +	spin_lock(lock);
> >  	list_add_tail(&cmd->list_entry, cmd_list);
> > +	spin_unlock(lock);
> >  	queue_work(lo->workqueue, work);
> > -	spin_unlock(&lo->lo_work_lock);
> >  }
> >  
> >  static void loop_update_rotational(struct loop_device *lo)
> > @@ -1131,20 +1159,18 @@ static void loop_set_timer(struct loop_device *lo)
> >  
> >  static void __loop_free_idle_workers(struct loop_device *lo, bool force)
> >  {
> > -	struct loop_worker *pos, *worker;
> > +	struct loop_worker *worker;
> > +	unsigned long id;
> >  
> >  	spin_lock(&lo->lo_work_lock);
> > -	list_for_each_entry_safe(worker, pos, &lo->idle_worker_list,
> > -				idle_list) {
> > +	xa_for_each(&lo->workers, id, worker) {
> >  		if (!force && time_is_after_jiffies(worker->last_ran_at +
> >  						LOOP_IDLE_WORKER_TIMEOUT))
> >  			break;
> > -		list_del(&worker->idle_list);
> > -		xa_erase(&lo->workers, worker->blkcg_css->id);
> > -		css_put(worker->blkcg_css);
> > -		kfree(worker);
> > +		if (refcount_dec_and_test(&worker->refcnt))
> > +			loop_release_worker(worker);
> 
> This one is puzzling to me. Can't you hit this refcount decrement
> superfluously each time the loop timer fires?

Not sure I get your point.

As I mentioned above, this one is the counterpart of the INIT reference,
but a new lo_cmd may just grab it when queueing the rq before the worker
is erased from the xarray, so we can't release the worker here until the
command is completed.

> 
> >  	}
> > -	if (!list_empty(&lo->idle_worker_list))
> > +	if (!xa_empty(&lo->workers))
> >  		loop_set_timer(lo);
> >  	spin_unlock(&lo->lo_work_lock);
> >  }
> > @@ -2148,27 +2174,29 @@ static void loop_handle_cmd(struct loop_cmd *cmd)
> >  }
> >  
> >  static void loop_process_work(struct loop_worker *worker,
> > -			struct list_head *cmd_list, struct loop_device *lo)
> > +			struct list_head *cmd_list, spinlock_t *lock)
> >  {
> >  	int orig_flags = current->flags;
> >  	struct loop_cmd *cmd;
> >  	LIST_HEAD(list);
> > +	int cnt = 0;
> >  
> >  	current->flags |= PF_LOCAL_THROTTLE | PF_MEMALLOC_NOIO;
> >  
> > -	spin_lock(&lo->lo_work_lock);
> > +	spin_lock(lock);
> >   again:
> >  	list_splice_init(cmd_list, &list);
> > -	spin_unlock(&lo->lo_work_lock);
> > +	spin_unlock(lock);
> >  
> >  	while (!list_empty(&list)) {
> >  		cmd = list_first_entry(&list, struct loop_cmd, list_entry);
> >  		list_del_init(&cmd->list_entry);
> >  
> >  		loop_handle_cmd(cmd);
> > +		cnt++;
> >  	}
> >  
> > -	spin_lock(&lo->lo_work_lock);
> > +	spin_lock(lock);
> >  	if (!list_empty(cmd_list))
> >  		goto again;
> >  
> > @@ -2179,11 +2207,13 @@ static void loop_process_work(struct loop_worker *worker,
> >  	 */
> >  	if (worker && !work_pending(&worker->work)) {
> >  		worker->last_ran_at = jiffies;
> > -		list_add_tail(&worker->idle_list, &lo->idle_worker_list);
> > -		loop_set_timer(lo);
> > +		loop_set_timer(worker->lo);
> >  	}
> > -	spin_unlock(&lo->lo_work_lock);
> > +	spin_unlock(lock);
> >  	current->flags = orig_flags;
> > +
> > +	if (worker && refcount_sub_and_test(cnt, &worker->refcnt))
> > +		loop_release_worker(worker);
> >  }
> >  
> >  static void loop_workfn(struct work_struct *work)
> > @@ -2202,7 +2232,7 @@ static void loop_workfn(struct work_struct *work)
> >  		old_memcg = set_active_memcg(
> >  				mem_cgroup_from_css(memcg_css));
> >  
> > -	loop_process_work(worker, &worker->cmd_list, worker->lo);
> > +	loop_process_work(worker, &worker->cmd_list, &worker->lock);
> >  
> >  	kthread_associate_blkcg(NULL);
> >  	if (memcg_css) {
> > @@ -2215,7 +2245,7 @@ static void loop_rootcg_workfn(struct work_struct *work)
> >  {
> >  	struct loop_device *lo =
> >  		container_of(work, struct loop_device, rootcg_work);
> > -	loop_process_work(NULL, &lo->rootcg_cmd_list, lo);
> > +	loop_process_work(NULL, &lo->rootcg_cmd_list, &lo->lo_work_lock);
> >  }
> >  
> >  static const struct blk_mq_ops loop_mq_ops = {
> > -- 
> > 2.31.1
> > 
> 
> The rest of this patch series looks great. Feel free to add my
> Acked-by to the others.

Thanks for your review!


Thanks,
Ming


^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [PATCH 6/6] loop: don't add worker into idle list
  2021-07-07  3:19     ` Ming Lei
@ 2021-07-07 13:55       ` Dan Schatzberg
  2021-07-08  6:58         ` Ming Lei
  0 siblings, 1 reply; 22+ messages in thread
From: Dan Schatzberg @ 2021-07-07 13:55 UTC (permalink / raw)
  To: Ming Lei; +Cc: Jens Axboe, linux-block, Christoph Hellwig, Michal Koutný

On Wed, Jul 07, 2021 at 11:19:14AM +0800, Ming Lei wrote:
> On Tue, Jul 06, 2021 at 09:55:36AM -0400, Dan Schatzberg wrote:
> > On Mon, Jul 05, 2021 at 06:26:07PM +0800, Ming Lei wrote:
> > >  	}
> > > +
> > > +	spin_lock(lock);
> > >  	list_add_tail(&cmd->list_entry, cmd_list);
> > > +	spin_unlock(lock);
> > >  	queue_work(lo->workqueue, work);
> > > -	spin_unlock(&lo->lo_work_lock);
> > >  }
> > >  
> > >  static void loop_update_rotational(struct loop_device *lo)
> > > @@ -1131,20 +1159,18 @@ static void loop_set_timer(struct loop_device *lo)
> > >  
> > >  static void __loop_free_idle_workers(struct loop_device *lo, bool force)
> > >  {
> > > -	struct loop_worker *pos, *worker;
> > > +	struct loop_worker *worker;
> > > +	unsigned long id;
> > >  
> > >  	spin_lock(&lo->lo_work_lock);
> > > -	list_for_each_entry_safe(worker, pos, &lo->idle_worker_list,
> > > -				idle_list) {
> > > +	xa_for_each(&lo->workers, id, worker) {
> > >  		if (!force && time_is_after_jiffies(worker->last_ran_at +
> > >  						LOOP_IDLE_WORKER_TIMEOUT))
> > >  			break;
> > > -		list_del(&worker->idle_list);
> > > -		xa_erase(&lo->workers, worker->blkcg_css->id);
> > > -		css_put(worker->blkcg_css);
> > > -		kfree(worker);
> > > +		if (refcount_dec_and_test(&worker->refcnt))
> > > +			loop_release_worker(worker);
> > 
> > This one is puzzling to me. Can't you hit this refcount decrement
> > superfluously each time the loop timer fires?
> 
> Not sure I get your point.
> 
> As I mentioned above, this one is the counter pair of INIT reference,
> but one new lo_cmd may just grab it when queueing rq before erasing the
> worker from xarray, so we can't release worker here until the command is
> completed.

Suppose at this point there's still an outstanding loop_cmd to be
serviced for this worker. The refcount_dec_and_test should decrement
the refcount and then fail the conditional, not calling
loop_release_worker. What happens if __loop_free_idle_workers fires
again before the loop_cmd is processed? Won't you decrement the
refcount again, and then end up calling loop_release_worker before the
loop_cmd is processed?

^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [PATCH 6/6] loop: don't add worker into idle list
  2021-07-07 13:55       ` Dan Schatzberg
@ 2021-07-08  6:58         ` Ming Lei
  2021-07-08 14:16           ` Dan Schatzberg
  2021-07-08 14:41           ` Dan Schatzberg
  0 siblings, 2 replies; 22+ messages in thread
From: Ming Lei @ 2021-07-08  6:58 UTC (permalink / raw)
  To: Dan Schatzberg
  Cc: Jens Axboe, linux-block, Christoph Hellwig, Michal Koutný

On Wed, Jul 07, 2021 at 09:55:34AM -0400, Dan Schatzberg wrote:
> On Wed, Jul 07, 2021 at 11:19:14AM +0800, Ming Lei wrote:
> > On Tue, Jul 06, 2021 at 09:55:36AM -0400, Dan Schatzberg wrote:
> > > On Mon, Jul 05, 2021 at 06:26:07PM +0800, Ming Lei wrote:
> > > >  	}
> > > > +
> > > > +	spin_lock(lock);
> > > >  	list_add_tail(&cmd->list_entry, cmd_list);
> > > > +	spin_unlock(lock);
> > > >  	queue_work(lo->workqueue, work);
> > > > -	spin_unlock(&lo->lo_work_lock);
> > > >  }
> > > >  
> > > >  static void loop_update_rotational(struct loop_device *lo)
> > > > @@ -1131,20 +1159,18 @@ static void loop_set_timer(struct loop_device *lo)
> > > >  
> > > >  static void __loop_free_idle_workers(struct loop_device *lo, bool force)
> > > >  {
> > > > -	struct loop_worker *pos, *worker;
> > > > +	struct loop_worker *worker;
> > > > +	unsigned long id;
> > > >  
> > > >  	spin_lock(&lo->lo_work_lock);
> > > > -	list_for_each_entry_safe(worker, pos, &lo->idle_worker_list,
> > > > -				idle_list) {
> > > > +	xa_for_each(&lo->workers, id, worker) {
> > > >  		if (!force && time_is_after_jiffies(worker->last_ran_at +
> > > >  						LOOP_IDLE_WORKER_TIMEOUT))
> > > >  			break;
> > > > -		list_del(&worker->idle_list);
> > > > -		xa_erase(&lo->workers, worker->blkcg_css->id);
> > > > -		css_put(worker->blkcg_css);
> > > > -		kfree(worker);
> > > > +		if (refcount_dec_and_test(&worker->refcnt))
> > > > +			loop_release_worker(worker);
> > > 
> > > This one is puzzling to me. Can't you hit this refcount decrement
> > > superfluously each time the loop timer fires?
> > 
> > Not sure I get your point.
> > 
> > As I mentioned above, this one is the counter pair of INIT reference,
> > but one new lo_cmd may just grab it when queueing rq before erasing the
> > worker from xarray, so we can't release worker here until the command is
> > completed.
> 
> Suppose at this point there's still an outstanding loop_cmd to be
> serviced for this worker. The refcount_dec_and_test should decrement
> the refcount and then fail the conditional, not calling
> loop_release_worker. What happens if __loop_free_idle_workers fires
> again before the loop_cmd is processed? Won't you decrement the
> refcount again, and then end up calling loop_release_worker before the
> loop_cmd is processed?
 
Good catch!

The following one line change should avoid the issue:

diff --git a/drivers/block/loop.c b/drivers/block/loop.c
index 146eaa03629b..3cd51bddfec9 100644
--- a/drivers/block/loop.c
+++ b/drivers/block/loop.c
@@ -980,7 +980,6 @@ static struct loop_worker *loop_alloc_or_get_worker(struct loop_device *lo,
 
 static void loop_release_worker(struct loop_worker *worker)
 {
-	xa_erase(&worker->lo->workers, worker->blkcg_css->id);
 	css_put(worker->blkcg_css);
 	kfree(worker);
 }
@@ -1167,6 +1166,7 @@ static void __loop_free_idle_workers(struct loop_device *lo, bool force)
 		if (!force && time_is_after_jiffies(worker->last_ran_at +
 						LOOP_IDLE_WORKER_TIMEOUT))
 			break;
+		xa_erase(&worker->lo->workers, worker->blkcg_css->id);
 		if (refcount_dec_and_test(&worker->refcnt))
 			loop_release_worker(worker);
 	}


Thanks, 
Ming


^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [PATCH 1/6] loop: clean up blkcg association
  2021-07-06  5:51   ` Christoph Hellwig
@ 2021-07-08  7:20     ` Ming Lei
  0 siblings, 0 replies; 22+ messages in thread
From: Ming Lei @ 2021-07-08  7:20 UTC (permalink / raw)
  To: Christoph Hellwig
  Cc: Jens Axboe, linux-block, Michal Koutný, Dan Schatzberg

On Tue, Jul 06, 2021 at 07:51:09AM +0200, Christoph Hellwig wrote:
> Did I mention that all this loop blkcg integration is complete and utter
> garbage and should have never been merged?

Sorry, I didn't see your comment on that, and the loop blkcg patches have
already been merged into Linus' tree. IMO, this approach fixes a real issue
wrt. cgroup on loop.

> 
> 
> >  	struct list_head *cmd_list;
> > +	struct cgroup_subsys_state *blkcg_css = NULL;
> > +#ifdef CONFIG_BLK_CGROUP
> > +	struct request *rq = blk_mq_rq_from_pdu(cmd);
> > +
> > +	if (rq->bio && rq->bio->bi_blkg)
> > +		blkcg_css = &bio_blkcg(rq->bio)->css;
> > +#endif
> 
> This kind of junk has no business in a driver.  The blkcg code
> needs to provide a helper for retrieving the blkcg_css from a request,
> including a stub for the !CONFIG_BLK_CGROUP case.

Fine, it should be cleaner to move the above code into one helper.

> 
> >  		cur_worker = container_of(*node, struct loop_worker, rb_node);
> > -		if (cur_worker->blkcg_css == cmd->blkcg_css) {
> > +		if (cur_worker->blkcg_css == blkcg_css) {
> >  			worker = cur_worker;
> >  			break;
> > -		} else if ((long)cur_worker->blkcg_css < (long)cmd->blkcg_css) {
> > +		} else if ((long)cur_worker->blkcg_css < (long)blkcg_css) {
> 
> No need for an else after a break.  And more importantly no need to cast
> a pointer to compare it to another pointer of the same type.

I believe that change doesn't belong in this patch; also, the rbtree is
killed in patch 5/6.

> 
> > +	struct mem_cgroup *old_memcg = NULL;
> > +	struct cgroup_subsys_state *memcg_css = NULL;
> > +
> > +	kthread_associate_blkcg(worker->blkcg_css);
> > +#ifdef CONFIG_MEMCG
> > +	memcg_css = cgroup_get_e_css(worker->blkcg_css->cgroup,
> > +			&memory_cgrp_subsys);
> > +#endif
> > +	if (memcg_css)
> > +		old_memcg = set_active_memcg(
> > +				mem_cgroup_from_css(memcg_css));
> > +
> 
> This kind of crap also has absolutely no business in a block driver
> and the memcg code should provide a proper helper.

Except for loop, cgroup_get_e_css() is only used by cgwb, and this use
case is special in that the associated memcg has to be set as active,
so memcg has to be visible here.


Thanks,
Ming


^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [PATCH 2/6] loop: conver timer for monitoring idle worker into dwork
  2021-07-06  5:52   ` Christoph Hellwig
@ 2021-07-08  7:23     ` Ming Lei
  0 siblings, 0 replies; 22+ messages in thread
From: Ming Lei @ 2021-07-08  7:23 UTC (permalink / raw)
  To: Christoph Hellwig
  Cc: Jens Axboe, linux-block, Michal Koutný, Dan Schatzberg

On Tue, Jul 06, 2021 at 07:52:27AM +0200, Christoph Hellwig wrote:
> On Mon, Jul 05, 2021 at 06:26:03PM +0800, Ming Lei wrote:
> > +static void loop_free_idle_workers(struct work_struct *work)
> > +{
> > +	struct loop_device *lo = container_of(work, struct loop_device,
> > +			idle_work.work);
> > +	struct loop_worker *pos, *worker;
> > +
> > +	spin_lock(&lo->lo_work_lock);
> > +	list_for_each_entry_safe(worker, pos, &lo->idle_worker_list,
> > +				idle_list) {
> > +		if (time_is_after_jiffies(worker->last_ran_at +
> > +						LOOP_IDLE_WORKER_TIMEOUT))
> > +			break;
> > +		list_del(&worker->idle_list);
> > +		rb_erase(&worker->rb_node, &lo->worker_tree);
> > +		css_put(worker->blkcg_css);
> > +		kfree(worker);
> > +	}
> > +	if (!list_empty(&lo->idle_worker_list))
> > +		loop_set_timer(lo);
> > +	spin_unlock(&lo->lo_work_lock);
> > +}
> > +
> > +
> 
> Double empty line.  But that whole fact that the loop driver badly
> reimplements work queues is just fucked up.  We need to revert this
> shit.

Can you explain why work queues are bad for fixing this blkcg/memcg
issue?


Thanks, 
Ming


^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [PATCH 6/6] loop: don't add worker into idle list
  2021-07-08  6:58         ` Ming Lei
@ 2021-07-08 14:16           ` Dan Schatzberg
  2021-07-08 15:01             ` Ming Lei
  2021-07-08 14:41           ` Dan Schatzberg
  1 sibling, 1 reply; 22+ messages in thread
From: Dan Schatzberg @ 2021-07-08 14:16 UTC (permalink / raw)
  To: Ming Lei; +Cc: Jens Axboe, linux-block, Christoph Hellwig, Michal Koutný

On Thu, Jul 08, 2021 at 02:58:36PM +0800, Ming Lei wrote:
> On Wed, Jul 07, 2021 at 09:55:34AM -0400, Dan Schatzberg wrote:
> > On Wed, Jul 07, 2021 at 11:19:14AM +0800, Ming Lei wrote:
> > > On Tue, Jul 06, 2021 at 09:55:36AM -0400, Dan Schatzberg wrote:
> > > > On Mon, Jul 05, 2021 at 06:26:07PM +0800, Ming Lei wrote:
> > > > >  	}
> > > > > +
> > > > > +	spin_lock(lock);
> > > > >  	list_add_tail(&cmd->list_entry, cmd_list);
> > > > > +	spin_unlock(lock);
> > > > >  	queue_work(lo->workqueue, work);
> > > > > -	spin_unlock(&lo->lo_work_lock);
> > > > >  }
> > > > >  
> > > > >  static void loop_update_rotational(struct loop_device *lo)
> > > > > @@ -1131,20 +1159,18 @@ static void loop_set_timer(struct loop_device *lo)
> > > > >  
> > > > >  static void __loop_free_idle_workers(struct loop_device *lo, bool force)
> > > > >  {
> > > > > -	struct loop_worker *pos, *worker;
> > > > > +	struct loop_worker *worker;
> > > > > +	unsigned long id;
> > > > >  
> > > > >  	spin_lock(&lo->lo_work_lock);
> > > > > -	list_for_each_entry_safe(worker, pos, &lo->idle_worker_list,
> > > > > -				idle_list) {
> > > > > +	xa_for_each(&lo->workers, id, worker) {
> > > > >  		if (!force && time_is_after_jiffies(worker->last_ran_at +
> > > > >  						LOOP_IDLE_WORKER_TIMEOUT))
> > > > >  			break;
> > > > > -		list_del(&worker->idle_list);
> > > > > -		xa_erase(&lo->workers, worker->blkcg_css->id);
> > > > > -		css_put(worker->blkcg_css);
> > > > > -		kfree(worker);
> > > > > +		if (refcount_dec_and_test(&worker->refcnt))
> > > > > +			loop_release_worker(worker);
> > > > 
> > > > This one is puzzling to me. Can't you hit this refcount decrement
> > > > superfluously each time the loop timer fires?
> > > 
> > > Not sure I get your point.
> > > 
> > > As I mentioned above, this one is the counter pair of INIT reference,
> > > but one new lo_cmd may just grab it when queueing rq before erasing the
> > > worker from xarray, so we can't release worker here until the command is
> > > completed.
> > 
> > Suppose at this point there's still an outstanding loop_cmd to be
> > serviced for this worker. The refcount_dec_and_test should decrement
> > the refcount and then fail the conditional, not calling
> > loop_release_worker. What happens if __loop_free_idle_workers fires
> > again before the loop_cmd is processed? Won't you decrement the
> > refcount again, and then end up calling loop_release_worker before the
> > loop_cmd is processed?
>  
> Good catch!
> 
> The following one line change should avoid the issue:
> 
> diff --git a/drivers/block/loop.c b/drivers/block/loop.c
> index 146eaa03629b..3cd51bddfec9 100644
> --- a/drivers/block/loop.c
> +++ b/drivers/block/loop.c
> @@ -980,7 +980,6 @@ static struct loop_worker *loop_alloc_or_get_worker(struct loop_device *lo,
>  
>  static void loop_release_worker(struct loop_worker *worker)
>  {
> -	xa_erase(&worker->lo->workers, worker->blkcg_css->id);
>  	css_put(worker->blkcg_css);
>  	kfree(worker);
>  }
> @@ -1167,6 +1166,7 @@ static void __loop_free_idle_workers(struct loop_device *lo, bool force)
>  		if (!force && time_is_after_jiffies(worker->last_ran_at +
>  						LOOP_IDLE_WORKER_TIMEOUT))
>  			break;
> +		xa_erase(&worker->lo->workers, worker->blkcg_css->id);
>  		if (refcount_dec_and_test(&worker->refcnt))
>  			loop_release_worker(worker);
>  	}

Yeah, I think this resolves the issue. You could end up repeatedly
allocating workers for the same blkcg in the event that you're keeping
the worker busy for the entire LOOP_IDLE_WORKER_TIMEOUT (since it only
updates the last_ran_at when idle). You may want to add a racy check
if the refcount is > 1 to avoid that.

I think there might be a separate issue with the locking here though -
you acquire the lo->lo_work_lock in __loop_free_idle_workers and then
check worker->last_ran_at for each worker. However you only protect
the write to worker->last_ran_at (in loop_process_work) with the
worker->lock which I think means there's a potential data race on
worker->last_ran_at.

^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [PATCH 6/6] loop: don't add worker into idle list
  2021-07-08  6:58         ` Ming Lei
  2021-07-08 14:16           ` Dan Schatzberg
@ 2021-07-08 14:41           ` Dan Schatzberg
  1 sibling, 0 replies; 22+ messages in thread
From: Dan Schatzberg @ 2021-07-08 14:41 UTC (permalink / raw)
  To: Ming Lei; +Cc: Jens Axboe, linux-block, Christoph Hellwig, Michal Koutný

On Thu, Jul 08, 2021 at 02:58:36PM +0800, Ming Lei wrote:
> diff --git a/drivers/block/loop.c b/drivers/block/loop.c
> index 146eaa03629b..3cd51bddfec9 100644
> --- a/drivers/block/loop.c
> +++ b/drivers/block/loop.c
> @@ -980,7 +980,6 @@ static struct loop_worker *loop_alloc_or_get_worker(struct loop_device *lo,
>  
>  static void loop_release_worker(struct loop_worker *worker)
>  {
> -	xa_erase(&worker->lo->workers, worker->blkcg_css->id);
>  	css_put(worker->blkcg_css);
>  	kfree(worker);

Another thought - do you need to change the kfree here to kfree_rcu?
I'm concerned about the scenario where loop_queue_work's xa_load finds
the worker and subsequently __loop_free_idle_workers erases and calls
loop_release_worker. If the worker is freed then the subsequent
refcount_inc_not_zero in loop_queue_work would be a use after free.

^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [PATCH 6/6] loop: don't add worker into idle list
  2021-07-08 14:16           ` Dan Schatzberg
@ 2021-07-08 15:01             ` Ming Lei
  2021-07-08 15:15               ` Dan Schatzberg
  0 siblings, 1 reply; 22+ messages in thread
From: Ming Lei @ 2021-07-08 15:01 UTC (permalink / raw)
  To: Dan Schatzberg
  Cc: Jens Axboe, linux-block, Christoph Hellwig, Michal Koutný

On Thu, Jul 08, 2021 at 10:16:50AM -0400, Dan Schatzberg wrote:
> On Thu, Jul 08, 2021 at 02:58:36PM +0800, Ming Lei wrote:
> > On Wed, Jul 07, 2021 at 09:55:34AM -0400, Dan Schatzberg wrote:
> > > On Wed, Jul 07, 2021 at 11:19:14AM +0800, Ming Lei wrote:
> > > > On Tue, Jul 06, 2021 at 09:55:36AM -0400, Dan Schatzberg wrote:
> > > > > On Mon, Jul 05, 2021 at 06:26:07PM +0800, Ming Lei wrote:
> > > > > >  	}
> > > > > > +
> > > > > > +	spin_lock(lock);
> > > > > >  	list_add_tail(&cmd->list_entry, cmd_list);
> > > > > > +	spin_unlock(lock);
> > > > > >  	queue_work(lo->workqueue, work);
> > > > > > -	spin_unlock(&lo->lo_work_lock);
> > > > > >  }
> > > > > >  
> > > > > >  static void loop_update_rotational(struct loop_device *lo)
> > > > > > @@ -1131,20 +1159,18 @@ static void loop_set_timer(struct loop_device *lo)
> > > > > >  
> > > > > >  static void __loop_free_idle_workers(struct loop_device *lo, bool force)
> > > > > >  {
> > > > > > -	struct loop_worker *pos, *worker;
> > > > > > +	struct loop_worker *worker;
> > > > > > +	unsigned long id;
> > > > > >  
> > > > > >  	spin_lock(&lo->lo_work_lock);
> > > > > > -	list_for_each_entry_safe(worker, pos, &lo->idle_worker_list,
> > > > > > -				idle_list) {
> > > > > > +	xa_for_each(&lo->workers, id, worker) {
> > > > > >  		if (!force && time_is_after_jiffies(worker->last_ran_at +
> > > > > >  						LOOP_IDLE_WORKER_TIMEOUT))
> > > > > >  			break;
> > > > > > -		list_del(&worker->idle_list);
> > > > > > -		xa_erase(&lo->workers, worker->blkcg_css->id);
> > > > > > -		css_put(worker->blkcg_css);
> > > > > > -		kfree(worker);
> > > > > > +		if (refcount_dec_and_test(&worker->refcnt))
> > > > > > +			loop_release_worker(worker);
> > > > > 
> > > > > This one is puzzling to me. Can't you hit this refcount decrement
> > > > > superfluously each time the loop timer fires?
> > > > 
> > > > Not sure I get your point.
> > > > 
> > > > As I mentioned above, this one is the counter pair of INIT reference,
> > > > but one new lo_cmd may just grab it when queueing rq before erasing the
> > > > worker from xarray, so we can't release worker here until the command is
> > > > completed.
> > > 
> > > Suppose at this point there's still an outstanding loop_cmd to be
> > > serviced for this worker. The refcount_dec_and_test should decrement
> > > the refcount and then fail the conditional, not calling
> > > loop_release_worker. What happens if __loop_free_idle_workers fires
> > > again before the loop_cmd is processed? Won't you decrement the
> > > refcount again, and then end up calling loop_release_worker before the
> > > loop_cmd is processed?
> >  
> > Good catch!
> > 
> > The following one line change should avoid the issue:
> > 
> > diff --git a/drivers/block/loop.c b/drivers/block/loop.c
> > index 146eaa03629b..3cd51bddfec9 100644
> > --- a/drivers/block/loop.c
> > +++ b/drivers/block/loop.c
> > @@ -980,7 +980,6 @@ static struct loop_worker *loop_alloc_or_get_worker(struct loop_device *lo,
> >  
> >  static void loop_release_worker(struct loop_worker *worker)
> >  {
> > -	xa_erase(&worker->lo->workers, worker->blkcg_css->id);
> >  	css_put(worker->blkcg_css);
> >  	kfree(worker);
> >  }
> > @@ -1167,6 +1166,7 @@ static void __loop_free_idle_workers(struct loop_device *lo, bool force)
> >  		if (!force && time_is_after_jiffies(worker->last_ran_at +
> >  						LOOP_IDLE_WORKER_TIMEOUT))
> >  			break;
> > +		xa_erase(&worker->lo->workers, worker->blkcg_css->id);
> >  		if (refcount_dec_and_test(&worker->refcnt))
> >  			loop_release_worker(worker);
> >  	}
> 
> Yeah, I think this resolves the issue. You could end up repeatedly
> allocating workers for the same blkcg in the event that you're keeping
> the worker busy for the entire LOOP_IDLE_WORKER_TIMEOUT (since it only
> updates the last_ran_at when idle). You may want to add a racy check
> if the refcount is > 1 to avoid that.

Given the event is very unlikely to trigger, I think we can live
with that.

> 
> I think there might be a separate issue with the locking here though -
> you acquire the lo->lo_work_lock in __loop_free_idle_workers and then
> check worker->last_ran_at for each worker. However you only protect
> the write to worker->last_ran_at (in loop_process_work) with the
> worker->lock which I think means there's a potential data race on
> worker->last_ran_at.

It should be fine since both the WRITE and the READ of worker->last_ran_at
are atomic. Even if the race is triggered, we can still live with that.


On Thu, Jul 8, 2021 at 10:41 PM Dan Schatzberg <schatzberg.dan@gmail.com> wrote:
>
> On Thu, Jul 08, 2021 at 02:58:36PM +0800, Ming Lei wrote:
...
> Another thought - do you need to change the kfree here to kfree_rcu?
> I'm concerned about the scenario where loop_queue_work's xa_load finds
> the worker and subsequently __loop_free_idle_workers erases and calls
> loop_release_worker. If the worker is freed then the subsequent
> refcount_inc_not_zero in loop_queue_work would be a use after free.

Good catch, will fix it in next version.


Thanks,
Ming


^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [PATCH 6/6] loop: don't add worker into idle list
  2021-07-08 15:01             ` Ming Lei
@ 2021-07-08 15:15               ` Dan Schatzberg
  2021-07-09  0:49                 ` Ming Lei
  0 siblings, 1 reply; 22+ messages in thread
From: Dan Schatzberg @ 2021-07-08 15:15 UTC (permalink / raw)
  To: Ming Lei; +Cc: Jens Axboe, linux-block, Christoph Hellwig, Michal Koutný

On Thu, Jul 08, 2021 at 11:01:54PM +0800, Ming Lei wrote:
> On Thu, Jul 08, 2021 at 10:16:50AM -0400, Dan Schatzberg wrote:
> > On Thu, Jul 08, 2021 at 02:58:36PM +0800, Ming Lei wrote:
> > > On Wed, Jul 07, 2021 at 09:55:34AM -0400, Dan Schatzberg wrote:
> > > > On Wed, Jul 07, 2021 at 11:19:14AM +0800, Ming Lei wrote:
> > > > > On Tue, Jul 06, 2021 at 09:55:36AM -0400, Dan Schatzberg wrote:
> > > > > > On Mon, Jul 05, 2021 at 06:26:07PM +0800, Ming Lei wrote:
> > > > > > >  	}
> > > > > > > +
> > > > > > > +	spin_lock(lock);
> > > > > > >  	list_add_tail(&cmd->list_entry, cmd_list);
> > > > > > > +	spin_unlock(lock);
> > > > > > >  	queue_work(lo->workqueue, work);
> > > > > > > -	spin_unlock(&lo->lo_work_lock);
> > > > > > >  }
> > > > > > >  
> > > > > > >  static void loop_update_rotational(struct loop_device *lo)
> > > > > > > @@ -1131,20 +1159,18 @@ static void loop_set_timer(struct loop_device *lo)
> > > > > > >  
> > > > > > >  static void __loop_free_idle_workers(struct loop_device *lo, bool force)
> > > > > > >  {
> > > > > > > -	struct loop_worker *pos, *worker;
> > > > > > > +	struct loop_worker *worker;
> > > > > > > +	unsigned long id;
> > > > > > >  
> > > > > > >  	spin_lock(&lo->lo_work_lock);
> > > > > > > -	list_for_each_entry_safe(worker, pos, &lo->idle_worker_list,
> > > > > > > -				idle_list) {
> > > > > > > +	xa_for_each(&lo->workers, id, worker) {
> > > > > > >  		if (!force && time_is_after_jiffies(worker->last_ran_at +
> > > > > > >  						LOOP_IDLE_WORKER_TIMEOUT))
> > > > > > >  			break;
> > > > > > > -		list_del(&worker->idle_list);
> > > > > > > -		xa_erase(&lo->workers, worker->blkcg_css->id);
> > > > > > > -		css_put(worker->blkcg_css);
> > > > > > > -		kfree(worker);
> > > > > > > +		if (refcount_dec_and_test(&worker->refcnt))
> > > > > > > +			loop_release_worker(worker);
> > > > > > 
> > > > > > This one is puzzling to me. Can't you hit this refcount decrement
> > > > > > superfluously each time the loop timer fires?
> > > > > 
> > > > > Not sure I get your point.
> > > > > 
> > > > > As I mentioned above, this one is the counter pair of INIT reference,
> > > > > but one new lo_cmd may just grab it when queueing rq before erasing the
> > > > > worker from xarray, so we can't release worker here until the command is
> > > > > completed.
> > > > 
> > > > Suppose at this point there's still an outstanding loop_cmd to be
> > > > serviced for this worker. The refcount_dec_and_test should decrement
> > > > the refcount and then fail the conditional, not calling
> > > > loop_release_worker. What happens if __loop_free_idle_workers fires
> > > > again before the loop_cmd is processed? Won't you decrement the
> > > > refcount again, and then end up calling loop_release_worker before the
> > > > loop_cmd is processed?
> > >  
> > > Good catch!
> > > 
> > > The following one line change should avoid the issue:
> > > 
> > > diff --git a/drivers/block/loop.c b/drivers/block/loop.c
> > > index 146eaa03629b..3cd51bddfec9 100644
> > > --- a/drivers/block/loop.c
> > > +++ b/drivers/block/loop.c
> > > @@ -980,7 +980,6 @@ static struct loop_worker *loop_alloc_or_get_worker(struct loop_device *lo,
> > >  
> > >  static void loop_release_worker(struct loop_worker *worker)
> > >  {
> > > -	xa_erase(&worker->lo->workers, worker->blkcg_css->id);
> > >  	css_put(worker->blkcg_css);
> > >  	kfree(worker);
> > >  }
> > > @@ -1167,6 +1166,7 @@ static void __loop_free_idle_workers(struct loop_device *lo, bool force)
> > >  		if (!force && time_is_after_jiffies(worker->last_ran_at +
> > >  						LOOP_IDLE_WORKER_TIMEOUT))
> > >  			break;
> > > +		xa_erase(&worker->lo->workers, worker->blkcg_css->id);
> > >  		if (refcount_dec_and_test(&worker->refcnt))
> > >  			loop_release_worker(worker);
> > >  	}
> > 
> > Yeah, I think this resolves the issue. You could end up repeatedly
> > allocating workers for the same blkcg in the event that you're keeping
> > the worker busy for the entire LOOP_IDLE_WORKER_TIMEOUT (since it only
> > updates the last_ran_at when idle). You may want to add a racy check
> > if the refcount is > 1 to avoid that.
> 
> Given the event is very unlikely to trigger, I think we can live
> with that.

It doesn't seem unlikely to me - any workload that saturates the
backing device would keep the loop worker constantly with at least one
loop_cmd queued and trigger a free and allocate every
LOOP_IDLE_WORKER_TIMEOUT. Another way to solve this is to just update
last_ran_at before or after each loop_cmd. In any event, I'll defer to
your decision, it's not a critical difference.

> 
> > 
> > I think there might be a separate issue with the locking here though -
> > you acquire the lo->lo_work_lock in __loop_free_idle_workers and then
> > check worker->last_ran_at for each worker. However you only protect
> > the write to worker->last_ran_at (in loop_process_work) with the
> > worker->lock which I think means there's a potential data race on
> > worker->last_ran_at.
> 
> It should be fine since both WRITE and READ on worker->last_ran_at is
> atomic. Even though the race is triggered, we still can live with that.

True, though in this case I think last_ran_at should be atomic_t with
atomic_set and atomic_read.

> 
> 
> On Thu, Jul 8, 2021 at 10:41 PM Dan Schatzberg <schatzberg.dan@gmail.com> wrote:
> >
> > On Thu, Jul 08, 2021 at 02:58:36PM +0800, Ming Lei wrote:
> ...
> > Another thought - do you need to change the kfree here to kfree_rcu?
> > I'm concerned about the scenario where loop_queue_work's xa_load finds
> > the worker and subsequently __loop_free_idle_workers erases and calls
> > loop_release_worker. If the worker is freed then the subsequent
> > refcount_inc_not_zero in loop_queue_work would be a use after free.
> 
> Good catch, will fix it in next version.

Thanks, you can go ahead and add my Acked-by to the updated version of
this patch as well.

^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [PATCH 6/6] loop: don't add worker into idle list
  2021-07-08 15:15               ` Dan Schatzberg
@ 2021-07-09  0:49                 ` Ming Lei
  2021-07-09 13:47                   ` Dan Schatzberg
  0 siblings, 1 reply; 22+ messages in thread
From: Ming Lei @ 2021-07-09  0:49 UTC (permalink / raw)
  To: Dan Schatzberg
  Cc: Jens Axboe, linux-block, Christoph Hellwig, Michal Koutný

On Thu, Jul 08, 2021 at 11:15:13AM -0400, Dan Schatzberg wrote:
> On Thu, Jul 08, 2021 at 11:01:54PM +0800, Ming Lei wrote:
> > On Thu, Jul 08, 2021 at 10:16:50AM -0400, Dan Schatzberg wrote:
> > > On Thu, Jul 08, 2021 at 02:58:36PM +0800, Ming Lei wrote:
> > > > On Wed, Jul 07, 2021 at 09:55:34AM -0400, Dan Schatzberg wrote:
> > > > > On Wed, Jul 07, 2021 at 11:19:14AM +0800, Ming Lei wrote:
> > > > > > On Tue, Jul 06, 2021 at 09:55:36AM -0400, Dan Schatzberg wrote:
> > > > > > > On Mon, Jul 05, 2021 at 06:26:07PM +0800, Ming Lei wrote:
> > > > > > > >  	}
> > > > > > > > +
> > > > > > > > +	spin_lock(lock);
> > > > > > > >  	list_add_tail(&cmd->list_entry, cmd_list);
> > > > > > > > +	spin_unlock(lock);
> > > > > > > >  	queue_work(lo->workqueue, work);
> > > > > > > > -	spin_unlock(&lo->lo_work_lock);
> > > > > > > >  }
> > > > > > > >  
> > > > > > > >  static void loop_update_rotational(struct loop_device *lo)
> > > > > > > > @@ -1131,20 +1159,18 @@ static void loop_set_timer(struct loop_device *lo)
> > > > > > > >  
> > > > > > > >  static void __loop_free_idle_workers(struct loop_device *lo, bool force)
> > > > > > > >  {
> > > > > > > > -	struct loop_worker *pos, *worker;
> > > > > > > > +	struct loop_worker *worker;
> > > > > > > > +	unsigned long id;
> > > > > > > >  
> > > > > > > >  	spin_lock(&lo->lo_work_lock);
> > > > > > > > -	list_for_each_entry_safe(worker, pos, &lo->idle_worker_list,
> > > > > > > > -				idle_list) {
> > > > > > > > +	xa_for_each(&lo->workers, id, worker) {
> > > > > > > >  		if (!force && time_is_after_jiffies(worker->last_ran_at +
> > > > > > > >  						LOOP_IDLE_WORKER_TIMEOUT))
> > > > > > > >  			break;
> > > > > > > > -		list_del(&worker->idle_list);
> > > > > > > > -		xa_erase(&lo->workers, worker->blkcg_css->id);
> > > > > > > > -		css_put(worker->blkcg_css);
> > > > > > > > -		kfree(worker);
> > > > > > > > +		if (refcount_dec_and_test(&worker->refcnt))
> > > > > > > > +			loop_release_worker(worker);
> > > > > > > 
> > > > > > > This one is puzzling to me. Can't you hit this refcount decrement
> > > > > > > superfluously each time the loop timer fires?
> > > > > > 
> > > > > > Not sure I get your point.
> > > > > > 
> > > > > > As I mentioned above, this one is the counter pair of INIT reference,
> > > > > > but one new lo_cmd may just grab it when queueing rq before erasing the
> > > > > > worker from xarray, so we can't release worker here until the command is
> > > > > > completed.
> > > > > 
> > > > > Suppose at this point there's still an outstanding loop_cmd to be
> > > > > serviced for this worker. The refcount_dec_and_test should decrement
> > > > > the refcount and then fail the conditional, not calling
> > > > > loop_release_worker. What happens if __loop_free_idle_workers fires
> > > > > again before the loop_cmd is processed? Won't you decrement the
> > > > > refcount again, and then end up calling loop_release_worker before the
> > > > > loop_cmd is processed?
> > > >  
> > > > Good catch!
> > > > 
> > > > The following one line change should avoid the issue:
> > > > 
> > > > diff --git a/drivers/block/loop.c b/drivers/block/loop.c
> > > > index 146eaa03629b..3cd51bddfec9 100644
> > > > --- a/drivers/block/loop.c
> > > > +++ b/drivers/block/loop.c
> > > > @@ -980,7 +980,6 @@ static struct loop_worker *loop_alloc_or_get_worker(struct loop_device *lo,
> > > >  
> > > >  static void loop_release_worker(struct loop_worker *worker)
> > > >  {
> > > > -	xa_erase(&worker->lo->workers, worker->blkcg_css->id);
> > > >  	css_put(worker->blkcg_css);
> > > >  	kfree(worker);
> > > >  }
> > > > @@ -1167,6 +1166,7 @@ static void __loop_free_idle_workers(struct loop_device *lo, bool force)
> > > >  		if (!force && time_is_after_jiffies(worker->last_ran_at +
> > > >  						LOOP_IDLE_WORKER_TIMEOUT))
> > > >  			break;
> > > > +		xa_erase(&worker->lo->workers, worker->blkcg_css->id);
> > > >  		if (refcount_dec_and_test(&worker->refcnt))
> > > >  			loop_release_worker(worker);
> > > >  	}
> > > 
> > > Yeah, I think this resolves the issue. You could end up repeatedly
> > > allocating workers for the same blkcg in the event that you're keeping
> > > the worker busy for the entire LOOP_IDLE_WORKER_TIMEOUT (since it only
> > > updates the last_ran_at when idle). You may want to add a racy check
> > > if the refcount is > 1 to avoid that.
> > 
> > Given the event is very unlikely to trigger, I think we can live
> > with that.
> 
> It doesn't seem unlikely to me - any workload that saturates the
> backing device would keep the loop worker constantly with at least one
> loop_cmd queued and trigger a free and allocate every
> LOOP_IDLE_WORKER_TIMEOUT. Another way to solve this is to just update
> last_ran_at before or after each loop_cmd. In any event, I'll defer to
> your decision, it's not a critical difference.

Sorry, I missed that ->last_ran_at is only set when the work isn't
pending. In that case we can clean up/simplify the reclaim a bit by:

1) keep lo->idle_work scheduled with a period of LOOP_IDLE_WORKER_TIMEOUT
(60s) as long as any active worker is allocated; it is scheduled when
allocating/reclaiming a worker

2) always set ->last_ran_at after retrieving the worker from the xarray,
which can be done locklessly via WRITE_ONCE() and is cheap

3) inside __loop_free_idle_workers(), reclaim a worker only if it has
expired and has no commands in worker->cmd_list

> 
> > 
> > > 
> > > I think there might be a separate issue with the locking here though -
> > > you acquire the lo->lo_work_lock in __loop_free_idle_workers and then
> > > check worker->last_ran_at for each worker. However you only protect
> > > the write to worker->last_ran_at (in loop_process_work) with the
> > > worker->lock which I think means there's a potential data race on
> > > worker->last_ran_at.
> > 
> > It should be fine since both WRITE and READ on worker->last_ran_at is
> > atomic. Even though the race is triggered, we still can live with that.
> 
> True, though in this case I think last_ran_at should be atomic_t with
> atomic_set and atomic_read.

I think READ_ONCE()/WRITE_ONCE() should be enough, and we can set/get
last_ran_at locklessly.

> 
> > 
> > 
> > On Thu, Jul 8, 2021 at 10:41 PM Dan Schatzberg <schatzberg.dan@gmail.com> wrote:
> > >
> > > On Thu, Jul 08, 2021 at 02:58:36PM +0800, Ming Lei wrote:
> > ...
> > > Another thought - do you need to change the kfree here to kfree_rcu?
> > > I'm concerned about the scenario where loop_queue_work's xa_load finds
> > > the worker and subsequently __loop_free_idle_workers erases and calls
> > > loop_release_worker. If the worker is freed then the subsequent
> > > refcount_inc_not_zero in loop_queue_work would be a use after free.
> > 
> > Good catch, will fix it in next version.
> 
> Thanks, you can go ahead and add my Acked-by to the updated version of
> this patch as well.

Thanks for the review!

-- 
Ming


^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [PATCH 6/6] loop: don't add worker into idle list
  2021-07-09  0:49                 ` Ming Lei
@ 2021-07-09 13:47                   ` Dan Schatzberg
  0 siblings, 0 replies; 22+ messages in thread
From: Dan Schatzberg @ 2021-07-09 13:47 UTC (permalink / raw)
  To: Ming Lei; +Cc: Jens Axboe, linux-block, Christoph Hellwig, Michal Koutný

On Fri, Jul 09, 2021 at 08:49:55AM +0800, Ming Lei wrote:
> On Thu, Jul 08, 2021 at 11:15:13AM -0400, Dan Schatzberg wrote:
> > On Thu, Jul 08, 2021 at 11:01:54PM +0800, Ming Lei wrote:
> > > On Thu, Jul 08, 2021 at 10:16:50AM -0400, Dan Schatzberg wrote:
> > > > On Thu, Jul 08, 2021 at 02:58:36PM +0800, Ming Lei wrote:
> > > > > On Wed, Jul 07, 2021 at 09:55:34AM -0400, Dan Schatzberg wrote:
> > > > > > On Wed, Jul 07, 2021 at 11:19:14AM +0800, Ming Lei wrote:
> > > > > > > On Tue, Jul 06, 2021 at 09:55:36AM -0400, Dan Schatzberg wrote:
> > > > > > > > On Mon, Jul 05, 2021 at 06:26:07PM +0800, Ming Lei wrote:
> > > > > > > > >  	}
> > > > > > > > > +
> > > > > > > > > +	spin_lock(lock);
> > > > > > > > >  	list_add_tail(&cmd->list_entry, cmd_list);
> > > > > > > > > +	spin_unlock(lock);
> > > > > > > > >  	queue_work(lo->workqueue, work);
> > > > > > > > > -	spin_unlock(&lo->lo_work_lock);
> > > > > > > > >  }
> > > > > > > > >  
> > > > > > > > >  static void loop_update_rotational(struct loop_device *lo)
> > > > > > > > > @@ -1131,20 +1159,18 @@ static void loop_set_timer(struct loop_device *lo)
> > > > > > > > >  
> > > > > > > > >  static void __loop_free_idle_workers(struct loop_device *lo, bool force)
> > > > > > > > >  {
> > > > > > > > > -	struct loop_worker *pos, *worker;
> > > > > > > > > +	struct loop_worker *worker;
> > > > > > > > > +	unsigned long id;
> > > > > > > > >  
> > > > > > > > >  	spin_lock(&lo->lo_work_lock);
> > > > > > > > > -	list_for_each_entry_safe(worker, pos, &lo->idle_worker_list,
> > > > > > > > > -				idle_list) {
> > > > > > > > > +	xa_for_each(&lo->workers, id, worker) {
> > > > > > > > >  		if (!force && time_is_after_jiffies(worker->last_ran_at +
> > > > > > > > >  						LOOP_IDLE_WORKER_TIMEOUT))
> > > > > > > > >  			break;
> > > > > > > > > -		list_del(&worker->idle_list);
> > > > > > > > > -		xa_erase(&lo->workers, worker->blkcg_css->id);
> > > > > > > > > -		css_put(worker->blkcg_css);
> > > > > > > > > -		kfree(worker);
> > > > > > > > > +		if (refcount_dec_and_test(&worker->refcnt))
> > > > > > > > > +			loop_release_worker(worker);
> > > > > > > > 
> > > > > > > > This one is puzzling to me. Can't you hit this refcount decrement
> > > > > > > > superfluously each time the loop timer fires?
> > > > > > > 
> > > > > > > Not sure I get your point.
> > > > > > > 
> > > > > > > As I mentioned above, this one is the counter pair of INIT reference,
> > > > > > > but one new lo_cmd may just grab it when queueing rq before erasing the
> > > > > > > worker from xarray, so we can't release worker here until the command is
> > > > > > > completed.
> > > > > > 
> > > > > > Suppose at this point there's still an outstanding loop_cmd to be
> > > > > > serviced for this worker. The refcount_dec_and_test should decrement
> > > > > > the refcount and then fail the conditional, not calling
> > > > > > loop_release_worker. What happens if __loop_free_idle_workers fires
> > > > > > again before the loop_cmd is processed? Won't you decrement the
> > > > > > refcount again, and then end up calling loop_release_worker before the
> > > > > > loop_cmd is processed?
> > > > >  
> > > > > Good catch!
> > > > > 
> > > > > The following one line change should avoid the issue:
> > > > > 
> > > > > diff --git a/drivers/block/loop.c b/drivers/block/loop.c
> > > > > index 146eaa03629b..3cd51bddfec9 100644
> > > > > --- a/drivers/block/loop.c
> > > > > +++ b/drivers/block/loop.c
> > > > > @@ -980,7 +980,6 @@ static struct loop_worker *loop_alloc_or_get_worker(struct loop_device *lo,
> > > > >  
> > > > >  static void loop_release_worker(struct loop_worker *worker)
> > > > >  {
> > > > > -	xa_erase(&worker->lo->workers, worker->blkcg_css->id);
> > > > >  	css_put(worker->blkcg_css);
> > > > >  	kfree(worker);
> > > > >  }
> > > > > @@ -1167,6 +1166,7 @@ static void __loop_free_idle_workers(struct loop_device *lo, bool force)
> > > > >  		if (!force && time_is_after_jiffies(worker->last_ran_at +
> > > > >  						LOOP_IDLE_WORKER_TIMEOUT))
> > > > >  			break;
> > > > > +		xa_erase(&worker->lo->workers, worker->blkcg_css->id);
> > > > >  		if (refcount_dec_and_test(&worker->refcnt))
> > > > >  			loop_release_worker(worker);
> > > > >  	}
> > > > 
> > > > Yeah, I think this resolves the issue. You could end up repeatedly
> > > > allocating workers for the same blkcg in the event that you're keeping
> > > > the worker busy for the entire LOOP_IDLE_WORKER_TIMEOUT (since it only
> > > > updates the last_ran_at when idle). You may want to add a racy check
> > > > if the refcount is > 1 to avoid that.
> > > 
> > > Given the event is very unlikely to trigger, I think we can live
> > > with that.
> > 
> > It doesn't seem unlikely to me - any workload that saturates the
> > backing device would keep the loop worker constantly with at least one
> > loop_cmd queued and trigger a free and allocate every
> > LOOP_IDLE_WORKER_TIMEOUT. Another way to solve this is to just update
> > last_ran_at before or after each loop_cmd. In any event, I'll defer to
> > your decision, it's not a critical difference.
> 
> Sorry, I missed that ->last_ran_at is only set when the work isn't
> pending, then we can cleanup/simplify the reclaim a bit by:
> 
> 1) keep lo->idle_work scheduled once per 60-second period if there is any
> active worker allocated, which is scheduled when allocating/reclaiming
> one worker

Makes sense, and you should have lo_work_lock held at both points so
this is safe.

> 
> 2) always set ->last_ran_at after retrieving the worker from xarray,
> which can be done lockless via WRITE_ONCE(), and it is cheap

Yes, or in loop_process_work; it doesn't really matter where you do it so
long as it is per-cmd. I think this change alone resolves the issue.

> 
> 3) inside __loop_free_idle_workers(), reclaim one worker only if the
> worker is expired and has no commands in worker->cmd_list

Be careful here - the current locking doesn't allow for this because
you don't acquire the per-worker lock in __loop_free_idle_workers, so
accessing worker->cmd_list is a data-race. This is why I suggested
reading the refcount instead as it can be done without holding a lock.

> 
> > 
> > > 
> > > > 
> > > > I think there might be a separate issue with the locking here though -
> > > > you acquire the lo->lo_work_lock in __loop_free_idle_workers and then
> > > > check worker->last_ran_at for each worker. However you only protect
> > > > the write to worker->last_ran_at (in loop_process_work) with the
> > > > worker->lock which I think means there's a potential data race on
> > > > worker->last_ran_at.
> > > 
> > > It should be fine since both WRITE and READ on worker->last_ran_at are
> > > atomic. Even if the race is triggered, we can still live with that.
> > 
> > True, though in this case I think last_ran_at should be atomic_t with
> > atomic_set and atomic_read.
> 
> I think READ_ONCE()/WRITE_ONCE() should be enough, and we can set/get
> last_ran_at lockless.

Makes sense to me.

^ permalink raw reply	[flat|nested] 22+ messages in thread

end of thread, other threads:[~2021-07-09 13:47 UTC | newest]

Thread overview: 22+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-07-05 10:26 [PATCH 0/6] loop: cleanup charging io to mem/blkcg Ming Lei
2021-07-05 10:26 ` [PATCH 1/6] loop: clean up blkcg association Ming Lei
2021-07-06  5:51   ` Christoph Hellwig
2021-07-08  7:20     ` Ming Lei
2021-07-05 10:26 ` [PATCH 2/6] loop: conver timer for monitoring idle worker into dwork Ming Lei
2021-07-06  5:52   ` Christoph Hellwig
2021-07-08  7:23     ` Ming Lei
2021-07-05 10:26 ` [PATCH 3/6] loop: add __loop_free_idle_workers() for covering freeing workers in clearing FD Ming Lei
2021-07-05 10:26 ` [PATCH 4/6] loop: improve loop_process_work Ming Lei
2021-07-06  5:54   ` Christoph Hellwig
2021-07-05 10:26 ` [PATCH 5/6] loop: use xarray to store workers Ming Lei
2021-07-05 10:26 ` [PATCH 6/6] loop: don't add worker into idle list Ming Lei
2021-07-06 13:55   ` Dan Schatzberg
2021-07-07  3:19     ` Ming Lei
2021-07-07 13:55       ` Dan Schatzberg
2021-07-08  6:58         ` Ming Lei
2021-07-08 14:16           ` Dan Schatzberg
2021-07-08 15:01             ` Ming Lei
2021-07-08 15:15               ` Dan Schatzberg
2021-07-09  0:49                 ` Ming Lei
2021-07-09 13:47                   ` Dan Schatzberg
2021-07-08 14:41           ` Dan Schatzberg

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.