From: Jens Axboe <jens.axboe@oracle.com>
To: linux-kernel@vger.kernel.org, linux-fsdevel@vger.kernel.org
Cc: chris.mason@oracle.com, david@fromorbit.com, hch@infradead.org,
	akpm@linux-foundation.org, jack@suse.cz,
	yanmin_zhang@linux.intel.com, Jens Axboe <jens.axboe@oracle.com>
Subject: [PATCH 05/11] writeback: support > 1 flusher thread per bdi
Date: Mon, 18 May 2009 14:19:46 +0200	[thread overview]
Message-ID: <1242649192-16263-6-git-send-email-jens.axboe@oracle.com> (raw)
In-Reply-To: <1242649192-16263-1-git-send-email-jens.axboe@oracle.com>

Build on the bdi_writeback support by allowing registration of more than
one flusher thread per bdi. File systems can call bdi_add_flusher_task(bdi)
to add further flusher threads to the device. If they do so, they must also
provide a super_operations method (->inode_get_wb) that returns the
appropriate bdi_writeback struct for any given inode.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
---
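Note for reviewers, not part of the commit message: below is a minimal,
purely hypothetical sketch of what the filesystem side of ->inode_get_wb
could look like, spreading inodes across the registered flusher threads by
inode number. All foo_* names are invented for illustration, and the sketch
assumes the wb entries on bdi->wb_list stay around until bdi_wb_shutdown(),
so a pointer picked up under srcu_read_lock() remains usable for the
lifetime of the mount.

#include <linux/fs.h>
#include <linux/list.h>
#include <linux/srcu.h>
#include <linux/backing-dev.h>

static struct bdi_writeback *foo_inode_get_wb(struct inode *inode)
{
	struct backing_dev_info *bdi = inode->i_mapping->backing_dev_info;
	struct bdi_writeback *wb, *ret = &bdi->wb;
	unsigned int target, i = 0;
	int idx;

	/* Only the default embedded thread exists, no list walk needed */
	if (!bdi_wblist_needs_lock(bdi))
		return &bdi->wb;

	/* wb_cnt is read unlocked; the &bdi->wb fallback covers races */
	target = inode->i_ino % bdi->wb_cnt;

	idx = srcu_read_lock(&bdi->srcu);
	list_for_each_entry_rcu(wb, &bdi->wb_list, list) {
		if (i++ == target) {
			ret = wb;
			break;
		}
	}
	srcu_read_unlock(&bdi->srcu, idx);

	return ret;
}

static const struct super_operations foo_super_ops = {
	/* ... alloc_inode, write_inode, etc ... */
	.inode_get_wb	= foo_inode_get_wb,
};
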
 fs/fs-writeback.c           |  328 ++++++++++++++++++++++++++++++++-----------
 include/linux/backing-dev.h |   31 ++++-
 include/linux/fs.h          |    3 +
 mm/backing-dev.c            |  257 ++++++++++++++++++++++++++--------
 mm/page-writeback.c         |    4 +-
 5 files changed, 479 insertions(+), 144 deletions(-)
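
The mount-time wiring for the above, again just an illustrative sketch
(foo_fill_super, the single extra thread, and the block-device bdi lookup
are assumptions, not something this patch mandates): the filesystem asks
for one extra flusher thread and installs the super_operations table that
carries ->inode_get_wb.

#include <linux/fs.h>
#include <linux/backing-dev.h>

static int foo_fill_super(struct super_block *sb, void *data, int silent)
{
	/* bdi of the block device backing this filesystem */
	struct backing_dev_info *bdi =
			sb->s_bdev->bd_inode->i_mapping->backing_dev_info;

	/*
	 * Add one flusher thread beyond the default embedded one. This
	 * blocks until any previously requested thread creation is done.
	 */
	bdi_add_flusher_task(bdi);

	/* foo_super_ops provides ->inode_get_wb, see the sketch above */
	sb->s_op = &foo_super_ops;

	/* ... remaining fill_super work ... */
	return 0;
}

Using the blocking helper here keeps mount-time additions serialized
against any in-flight flusher thread creation, matching the
bdi_add_flusher_task() kerneldoc in the mm/backing-dev.c hunk below.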

diff --git a/fs/fs-writeback.c b/fs/fs-writeback.c
index 50e21e8..efdce88 100644
--- a/fs/fs-writeback.c
+++ b/fs/fs-writeback.c
@@ -34,84 +34,175 @@
  */
 int nr_pdflush_threads;
 
-/**
- * writeback_acquire - attempt to get exclusive writeback access to a device
- * @bdi: the device's backing_dev_info structure
- *
- * It is a waste of resources to have more than one pdflush thread blocked on
- * a single request queue.  Exclusion at the request_queue level is obtained
- * via a flag in the request_queue's backing_dev_info.state.
- *
- * Non-request_queue-backed address_spaces will share default_backing_dev_info,
- * unless they implement their own.  Which is somewhat inefficient, as this
- * may prevent concurrent writeback against multiple devices.
+static void generic_sync_wb_inodes(struct bdi_writeback *wb,
+				   struct super_block *sb,
+				   struct writeback_control *wbc);
+
+/*
+ * Work items for the bdi_writeback threads
  */
-static int writeback_acquire(struct bdi_writeback *wb)
+struct bdi_work {
+	struct list_head list;
+	struct rcu_head rcu_head;
+
+	unsigned long seen;
+	atomic_t pending;
+
+	unsigned long sb_data;
+	unsigned long nr_pages;
+
+	unsigned long state;
+};
+
+static struct super_block *bdi_work_sb(struct bdi_work *work)
 {
-	struct backing_dev_info *bdi = wb->bdi;
+	return (struct super_block *) (work->sb_data & ~1UL);
+}
+
+static inline bool bdi_work_on_stack(struct bdi_work *work)
+{
+	return work->sb_data & 1UL;
+}
+
+static inline void bdi_work_init(struct bdi_work *work, struct super_block *sb,
+				 unsigned long nr_pages)
+{
+	INIT_RCU_HEAD(&work->rcu_head);
+	work->sb_data = (unsigned long) sb;
+	work->nr_pages = nr_pages;
+	work->state = 0;
+}
 
-	return !test_and_set_bit(wb->nr, &bdi->wb_active);
+static inline void bdi_work_init_on_stack(struct bdi_work *work,
+					  struct super_block *sb,
+					  unsigned long nr_pages)
+{
+	bdi_work_init(work, sb, nr_pages);
+	set_bit(0, &work->state);
+	work->sb_data |= 1UL;
 }
 
 /**
  * writeback_in_progress - determine whether there is writeback in progress
  * @bdi: the device's backing_dev_info structure.
  *
- * Determine whether there is writeback in progress against a backing device.
+ * Determine whether there is writeback waiting to be handled against a
+ * backing device.
  */
 int writeback_in_progress(struct backing_dev_info *bdi)
 {
-	return bdi->wb_active != 0;
+	return !list_empty(&bdi->work_list);
 }
 
-/**
- * writeback_release - relinquish exclusive writeback access against a device.
- * @bdi: the device's backing_dev_info structure
- */
-static void writeback_release(struct bdi_writeback *wb)
+static void bdi_work_free(struct rcu_head *head)
 {
-	struct backing_dev_info *bdi = wb->bdi;
+	struct bdi_work *work = container_of(head, struct bdi_work, rcu_head);
 
-	wb->nr_pages = 0;
-	wb->sb = NULL;
-	clear_bit(wb->nr, &bdi->wb_active);
+	if (!bdi_work_on_stack(work))
+		kfree(work);
+	else {
+		clear_bit(0, &work->state);
+		wake_up_bit(&work->state, 0);
+	}
 }
 
-static void wb_start_writeback(struct bdi_writeback *wb, struct super_block *sb,
-			       long nr_pages)
+static void wb_clear_pending(struct bdi_writeback *wb, struct bdi_work *work)
 {
-	if (!wb_has_dirty_io(wb))
-		return;
+	/*
+	 * The caller has retrieved the work arguments from this work,
+	 * drop our reference. If this is the last ref, delete and free it
+	 */
+	if (atomic_dec_and_test(&work->pending)) {
+		struct backing_dev_info *bdi = wb->bdi;
 
-	if (writeback_acquire(wb)) {
-		wb->nr_pages = nr_pages;
-		wb->sb = sb;
+		spin_lock(&bdi->wb_lock);
+		list_del_rcu(&work->list);
+		spin_unlock(&bdi->wb_lock);
+
+		call_rcu(&work->rcu_head, bdi_work_free);
+	}
+}
+
+static void wb_start_writeback(struct bdi_writeback *wb, struct bdi_work *work)
+{
+	/*
+	 * If we failed to allocate the bdi work item, wake up the wb thread
+	 * unconditionally. As a safety precaution, it will flush out everything.
+	 */
+	if (!wb_has_dirty_io(wb) && work)
+		wb_clear_pending(wb, work);
+	else
+		wake_up(&wb->wait);
+}
+
+static int bdi_queue_writeback(struct backing_dev_info *bdi,
+			       struct bdi_work *work)
+{
+	if (work) {
+		work->seen = bdi->wb_mask;
+		atomic_set(&work->pending, bdi->wb_cnt);
 
 		/*
-		 * make above store seen before the task is woken
+		 * Make sure stores are seen before it appears on the list
 		 */
 		smp_mb();
-		wake_up(&wb->wait);
+
+		spin_lock(&bdi->wb_lock);
+		list_add_tail_rcu(&work->list, &bdi->work_list);
+		spin_unlock(&bdi->wb_lock);
 	}
-}
 
-int bdi_start_writeback(struct backing_dev_info *bdi, struct super_block *sb,
-			 long nr_pages)
-{
 	/*
 	 * This only happens the first time someone kicks this bdi, so put
 	 * it out-of-line.
 	 */
-	if (unlikely(!bdi->wb.task)) {
+	if (unlikely(list_empty_careful(&bdi->wb_list))) {
 		bdi_add_default_flusher_task(bdi);
 		return 1;
 	}
 
-	wb_start_writeback(&bdi->wb, sb, nr_pages);
+	if (!bdi_wblist_needs_lock(bdi))
+		wb_start_writeback(&bdi->wb, work);
+	else {
+		struct bdi_writeback *wb;
+		int idx;
+
+		idx = srcu_read_lock(&bdi->srcu);
+
+		list_for_each_entry_rcu(wb, &bdi->wb_list, list)
+			wb_start_writeback(wb, work);
+
+		srcu_read_unlock(&bdi->srcu, idx);
+	}
+
 	return 0;
 }
 
 /*
+ * Used for on-stack allocated work items. The caller needs to wait until
+ * the wb threads have acked the work before it's safe to continue.
+ */
+static void bdi_wait_on_work_start(struct bdi_work *work)
+{
+	wait_on_bit(&work->state, 0, bdi_sched_wait, TASK_UNINTERRUPTIBLE);
+}
+
+int bdi_start_writeback(struct backing_dev_info *bdi, struct super_block *sb,
+			 long nr_pages)
+{
+	struct bdi_work work;
+	int ret;
+
+	bdi_work_init_on_stack(&work, sb, nr_pages);
+
+	ret = bdi_queue_writeback(bdi, &work);
+
+	bdi_wait_on_work_start(&work);
+
+	return ret;
+}
+
+/*
  * The maximum number of pages to writeout in a single bdi flush/kupdate
  * operation.  We do this so we don't hold I_SYNC against an inode for
  * enormous amounts of time, which would block a userspace task which has
@@ -160,18 +251,15 @@ static void wb_kupdated(struct bdi_writeback *wb)
 		wbc.more_io = 0;
 		wbc.encountered_congestion = 0;
 		wbc.nr_to_write = MAX_WRITEBACK_PAGES;
-		generic_sync_bdi_inodes(NULL, &wbc);
+		generic_sync_wb_inodes(wb, NULL, &wbc);
 		if (wbc.nr_to_write > 0)
 			break;	/* All the old data is written */
 		nr_to_write -= MAX_WRITEBACK_PAGES;
 	}
 }
 
-static void generic_sync_wb_inodes(struct bdi_writeback *wb,
-				   struct super_block *sb,
-				   struct writeback_control *wbc);
-
-static void wb_writeback(struct bdi_writeback *wb)
+static void __wb_writeback(struct bdi_writeback *wb, long nr_pages,
+			   struct super_block *sb)
 {
 	struct writeback_control wbc = {
 		.bdi			= wb->bdi,
@@ -179,10 +267,10 @@ static void wb_writeback(struct bdi_writeback *wb)
 		.older_than_this	= NULL,
 		.range_cyclic		= 1,
 	};
-	long nr_pages = wb->nr_pages;
 
 	for (;;) {
 		unsigned long background_thresh, dirty_thresh;
+
 		get_dirty_limits(&background_thresh, &dirty_thresh, NULL, NULL);
 		if ((global_page_state(NR_FILE_DIRTY) +
 		    global_page_state(NR_UNSTABLE_NFS) < background_thresh) &&
@@ -193,7 +281,7 @@ static void wb_writeback(struct bdi_writeback *wb)
 		wbc.encountered_congestion = 0;
 		wbc.nr_to_write = MAX_WRITEBACK_PAGES;
 		wbc.pages_skipped = 0;
-		generic_sync_wb_inodes(wb, wb->sb, &wbc);
+		generic_sync_wb_inodes(wb, sb, &wbc);
 		nr_pages -= MAX_WRITEBACK_PAGES - wbc.nr_to_write;
 		/*
 		 * If we ran out of stuff to write, bail unless more_io got set
@@ -207,69 +295,135 @@ static void wb_writeback(struct bdi_writeback *wb)
 }
 
 /*
+ * Return the next bdi_work struct that hasn't been processed by this
+ * wb thread yet
+ */
+static struct bdi_work *get_next_work_item(struct backing_dev_info *bdi,
+					   struct bdi_writeback *wb)
+{
+	struct bdi_work *work, *ret = NULL;
+
+	rcu_read_lock();
+
+	list_for_each_entry_rcu(work, &bdi->work_list, list) {
+		if (!test_and_clear_bit(wb->nr, &work->seen))
+			continue;
+
+		ret = work;
+		break;
+	}
+
+	rcu_read_unlock();
+	return ret;
+}
+
+static void wb_writeback(struct bdi_writeback *wb)
+{
+	struct backing_dev_info *bdi = wb->bdi;
+	struct bdi_work *work;
+
+	while ((work = get_next_work_item(bdi, wb)) != NULL) {
+		struct super_block *sb = bdi_work_sb(work);
+		long nr_pages = work->nr_pages;
+
+		wb_clear_pending(wb, work);
+		__wb_writeback(wb, nr_pages, sb);
+	}
+}
+
+/*
+ * This will be inlined in bdi_writeback_task() once we get rid of any
+ * dirty inodes on the default_backing_dev_info
+ */
+static void wb_do_writeback(struct bdi_writeback *wb)
+{
+	/*
+	 * We get here in two cases:
+	 *
+	 *  schedule_timeout() returned because the dirty writeback
+	 *  interval has elapsed. If that happens, the work item list
+	 *  will be empty and we will proceed to do kupdated style writeout.
+	 *
+	 *  Someone called bdi_start_writeback(), which put one/more work
+	 *  items on the work_list. Process those.
+	 */
+	if (list_empty(&wb->bdi->work_list))
+		wb_kupdated(wb);
+	else
+		wb_writeback(wb);
+}
+
+/*
  * Handle writeback of dirty data for the device backed by this bdi. Also
  * wakes up periodically and does kupdated style flushing.
  */
 int bdi_writeback_task(struct bdi_writeback *wb)
 {
+	DEFINE_WAIT(wait);
+
 	while (!kthread_should_stop()) {
 		unsigned long wait_jiffies;
-		DEFINE_WAIT(wait);
+
+		wb_do_writeback(wb);
 
 		prepare_to_wait(&wb->wait, &wait, TASK_INTERRUPTIBLE);
 		wait_jiffies = msecs_to_jiffies(dirty_writeback_interval * 10);
 		schedule_timeout(wait_jiffies);
 		try_to_freeze();
-
-		/*
-		 * We get here in two cases:
-		 *
-		 *  schedule_timeout() returned because the dirty writeback
-		 *  interval has elapsed. If that happens, we will be able
-		 *  to acquire the writeback lock and will proceed to do
-		 *  kupdated style writeout.
-		 *
-		 *  Someone called bdi_start_writeback(), which will acquire
-		 *  the writeback lock. This means our writeback_acquire()
-		 *  below will fail and we call into bdi_pdflush() for
-		 *  pdflush style writeout.
-		 *
-		 */
-		if (writeback_acquire(wb))
-			wb_kupdated(wb);
-		else
-			wb_writeback(wb);
-
-		writeback_release(wb);
-		finish_wait(&wb->wait, &wait);
 	}
 
+	finish_wait(&wb->wait, &wait);
 	return 0;
 }
 
 void bdi_writeback_all(struct super_block *sb, long nr_pages)
 {
-	struct backing_dev_info *bdi;
+	struct list_head *entry = &bdi_list;
 
 	rcu_read_lock();
 
-restart:
-	list_for_each_entry_rcu(bdi, &bdi_list, bdi_list) {
+	list_for_each_continue_rcu(entry, &bdi_list) {
+		struct backing_dev_info *bdi;
+		struct list_head *next;
+		struct bdi_work *work;
+
+		bdi = list_entry(entry, struct backing_dev_info, bdi_list);
 		if (!bdi_has_dirty_io(bdi))
 			continue;
-		if (bdi_start_writeback(bdi, sb, nr_pages))
-			goto restart;
+
+		/*
+		 * If this allocation fails, we just wakeup the thread and
+		 * let it do kupdate writeback
+		 */
+		work = kmalloc(sizeof(*work), GFP_ATOMIC);
+		if (work)
+			bdi_work_init(work, sb, nr_pages);
+
+		/*
+		 * Prepare to start from previous entry if this one gets moved
+		 * to the bdi_pending list.
+		 */
+		next = entry->prev;
+		if (bdi_queue_writeback(bdi, work))
+			entry = next;
 	}
 
 	rcu_read_unlock();
 }
 
 /*
- * We have only a single wb per bdi, so just return that.
+ * If the filesystem didn't provide a way to map an inode to a dedicated
+ * flusher thread, it doesn't support more than 1 thread. So we know it's
+ * the default thread, return that.
  */
 static inline struct bdi_writeback *inode_get_wb(struct inode *inode)
 {
-	return &inode_to_bdi(inode)->wb;
+	const struct super_operations *sop = inode->i_sb->s_op;
+
+	if (!sop->inode_get_wb)
+		return &inode_to_bdi(inode)->wb;
+
+	return sop->inode_get_wb(inode);
 }
 
 /**
@@ -723,8 +877,24 @@ void generic_sync_bdi_inodes(struct super_block *sb,
 			     struct writeback_control *wbc)
 {
 	struct backing_dev_info *bdi = wbc->bdi;
+	struct bdi_writeback *wb;
+
+	/*
+	 * Common case is just a single wb thread and that is embedded in
+	 * the bdi, so it doesn't need locking
+	 */
+	if (!bdi_wblist_needs_lock(bdi))
+		generic_sync_wb_inodes(&bdi->wb, sb, wbc);
+	else {
+		int idx;
 
-	generic_sync_wb_inodes(&bdi->wb, sb, wbc);
+		idx = srcu_read_lock(&bdi->srcu);
+
+		list_for_each_entry_rcu(wb, &bdi->wb_list, list)
+			generic_sync_wb_inodes(wb, sb, wbc);
+
+		srcu_read_unlock(&bdi->srcu, idx);
+	}
 }
 
 /*
diff --git a/include/linux/backing-dev.h b/include/linux/backing-dev.h
index a0c70f1..6ccfa35 100644
--- a/include/linux/backing-dev.h
+++ b/include/linux/backing-dev.h
@@ -13,6 +13,8 @@
 #include <linux/proportions.h>
 #include <linux/kernel.h>
 #include <linux/fs.h>
+#include <linux/sched.h>
+#include <linux/srcu.h>
 #include <asm/atomic.h>
 
 struct page;
@@ -25,6 +27,7 @@ struct dentry;
 enum bdi_state {
 	BDI_pending,		/* On its way to being activated */
 	BDI_wb_alloc,		/* Default embedded wb allocated */
+	BDI_wblist_lock,	/* bdi->wb_list now needs locking */
 	BDI_async_congested,	/* The async (write) queue is getting full */
 	BDI_sync_congested,	/* The sync queue is getting full */
 	BDI_unused,		/* Available bits start here */
@@ -41,6 +44,8 @@ enum bdi_stat_item {
 #define BDI_STAT_BATCH (8*(1+ilog2(nr_cpu_ids)))
 
 struct bdi_writeback {
+	struct list_head list;			/* hangs off the bdi */
+
 	struct backing_dev_info *bdi;		/* our parent bdi */
 	unsigned int nr;
 
@@ -49,13 +54,13 @@ struct bdi_writeback {
 	struct list_head	b_dirty;	/* dirty inodes */
 	struct list_head	b_io;		/* parked for writeback */
 	struct list_head	b_more_io;	/* parked for more writeback */
-
-	unsigned long		nr_pages;
-	struct super_block	*sb;
 };
 
+#define BDI_MAX_FLUSHERS	32
+
 struct backing_dev_info {
 	struct rcu_head rcu_head;
+	struct srcu_struct srcu; /* for wb_list read side protection */
 	struct list_head bdi_list;
 	unsigned long ra_pages;	/* max readahead in PAGE_CACHE_SIZE units */
 	unsigned long state;	/* Always use atomic bitops on this */
@@ -74,8 +79,12 @@ struct backing_dev_info {
 	unsigned int max_ratio, max_prop_frac;
 
 	struct bdi_writeback wb;  /* default writeback info for this bdi */
-	unsigned long wb_active;  /* bitmap of active tasks */
-	unsigned long wb_mask;	  /* number of registered tasks */
+	spinlock_t wb_lock;	  /* protects update side of wb_list */
+	struct list_head wb_list; /* the flusher threads hanging off this bdi */
+	unsigned long wb_mask;	  /* bitmask of registered tasks */
+	unsigned int wb_cnt;	  /* number of registered tasks */
+
+	struct list_head work_list;
 
 	struct device *dev;
 
@@ -97,11 +106,17 @@ int bdi_start_writeback(struct backing_dev_info *bdi, struct super_block *sb,
 int bdi_writeback_task(struct bdi_writeback *wb);
 void bdi_writeback_all(struct super_block *sb, long nr_pages);
 void bdi_add_default_flusher_task(struct backing_dev_info *bdi);
+void bdi_add_flusher_task(struct backing_dev_info *bdi);
 int bdi_has_dirty_io(struct backing_dev_info *bdi);
 
 extern spinlock_t bdi_lock;
 extern struct list_head bdi_list;
 
+static inline int bdi_wblist_needs_lock(struct backing_dev_info *bdi)
+{
+	return test_bit(BDI_wblist_lock, &bdi->state);
+}
+
 static inline int wb_has_dirty_io(struct bdi_writeback *wb)
 {
 	return !list_empty(&wb->b_dirty) ||
@@ -314,4 +329,10 @@ static inline bool mapping_cap_swap_backed(struct address_space *mapping)
 	return bdi_cap_swap_backed(mapping->backing_dev_info);
 }
 
+static inline int bdi_sched_wait(void *word)
+{
+	schedule();
+	return 0;
+}
+
 #endif		/* _LINUX_BACKING_DEV_H */
diff --git a/include/linux/fs.h b/include/linux/fs.h
index ecdc544..d3bda5d 100644
--- a/include/linux/fs.h
+++ b/include/linux/fs.h
@@ -1550,11 +1550,14 @@ extern ssize_t vfs_readv(struct file *, const struct iovec __user *,
 extern ssize_t vfs_writev(struct file *, const struct iovec __user *,
 		unsigned long, loff_t *);
 
+struct bdi_writeback;
+
 struct super_operations {
    	struct inode *(*alloc_inode)(struct super_block *sb);
 	void (*destroy_inode)(struct inode *);
 
    	void (*dirty_inode) (struct inode *);
+	struct bdi_writeback *(*inode_get_wb) (struct inode *);
 	int (*write_inode) (struct inode *, int);
 	void (*drop_inode) (struct inode *);
 	void (*delete_inode) (struct inode *);
diff --git a/mm/backing-dev.c b/mm/backing-dev.c
index 677a8c6..b4bcb14 100644
--- a/mm/backing-dev.c
+++ b/mm/backing-dev.c
@@ -199,7 +199,42 @@ static int __init default_bdi_init(void)
 }
 subsys_initcall(default_bdi_init);
 
-static void bdi_wb_init(struct bdi_writeback *wb, struct backing_dev_info *bdi)
+static int wb_assign_nr(struct backing_dev_info *bdi, struct bdi_writeback *wb)
+{
+	unsigned long mask = BDI_MAX_FLUSHERS - 1;
+	unsigned int nr;
+
+	do {
+		if ((bdi->wb_mask & mask) == mask)
+			return 1;
+
+		nr = find_first_zero_bit(&bdi->wb_mask, BDI_MAX_FLUSHERS);
+	} while (test_and_set_bit(nr, &bdi->wb_mask));
+
+	wb->nr = nr;
+
+	spin_lock(&bdi->wb_lock);
+	bdi->wb_cnt++;
+	spin_unlock(&bdi->wb_lock);
+
+	return 0;
+}
+
+static void bdi_put_wb(struct backing_dev_info *bdi, struct bdi_writeback *wb)
+{
+	clear_bit(wb->nr, &bdi->wb_mask);
+
+	if (wb == &bdi->wb)
+		clear_bit(BDI_wb_alloc, &bdi->state);
+	else
+		kfree(wb);
+
+	spin_lock(&bdi->wb_lock);
+	bdi->wb_cnt--;
+	spin_unlock(&bdi->wb_lock);
+}
+
+static int bdi_wb_init(struct bdi_writeback *wb, struct backing_dev_info *bdi)
 {
 	memset(wb, 0, sizeof(*wb));
 
@@ -208,6 +243,30 @@ static void bdi_wb_init(struct bdi_writeback *wb, struct backing_dev_info *bdi)
 	INIT_LIST_HEAD(&wb->b_dirty);
 	INIT_LIST_HEAD(&wb->b_io);
 	INIT_LIST_HEAD(&wb->b_more_io);
+
+	return wb_assign_nr(bdi, wb);
+}
+
+static struct bdi_writeback *bdi_new_wb(struct backing_dev_info *bdi)
+{
+	struct bdi_writeback *wb;
+
+	/*
+	 * Default bdi->wb is already assigned, so just return it
+	 */
+	if (!test_and_set_bit(BDI_wb_alloc, &bdi->state))
+		wb = &bdi->wb;
+	else {
+		wb = kmalloc(sizeof(struct bdi_writeback), GFP_KERNEL);
+		if (wb) {
+			if (bdi_wb_init(wb, bdi)) {
+				kfree(wb);
+				wb = NULL;
+			}
+		}
+	}
+
+	return wb;
 }
 
 static void bdi_flush_io(struct backing_dev_info *bdi)
@@ -223,35 +282,26 @@ static void bdi_flush_io(struct backing_dev_info *bdi)
 	generic_sync_bdi_inodes(NULL, &wbc);
 }
 
-static int wb_assign_nr(struct backing_dev_info *bdi, struct bdi_writeback *wb)
+static void bdi_task_init(struct backing_dev_info *bdi,
+			  struct bdi_writeback *wb)
 {
-	set_bit(0, &bdi->wb_mask);
-	wb->nr = 0;
-	return 0;
-}
+	struct task_struct *tsk = current;
+	int was_empty;
 
-static void bdi_put_wb(struct backing_dev_info *bdi, struct bdi_writeback *wb)
-{
-	clear_bit(wb->nr, &bdi->wb_mask);
-	clear_bit(BDI_wb_alloc, &bdi->state);
-}
+	/*
+	 * Add us to the bdi's wb_list. If we are adding threads beyond
+	 * the default embedded bdi_writeback, then we need to start using
+	 * proper locking. Check the list for empty first, then set the
+	 * BDI_wblist_lock flag if there's > 1 entry on the list now
+	 */
+	spin_lock(&bdi->wb_lock);
 
-static struct bdi_writeback *bdi_new_wb(struct backing_dev_info *bdi)
-{
-	struct bdi_writeback *wb;
+	was_empty = list_empty(&bdi->wb_list);
+	list_add_tail_rcu(&wb->list, &bdi->wb_list);
+	if (!was_empty)
+		set_bit(BDI_wblist_lock, &bdi->state);
 
-	set_bit(BDI_wb_alloc, &bdi->state);
-	wb = &bdi->wb;
-	wb_assign_nr(bdi, wb);
-	return wb;
-}
-
-static int bdi_start_fn(void *ptr)
-{
-	struct bdi_writeback *wb = ptr;
-	struct backing_dev_info *bdi = wb->bdi;
-	struct task_struct *tsk = current;
-	int ret;
+	spin_unlock(&bdi->wb_lock);
 
 	tsk->flags |= PF_FLUSHER | PF_SWAPWRITE;
 	set_freezable();
@@ -260,6 +310,15 @@ static int bdi_start_fn(void *ptr)
 	 * Our parent may run at a different priority, just set us to normal
 	 */
 	set_user_nice(tsk, 0);
+}
+
+static int bdi_start_fn(void *ptr)
+{
+	struct bdi_writeback *wb = ptr;
+	struct backing_dev_info *bdi = wb->bdi;
+	int ret;
+
+	bdi_task_init(bdi, wb);
 
 	/*
 	 * Clear pending bit and wakeup anybody waiting to tear us down
@@ -267,25 +326,65 @@ static int bdi_start_fn(void *ptr)
 	clear_bit(BDI_pending, &bdi->state);
 	wake_up_bit(&bdi->state, BDI_pending);
 
+	/*
+	 * Make us discoverable on the bdi_list again
+	 */
+	spin_lock(&bdi_lock);
+	list_add_tail_rcu(&bdi->bdi_list, &bdi_list);
+	spin_unlock(&bdi_lock);
+
 	ret = bdi_writeback_task(wb);
 
+	/*
+	 * Remove us from the list
+	 */
+	spin_lock(&bdi->wb_lock);
+	list_del_rcu(&wb->list);
+	spin_unlock(&bdi->wb_lock);
+
+	/*
+	 * wait for rcu grace period to end, so we can free wb
+	 */
+	synchronize_srcu(&bdi->srcu);
+
 	bdi_put_wb(bdi, wb);
 	return ret;
 }
 
 int bdi_has_dirty_io(struct backing_dev_info *bdi)
 {
-	return wb_has_dirty_io(&bdi->wb);
+	struct bdi_writeback *wb;
+	int ret = 0;
+
+	if (!bdi_wblist_needs_lock(bdi))
+		ret = wb_has_dirty_io(&bdi->wb);
+	else {
+		int idx;
+
+		idx = srcu_read_lock(&bdi->srcu);
+
+		list_for_each_entry_rcu(wb, &bdi->wb_list, list) {
+			ret = wb_has_dirty_io(wb);
+			if (ret)
+				break;
+		}
+
+		srcu_read_unlock(&bdi->srcu, idx);
+	}
+
+	return ret;
 }
 
 static int bdi_forker_task(void *ptr)
 {
 	struct bdi_writeback *me = ptr;
+	DEFINE_WAIT(wait);
+
+	bdi_task_init(me->bdi, me);
 
 	for (;;) {
 		struct backing_dev_info *bdi;
 		struct bdi_writeback *wb;
-		DEFINE_WAIT(wait);
 
 		/*
 		 * Should never trigger on the default bdi
@@ -301,7 +400,6 @@ static int bdi_forker_task(void *ptr)
 		if (list_empty(&bdi_pending_list))
 			schedule();
 
-		finish_wait(&me->wait, &wait);
 repeat:
 		bdi = NULL;
 		spin_lock_bh(&bdi_lock);
@@ -344,6 +442,7 @@ readd_flush:
 		}
 	}
 
+	finish_wait(&me->wait, &wait);
 	return 0;
 }
 
@@ -367,34 +466,68 @@ static void bdi_add_to_pending(struct rcu_head *head)
 	wake_up(&default_backing_dev_info.wb.wait);
 }
 
-/*
- * Add a new flusher task that gets created for any bdi
- * that has dirty data pending writeout
- */
-void bdi_add_default_flusher_task(struct backing_dev_info *bdi)
+static void bdi_add_one_flusher_task(struct backing_dev_info *bdi,
+				     int(*func)(struct backing_dev_info *))
 {
 	if (!bdi_cap_writeback_dirty(bdi))
 		return;
 
 	/*
-	 * Someone already marked this pending for task creation
+	 * Check with the helper whether to proceed with adding a task. It
+	 * will only abort if two or more simultaneous calls to
+	 * bdi_add_default_flusher_task() occurred; further additions will
+	 * block waiting for previous additions to finish.
 	 */
-	if (test_and_set_bit(BDI_pending, &bdi->state))
-		return;
+	if (!func(bdi)) {
+		spin_lock_bh(&bdi_lock);
+		list_del_rcu(&bdi->bdi_list);
+		spin_unlock_bh(&bdi_lock);
 
-	spin_lock_bh(&bdi_lock);
-	list_del_rcu(&bdi->bdi_list);
-	spin_unlock_bh(&bdi_lock);
+		/*
+		 * We need to wait for the current grace period to end,
+		 * in case others were browsing the bdi_list as well.
+		 * So defer the adding and wakeup to after the RCU
+		 * grace period has ended.
+		 */
+		call_rcu(&bdi->rcu_head, bdi_add_to_pending);
+	}
+}
 
-	/*
-	 * We need to wait for the current grace period to end,
-	 * in case others were browsing the bdi_list as well.
-	 * So defer the adding and wakeup to after the RCU
-	 * grace period has ended.
-	 */
-	call_rcu(&bdi->rcu_head, bdi_add_to_pending);
+static int flusher_add_helper_block(struct backing_dev_info *bdi)
+{
+	wait_on_bit_lock(&bdi->state, BDI_pending, bdi_sched_wait,
+				TASK_UNINTERRUPTIBLE);
+	return 0;
+}
+
+static int flusher_add_helper_test(struct backing_dev_info *bdi)
+{
+	return test_and_set_bit(BDI_pending, &bdi->state);
+}
+
+/*
+ * Add the default flusher task that gets created for any bdi
+ * that has dirty data pending writeout
+ */
+void bdi_add_default_flusher_task(struct backing_dev_info *bdi)
+{
+	bdi_add_one_flusher_task(bdi, flusher_add_helper_test);
 }
 
+/**
+ * bdi_add_flusher_task - add one more flusher task to this @bdi
+ *  @bdi:	the bdi
+ *
+ * Add an additional flusher task to this @bdi. Will block waiting on
+ * previous additions, if any.
+ *
+ */
+void bdi_add_flusher_task(struct backing_dev_info *bdi)
+{
+	bdi_add_one_flusher_task(bdi, flusher_add_helper_block);
+}
+EXPORT_SYMBOL(bdi_add_flusher_task);
+
 int bdi_register(struct backing_dev_info *bdi, struct device *parent,
 		const char *fmt, ...)
 {
@@ -454,17 +587,13 @@ int bdi_register_dev(struct backing_dev_info *bdi, dev_t dev)
 }
 EXPORT_SYMBOL(bdi_register_dev);
 
-static int sched_wait(void *word)
-{
-	schedule();
-	return 0;
-}
-
 /*
  * Remove bdi from global list and shutdown any threads we have running
  */
 static void bdi_wb_shutdown(struct backing_dev_info *bdi)
 {
+	struct bdi_writeback *wb;
+
 	if (!bdi_cap_writeback_dirty(bdi))
 		return;
 
@@ -472,7 +601,8 @@ static void bdi_wb_shutdown(struct backing_dev_info *bdi)
 	 * If setup is pending, wait for that to complete first
 	 * Make sure nobody finds us on the bdi_list anymore
 	 */
-	wait_on_bit(&bdi->state, BDI_pending, sched_wait, TASK_UNINTERRUPTIBLE);
+	wait_on_bit(&bdi->state, BDI_pending, bdi_sched_wait,
+			TASK_UNINTERRUPTIBLE);
 
 	/*
 	 * Make sure nobody finds us on the bdi_list anymore
@@ -488,9 +618,11 @@ static void bdi_wb_shutdown(struct backing_dev_info *bdi)
 	synchronize_rcu();
 
 	/*
-	 * Finally, kill the kernel thread
+	 * Finally, kill the kernel threads. We don't need to be RCU
+	 * safe anymore, since the bdi is gone from visibility.
 	 */
-	kthread_stop(bdi->wb.task);
+	list_for_each_entry(wb, &bdi->wb_list, list)
+		kthread_stop(wb->task);
 }
 
 void bdi_unregister(struct backing_dev_info *bdi)
@@ -515,8 +647,12 @@ int bdi_init(struct backing_dev_info *bdi)
 	bdi->min_ratio = 0;
 	bdi->max_ratio = 100;
 	bdi->max_prop_frac = PROP_FRAC_BASE;
+	spin_lock_init(&bdi->wb_lock);
+	bdi->wb_mask = 0;
+	bdi->wb_cnt = 0;
 	INIT_LIST_HEAD(&bdi->bdi_list);
-	bdi->wb_mask = bdi->wb_active = 0;
+	INIT_LIST_HEAD(&bdi->wb_list);
+	INIT_LIST_HEAD(&bdi->work_list);
 
 	bdi_wb_init(&bdi->wb, bdi);
 
@@ -526,10 +662,15 @@ int bdi_init(struct backing_dev_info *bdi)
 			goto err;
 	}
 
+	err = init_srcu_struct(&bdi->srcu);
+	if (err)
+		goto err;
+
 	bdi->dirty_exceeded = 0;
 	err = prop_local_init_percpu(&bdi->completions);
 
 	if (err) {
+		cleanup_srcu_struct(&bdi->srcu);
 err:
 		while (i--)
 			percpu_counter_destroy(&bdi->bdi_stat[i]);
@@ -547,6 +688,8 @@ void bdi_destroy(struct backing_dev_info *bdi)
 
 	bdi_unregister(bdi);
 
+	cleanup_srcu_struct(&bdi->srcu);
+
 	for (i = 0; i < NR_BDI_STAT_ITEMS; i++)
 		percpu_counter_destroy(&bdi->bdi_stat[i]);
 
diff --git a/mm/page-writeback.c b/mm/page-writeback.c
index 76269f8..de3178a 100644
--- a/mm/page-writeback.c
+++ b/mm/page-writeback.c
@@ -667,8 +667,7 @@ void throttle_vm_writeout(gfp_t gfp_mask)
 
 /*
  * Start writeback of `nr_pages' pages.  If `nr_pages' is zero, write back
- * the whole world.  Returns 0 if a pdflush thread was dispatched.  Returns
- * -1 if all pdflush threads were busy.
+ * the whole world.
  */
 void wakeup_flusher_threads(long nr_pages)
 {
@@ -676,7 +675,6 @@ void wakeup_flusher_threads(long nr_pages)
 		nr_pages = global_page_state(NR_FILE_DIRTY) +
 				global_page_state(NR_UNSTABLE_NFS);
 	bdi_writeback_all(NULL, nr_pages);
-	return;
 }
 
 static void laptop_timer_fn(unsigned long unused);
-- 
1.6.3.rc0.1.gf800



Thread overview: 54+ messages
2009-05-18 12:19 [PATCH 0/11] Per-bdi writeback flusher threads #4 Jens Axboe
2009-05-18 12:19 ` [PATCH 01/11] writeback: move dirty inodes from super_block to backing_dev_info Jens Axboe
2009-05-18 12:19 ` [PATCH 02/11] writeback: switch to per-bdi threads for flushing data Jens Axboe
2009-05-19 10:20   ` Richard Kennedy
2009-05-19 12:23     ` Jens Axboe
2009-05-19 13:45       ` Richard Kennedy
2009-05-19 17:56         ` Jens Axboe
2009-05-19 22:11           ` Peter Zijlstra
2009-05-20 11:18   ` Jan Kara
2009-05-20 11:32     ` Jens Axboe
2009-05-20 12:11       ` Jan Kara
2009-05-20 12:16         ` Jens Axboe
2009-05-20 12:24           ` Christoph Hellwig
2009-05-20 12:48             ` Jens Axboe
2009-05-20 12:37   ` Christoph Hellwig
2009-05-20 12:49     ` Jens Axboe
2009-05-20 14:02       ` Anton Altaparmakov
2009-05-18 12:19 ` [PATCH 03/11] writeback: get rid of pdflush completely Jens Axboe
2009-05-18 12:19 ` [PATCH 04/11] writeback: separate the flushing state/task from the bdi Jens Axboe
2009-05-20 11:34   ` Jan Kara
2009-05-20 11:39     ` Jens Axboe
2009-05-20 12:06       ` Jan Kara
2009-05-20 12:09         ` Jens Axboe
2009-05-18 12:19 ` Jens Axboe [this message]
2009-05-18 12:19 ` [PATCH 06/11] writeback: include default_backing_dev_info in writeback Jens Axboe
2009-05-18 12:19 ` [PATCH 07/11] writeback: allow sleepy exit of default writeback task Jens Axboe
2009-05-18 12:19 ` [PATCH 08/11] writeback: btrfs must register its backing_devices Jens Axboe
2009-05-18 12:19 ` [PATCH 09/11] writeback: add some debug inode list counters to bdi stats Jens Axboe
2009-05-18 12:19 ` [PATCH 10/11] writeback: add name to backing_dev_info Jens Axboe
2009-05-18 12:19 ` [PATCH 11/11] writeback: check for registered bdi in flusher add and inode dirty Jens Axboe
2009-05-19  6:11 ` [PATCH 0/11] Per-bdi writeback flusher threads #4 Zhang, Yanmin
2009-05-19  6:20   ` Jens Axboe
2009-05-19  6:43     ` Zhang, Yanmin
2009-05-20  7:51     ` Zhang, Yanmin
2009-05-20  8:09       ` Jens Axboe
2009-05-20  8:54         ` Jens Axboe
2009-05-20  9:19           ` Zhang, Yanmin
2009-05-20  9:25             ` Jens Axboe
2009-05-20 11:19               ` Jens Axboe
2009-05-21  6:33                 ` Zhang, Yanmin
2009-05-21  9:10                   ` Jan Kara
2009-05-22  1:28                     ` Zhang, Yanmin
2009-05-22  8:15                       ` Jens Axboe
2009-05-22 20:44                         ` Jens Axboe
2009-05-23 19:15                           ` Jens Axboe
2009-05-25  8:02                             ` Zhang, Yanmin
2009-05-25  8:06                               ` Jens Axboe
2009-05-25  8:43                               ` Zhang, Yanmin
2009-05-25  8:48                                 ` Jens Axboe
2009-05-25  8:54                         ` Zhang, Yanmin
2009-05-22  7:53                     ` Jens Axboe
2009-05-22  7:53                       ` Jens Axboe
2009-05-25 15:57 ` Richard Kennedy
2009-05-25 17:05   ` Jens Axboe
