From: James Simmons <jsimmons@infradead.org>
To: lustre-devel@lists.lustre.org
Subject: [lustre-devel] [PATCH 12/22] ext4: add htree lock implementation
Date: Sun, 21 Jul 2019 21:23:41 -0400
Message-ID: <1563758631-29550-13-git-send-email-jsimmons@infradead.org>
In-Reply-To: <1563758631-29550-1-git-send-email-jsimmons@infradead.org>

Add an htree lock implementation used by Lustre/ldiskfs for parallel
directory operations (pdirops). An optional struct htree_lock is threaded
through the namei code (ext4_find_entry_locked(), ext4_add_entry_locked())
so that lookups and modifications in a large htree (DX) directory no
longer have to be fully serialized.
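
Below is a minimal, illustrative sketch of how a caller (for example
Lustre's osd-ldiskfs layer, which is not part of this patch) is expected
to drive the interface; the variables "dir" and "dentry" are assumed to
be in scope, and error handling (both allocators can return NULL) is
omitted:

	struct htree_lock_head *lhead;
	struct htree_lock *lck;
	struct ext4_dir_entry_2 *de;
	struct buffer_head *bh;

	/* one lock head per directory, one htree_lock per thread */
	lhead = ext4_htree_lock_head_alloc(HTREE_HBITS_DEF);
	lck = ext4_htree_lock_alloc();

	/* shared (CR) lock on the directory for a lookup */
	ext4_htree_lock(lck, lhead, dir, EXT4_HLOCK_LOOKUP);
	bh = ext4_find_entry_locked(dir, &dentry->d_name, &de, NULL, lck);
	/* ... use bh/de, then brelse(bh) ... */
	ext4_htree_unlock(lck);

	ext4_htree_lock_free(lck);
	ext4_htree_lock_head_free(lhead);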

Signed-off-by: James Simmons <jsimmons@infradead.org>
---
 fs/ext4/Makefile     |   4 +-
 fs/ext4/ext4.h       |  80 +++++
 fs/ext4/htree_lock.c | 891 +++++++++++++++++++++++++++++++++++++++++++++++++++
 fs/ext4/htree_lock.h | 187 +++++++++++
 fs/ext4/namei.c      | 461 +++++++++++++++++++++++---
 fs/ext4/super.c      |   1 +
 6 files changed, 1583 insertions(+), 41 deletions(-)
 create mode 100644 fs/ext4/htree_lock.c
 create mode 100644 fs/ext4/htree_lock.h

diff --git a/fs/ext4/Makefile b/fs/ext4/Makefile
index 8fdfcd3..0d0a28f 100644
--- a/fs/ext4/Makefile
+++ b/fs/ext4/Makefile
@@ -6,8 +6,8 @@
 obj-$(CONFIG_EXT4_FS) += ext4.o
 
 ext4-y	:= balloc.o bitmap.o block_validity.o dir.o ext4_jbd2.o extents.o \
-		extents_status.o file.o fsmap.o fsync.o hash.o ialloc.o \
-		indirect.o inline.o inode.o ioctl.o mballoc.o migrate.o \
+		extents_status.o file.o fsmap.o fsync.o hash.o htree_lock.o \
+		ialloc.o indirect.o inline.o inode.o ioctl.o mballoc.o migrate.o \
 		mmp.o move_extent.o namei.o page-io.o readpage.o resize.o \
 		super.o symlink.o sysfs.o xattr.o xattr_trusted.o xattr_user.o
 
diff --git a/fs/ext4/ext4.h b/fs/ext4/ext4.h
index 80601a9..bf74c7c 100644
--- a/fs/ext4/ext4.h
+++ b/fs/ext4/ext4.h
@@ -44,6 +44,8 @@
 
 #include <linux/compiler.h>
 
+#include "htree_lock.h"
+
 /*
  * The fourth extended filesystem constants/structures
  */
@@ -936,6 +938,9 @@ struct ext4_inode_info {
 	__u32	i_dtime;
 	ext4_fsblk_t	i_file_acl;
 
+	/* following fields for parallel directory operations -bzzz */
+	struct semaphore i_append_sem;
+
 	/*
 	 * i_block_group is the number of the block group which contains
 	 * this file's inode.  Constant across the lifetime of the inode,
@@ -2145,6 +2150,72 @@ struct dx_hash_info
  */
 #define HASH_NB_ALWAYS		1
 
+/* assume name-hash is protected by upper layer */
+#define EXT4_HTREE_LOCK_HASH	0
+
+enum ext4_pdo_lk_types {
+#if EXT4_HTREE_LOCK_HASH
+	EXT4_LK_HASH,
+#endif
+	EXT4_LK_DX,		/* index block */
+	EXT4_LK_DE,		/* directory entry block */
+	EXT4_LK_SPIN,		/* spinlock */
+	EXT4_LK_MAX,
+};
+
+/* read-only bit */
+#define EXT4_LB_RO(b)		(1 << (b))
+/* read + write, high bits for writer */
+#define EXT4_LB_RW(b)		((1 << (b)) | (1 << (EXT4_LK_MAX + (b))))
+
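+/*
+ * e.g. EXT4_LB_DX_RO is just bit EXT4_LK_DX (the reader bit), while
+ * EXT4_LB_DX additionally sets the writer bit at EXT4_LK_MAX + EXT4_LK_DX.
+ */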
+enum ext4_pdo_lock_bits {
+	/* DX lock bits */
+	EXT4_LB_DX_RO		= EXT4_LB_RO(EXT4_LK_DX),
+	EXT4_LB_DX		= EXT4_LB_RW(EXT4_LK_DX),
+	/* DE lock bits */
+	EXT4_LB_DE_RO		= EXT4_LB_RO(EXT4_LK_DE),
+	EXT4_LB_DE		= EXT4_LB_RW(EXT4_LK_DE),
+	/* DX spinlock bits */
+	EXT4_LB_SPIN_RO		= EXT4_LB_RO(EXT4_LK_SPIN),
+	EXT4_LB_SPIN		= EXT4_LB_RW(EXT4_LK_SPIN),
+	/* accurate searching */
+	EXT4_LB_EXACT		= EXT4_LB_RO(EXT4_LK_MAX << 1),
+};
+
+enum ext4_pdo_lock_opc {
+	/* external */
+	EXT4_HLOCK_READDIR	= (EXT4_LB_DE_RO | EXT4_LB_DX_RO),
+	EXT4_HLOCK_LOOKUP	= (EXT4_LB_DE_RO | EXT4_LB_SPIN_RO |
+				   EXT4_LB_EXACT),
+	EXT4_HLOCK_DEL		= (EXT4_LB_DE | EXT4_LB_SPIN_RO |
+				   EXT4_LB_EXACT),
+	EXT4_HLOCK_ADD		= (EXT4_LB_DE | EXT4_LB_SPIN_RO),
+
+	/* internal */
+	EXT4_HLOCK_LOOKUP_SAFE	= (EXT4_LB_DE_RO | EXT4_LB_DX_RO |
+				   EXT4_LB_EXACT),
+	EXT4_HLOCK_DEL_SAFE	= (EXT4_LB_DE | EXT4_LB_DX_RO | EXT4_LB_EXACT),
+	EXT4_HLOCK_SPLIT	= (EXT4_LB_DE | EXT4_LB_DX | EXT4_LB_SPIN),
+};
+
+extern struct htree_lock_head *ext4_htree_lock_head_alloc(unsigned hbits);
+#define ext4_htree_lock_head_free(lhead)	htree_lock_head_free(lhead)
+
+extern struct htree_lock *ext4_htree_lock_alloc(void);
+#define ext4_htree_lock_free(lck)		htree_lock_free(lck)
+
+extern void ext4_htree_lock(struct htree_lock *lck,
+			    struct htree_lock_head *lhead,
+			    struct inode *dir, unsigned flags);
+#define ext4_htree_unlock(lck)			htree_unlock(lck)
+
+extern struct buffer_head *ext4_find_entry_locked(struct inode *dir,
+					const struct qstr *d_name,
+					struct ext4_dir_entry_2 **res_dir,
+					int *inlined, struct htree_lock *lck);
+extern int ext4_add_entry_locked(handle_t *handle, struct dentry *dentry,
+				 struct inode *inode, struct htree_lock *lck);
+
 struct ext4_filename {
 	const struct qstr *usr_fname;
 	struct fscrypt_str disk_name;
@@ -2479,8 +2550,17 @@ void ext4_insert_dentry(struct inode *inode,
 			struct ext4_filename *fname, void *data);
 static inline void ext4_update_dx_flag(struct inode *inode)
 {
+	/* Disable it for ldiskfs, because going from a DX directory to
+	 * a non-DX directory while it is in use will completely break
+	 * the htree-locking.
+	 * If we really want to support this operation in the future,
+	 * we need to exclusively lock the directory here, which will
+	 * increase the complexity of the code.
+	 */
+#if 0
 	if (!ext4_has_feature_dir_index(inode->i_sb))
 		ext4_clear_inode_flag(inode, EXT4_INODE_INDEX);
+#endif
 }
 static const unsigned char ext4_filetype_table[] = {
 	DT_UNKNOWN, DT_REG, DT_DIR, DT_CHR, DT_BLK, DT_FIFO, DT_SOCK, DT_LNK
diff --git a/fs/ext4/htree_lock.c b/fs/ext4/htree_lock.c
new file mode 100644
index 0000000..9c1e8f0
--- /dev/null
+++ b/fs/ext4/htree_lock.c
@@ -0,0 +1,891 @@
+/*
+ * fs/ext4/htree_lock.c
+ *
+ * Copyright (c) 2011, 2012, Intel Corporation.
+ *
+ * Author: Liang Zhen <liang@whamcloud.com>
+ */
+#include <linux/jbd2.h>
+#include <linux/hash.h>
+#include <linux/module.h>
+#include "htree_lock.h"
+
+enum {
+	HTREE_LOCK_BIT_EX	= (1 << HTREE_LOCK_EX),
+	HTREE_LOCK_BIT_PW	= (1 << HTREE_LOCK_PW),
+	HTREE_LOCK_BIT_PR	= (1 << HTREE_LOCK_PR),
+	HTREE_LOCK_BIT_CW	= (1 << HTREE_LOCK_CW),
+	HTREE_LOCK_BIT_CR	= (1 << HTREE_LOCK_CR),
+};
+
+enum {
+	HTREE_LOCK_COMPAT_EX	= 0,
+	HTREE_LOCK_COMPAT_PW	= HTREE_LOCK_COMPAT_EX | HTREE_LOCK_BIT_CR,
+	HTREE_LOCK_COMPAT_PR	= HTREE_LOCK_COMPAT_PW | HTREE_LOCK_BIT_PR,
+	HTREE_LOCK_COMPAT_CW	= HTREE_LOCK_COMPAT_PW | HTREE_LOCK_BIT_CW,
+	HTREE_LOCK_COMPAT_CR	= HTREE_LOCK_COMPAT_CW | HTREE_LOCK_BIT_PR |
+				  HTREE_LOCK_BIT_PW,
+};
+
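+/*
+ * The compatibility table spelled out (granted mode vs. requested mode):
+ * EX is compatible with nothing; PW only with CR; PR with PR and CR;
+ * CW with CW and CR; CR with everything except EX.
+ */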
+static int htree_lock_compat[] = {
+	[HTREE_LOCK_EX]		HTREE_LOCK_COMPAT_EX,
+	[HTREE_LOCK_PW]		HTREE_LOCK_COMPAT_PW,
+	[HTREE_LOCK_PR]		HTREE_LOCK_COMPAT_PR,
+	[HTREE_LOCK_CW]		HTREE_LOCK_COMPAT_CW,
+	[HTREE_LOCK_CR]		HTREE_LOCK_COMPAT_CR,
+};
+
+/* max allowed htree-lock depth.
+ * We only need depth=3 for ext4, although users can specify a higher value. */
+#define HTREE_LOCK_DEP_MAX	16
+
+#ifdef HTREE_LOCK_DEBUG
+
+static char *hl_name[] = {
+	[HTREE_LOCK_EX]		"EX",
+	[HTREE_LOCK_PW]		"PW",
+	[HTREE_LOCK_PR]		"PR",
+	[HTREE_LOCK_CW]		"CW",
+	[HTREE_LOCK_CR]		"CR",
+};
+
+/* lock stats */
+struct htree_lock_node_stats {
+	unsigned long long	blocked[HTREE_LOCK_MAX];
+	unsigned long long	granted[HTREE_LOCK_MAX];
+	unsigned long long	retried[HTREE_LOCK_MAX];
+	unsigned long long	events;
+};
+
+struct htree_lock_stats {
+	struct htree_lock_node_stats	nodes[HTREE_LOCK_DEP_MAX];
+	unsigned long long	granted[HTREE_LOCK_MAX];
+	unsigned long long	blocked[HTREE_LOCK_MAX];
+};
+
+static struct htree_lock_stats hl_stats;
+
+void htree_lock_stat_reset(void)
+{
+	memset(&hl_stats, 0, sizeof(hl_stats));
+}
+
+void htree_lock_stat_print(int depth)
+{
+	int     i;
+	int	j;
+
+	printk(KERN_DEBUG "HTREE LOCK STATS:\n");
+	for (i = 0; i < HTREE_LOCK_MAX; i++) {
+		printk(KERN_DEBUG "[%s]: G [%10llu], B [%10llu]\n",
+		       hl_name[i], hl_stats.granted[i], hl_stats.blocked[i]);
+	}
+	for (i = 0; i < depth; i++) {
+		printk(KERN_DEBUG "HTREE CHILD [%d] STATS:\n", i);
+		for (j = 0; j < HTREE_LOCK_MAX; j++) {
+			printk(KERN_DEBUG
+				"[%s]: G [%10llu], B [%10llu], R [%10llu]\n",
+				hl_name[j], hl_stats.nodes[i].granted[j],
+				hl_stats.nodes[i].blocked[j],
+				hl_stats.nodes[i].retried[j]);
+		}
+	}
+}
+
+#define lk_grant_inc(m)       do { hl_stats.granted[m]++; } while (0)
+#define lk_block_inc(m)       do { hl_stats.blocked[m]++; } while (0)
+#define ln_grant_inc(d, m)    do { hl_stats.nodes[d].granted[m]++; } while (0)
+#define ln_block_inc(d, m)    do { hl_stats.nodes[d].blocked[m]++; } while (0)
+#define ln_retry_inc(d, m)    do { hl_stats.nodes[d].retried[m]++; } while (0)
+#define ln_event_inc(d)       do { hl_stats.nodes[d].events++; } while (0)
+
+#else /* !DEBUG */
+
+void htree_lock_stat_reset(void) {}
+void htree_lock_stat_print(int depth) {}
+
+#define lk_grant_inc(m)	      do {} while (0)
+#define lk_block_inc(m)	      do {} while (0)
+#define ln_grant_inc(d, m)    do {} while (0)
+#define ln_block_inc(d, m)    do {} while (0)
+#define ln_retry_inc(d, m)    do {} while (0)
+#define ln_event_inc(d)	      do {} while (0)
+
+#endif /* DEBUG */
+
+EXPORT_SYMBOL(htree_lock_stat_reset);
+EXPORT_SYMBOL(htree_lock_stat_print);
+
+#define HTREE_DEP_ROOT		  (-1)
+
+#define htree_spin_lock(lhead, dep)				\
+	bit_spin_lock((dep) + 1, &(lhead)->lh_lock)
+#define htree_spin_unlock(lhead, dep)				\
+	bit_spin_unlock((dep) + 1, &(lhead)->lh_lock)
+
+#define htree_key_event_ignore(child, ln)			\
+	(!((child)->lc_events & (1 << (ln)->ln_mode)))
+
+static int
+htree_key_list_empty(struct htree_lock_node *ln)
+{
+	return list_empty(&ln->ln_major_list) && list_empty(&ln->ln_minor_list);
+}
+
+static void
+htree_key_list_del_init(struct htree_lock_node *ln)
+{
+	struct htree_lock_node *tmp = NULL;
+
+	if (!list_empty(&ln->ln_minor_list)) {
+		tmp = list_entry(ln->ln_minor_list.next,
+				 struct htree_lock_node, ln_minor_list);
+		list_del_init(&ln->ln_minor_list);
+	}
+
+	if (list_empty(&ln->ln_major_list))
+		return;
+
+	if (tmp == NULL) { /* not on minor key list */
+		list_del_init(&ln->ln_major_list);
+	} else {
+		BUG_ON(!list_empty(&tmp->ln_major_list));
+		list_replace_init(&ln->ln_major_list, &tmp->ln_major_list);
+	}
+}
+
+static void
+htree_key_list_replace_init(struct htree_lock_node *old,
+			    struct htree_lock_node *new)
+{
+	if (!list_empty(&old->ln_major_list))
+		list_replace_init(&old->ln_major_list, &new->ln_major_list);
+
+	if (!list_empty(&old->ln_minor_list))
+		list_replace_init(&old->ln_minor_list, &new->ln_minor_list);
+}
+
+static void
+htree_key_event_enqueue(struct htree_lock_child *child,
+			struct htree_lock_node *ln, int dep, void *event)
+{
+	struct htree_lock_node *tmp;
+
+	/* NB: ALWAYS called holding lhead::lh_lock(dep) */
+	BUG_ON(ln->ln_mode == HTREE_LOCK_NL);
+	if (event == NULL || htree_key_event_ignore(child, ln))
+		return;
+
+	/* shouldn't be a very long list */
+	list_for_each_entry(tmp, &ln->ln_alive_list, ln_alive_list) {
+		if (tmp->ln_mode == HTREE_LOCK_NL) {
+			ln_event_inc(dep);
+			if (child->lc_callback != NULL)
+				child->lc_callback(tmp->ln_ev_target, event);
+		}
+	}
+}
+
+static int
+htree_node_lock_enqueue(struct htree_lock *newlk, struct htree_lock *curlk,
+			unsigned dep, int wait, void *event)
+{
+	struct htree_lock_child *child = &newlk->lk_head->lh_children[dep];
+	struct htree_lock_node *newln = &newlk->lk_nodes[dep];
+	struct htree_lock_node *curln = &curlk->lk_nodes[dep];
+
+	/* NB: ALWAYS called holding lhead::lh_lock(dep) */
+	/* NB: we only expect PR/PW lock modes here; only these two modes are
+	 * allowed for htree_node_lock (asserted in htree_node_lock_internal).
+	 * NL is only used for listeners, users can't directly request NL mode */
+	if ((curln->ln_mode == HTREE_LOCK_NL) ||
+	    (curln->ln_mode != HTREE_LOCK_PW &&
+	     newln->ln_mode != HTREE_LOCK_PW)) {
+		/* no conflict, attach it on granted list of @curlk */
+		if (curln->ln_mode != HTREE_LOCK_NL) {
+			list_add(&newln->ln_granted_list,
+				 &curln->ln_granted_list);
+		} else {
+			/* replace key owner */
+			htree_key_list_replace_init(curln, newln);
+		}
+
+		list_add(&newln->ln_alive_list, &curln->ln_alive_list);
+		htree_key_event_enqueue(child, newln, dep, event);
+		ln_grant_inc(dep, newln->ln_mode);
+		return 1; /* still hold lh_lock */
+	}
+
+	if (!wait) { /* can't grant and don't want to wait */
+		ln_retry_inc(dep, newln->ln_mode);
+		newln->ln_mode = HTREE_LOCK_INVAL;
+		return -1; /* don't wait and just return -1 */
+	}
+
+	newlk->lk_task = current;
+	set_current_state(TASK_UNINTERRUPTIBLE);
+	/* conflict, attach it on blocked list of curlk */
+	list_add_tail(&newln->ln_blocked_list, &curln->ln_blocked_list);
+	list_add(&newln->ln_alive_list, &curln->ln_alive_list);
+	ln_block_inc(dep, newln->ln_mode);
+
+	htree_spin_unlock(newlk->lk_head, dep);
+	/* wait to be given the lock */
+	if (newlk->lk_task != NULL)
+		schedule();
+	/* granted, no doubt, wake up will set me RUNNING */
+	if (event == NULL || htree_key_event_ignore(child, newln))
+		return 0; /* granted without lh_lock */
+
+	htree_spin_lock(newlk->lk_head, dep);
+	htree_key_event_enqueue(child, newln, dep, event);
+	return 1; /* still hold lh_lock */
+}
+
+/*
+ * get PR/PW access to a particular tree-node according to @dep and @key,
+ * it will return -1 if @wait is false and the lock can't be granted
+ * immediately. All listeners (HTREE_LOCK_NL) on @dep with the same @key
+ * will get @event if it's not NULL.
+ * NB: ALWAYS called holding lhead::lh_lock
+ */
+static int
+htree_node_lock_internal(struct htree_lock_head *lhead, struct htree_lock *lck,
+			 htree_lock_mode_t mode, u32 key, unsigned dep,
+			 int wait, void *event)
+{
+	LIST_HEAD(list);
+	struct htree_lock	*tmp;
+	struct htree_lock	*tmp2;
+	u16			major;
+	u16			minor;
+	u8			reverse;
+	u8			ma_bits;
+	u8			mi_bits;
+
+	BUG_ON(mode != HTREE_LOCK_PW && mode != HTREE_LOCK_PR);
+	BUG_ON(htree_node_is_granted(lck, dep));
+
+	key = hash_long(key, lhead->lh_hbits);
+
+	mi_bits = lhead->lh_hbits >> 1;
+	ma_bits = lhead->lh_hbits - mi_bits;
+
+	lck->lk_nodes[dep].ln_major_key = major = key & ((1U << ma_bits) - 1);
+	lck->lk_nodes[dep].ln_minor_key = minor = key >> ma_bits;
+	lck->lk_nodes[dep].ln_mode = mode;
+
+	/*
+	 * The major key list is an ordered list, so searches are started
+	 * at the end of the list that is numerically closer to major_key,
+	 * so at most half of the list will be walked (for well-distributed
+	 * keys). The list traversal aborts early if the expected key
+	 * location is passed.
+	 */
+	reverse = (major >= (1 << (ma_bits - 1)));
+
+	if (reverse) {
+		list_for_each_entry_reverse(tmp,
+					&lhead->lh_children[dep].lc_list,
+					lk_nodes[dep].ln_major_list) {
+			if (tmp->lk_nodes[dep].ln_major_key == major) {
+				goto search_minor;
+
+			} else if (tmp->lk_nodes[dep].ln_major_key < major) {
+				/* attach _after_ @tmp */
+				list_add(&lck->lk_nodes[dep].ln_major_list,
+					 &tmp->lk_nodes[dep].ln_major_list);
+				goto out_grant_major;
+			}
+		}
+
+		list_add(&lck->lk_nodes[dep].ln_major_list,
+			 &lhead->lh_children[dep].lc_list);
+		goto out_grant_major;
+
+	} else {
+		list_for_each_entry(tmp, &lhead->lh_children[dep].lc_list,
+				    lk_nodes[dep].ln_major_list) {
+			if (tmp->lk_nodes[dep].ln_major_key == major) {
+				goto search_minor;
+
+			} else if (tmp->lk_nodes[dep].ln_major_key > major) {
+				/* insert _before_ @tmp */
+				list_add_tail(&lck->lk_nodes[dep].ln_major_list,
+					&tmp->lk_nodes[dep].ln_major_list);
+				goto out_grant_major;
+			}
+		}
+
+		list_add_tail(&lck->lk_nodes[dep].ln_major_list,
+			      &lhead->lh_children[dep].lc_list);
+		goto out_grant_major;
+	}
+
+ search_minor:
+	/*
+	 * NB: minor_key list doesn't have a "head", @list is just a
+	 * temporary stub to help the list search; make sure it's removed
+	 * after searching.
+	 * The minor_key list is an ordered list too.
+	 */
+	list_add_tail(&list, &tmp->lk_nodes[dep].ln_minor_list);
+
+	reverse = (minor >= (1 << (mi_bits - 1)));
+
+	if (reverse) {
+		list_for_each_entry_reverse(tmp2, &list,
+					    lk_nodes[dep].ln_minor_list) {
+			if (tmp2->lk_nodes[dep].ln_minor_key == minor) {
+				goto out_enqueue;
+
+			} else if (tmp2->lk_nodes[dep].ln_minor_key < minor) {
+				/* attach _after_ @tmp2 */
+				list_add(&lck->lk_nodes[dep].ln_minor_list,
+					 &tmp2->lk_nodes[dep].ln_minor_list);
+				goto out_grant_minor;
+			}
+		}
+
+		list_add(&lck->lk_nodes[dep].ln_minor_list, &list);
+
+	} else {
+		list_for_each_entry(tmp2, &list,
+				    lk_nodes[dep].ln_minor_list) {
+			if (tmp2->lk_nodes[dep].ln_minor_key == minor) {
+				goto out_enqueue;
+
+			} else if (tmp2->lk_nodes[dep].ln_minor_key > minor) {
+				/* insert _before_ @tmp2 */
+				list_add_tail(&lck->lk_nodes[dep].ln_minor_list,
+					&tmp2->lk_nodes[dep].ln_minor_list);
+				goto out_grant_minor;
+			}
+		}
+
+		list_add_tail(&lck->lk_nodes[dep].ln_minor_list, &list);
+	}
+
+ out_grant_minor:
+	if (list.next == &lck->lk_nodes[dep].ln_minor_list) {
+		/* new lock @lck is the first one on minor_key list, which
+		 * means it has the smallest minor_key and it should
+		 * replace @tmp as minor_key owner */
+		list_replace_init(&tmp->lk_nodes[dep].ln_major_list,
+				  &lck->lk_nodes[dep].ln_major_list);
+	}
+	/* remove the temporary head */
+	list_del(&list);
+
+ out_grant_major:
+	ln_grant_inc(dep, lck->lk_nodes[dep].ln_mode);
+	return 1; /* granted with holding lh_lock */
+
+ out_enqueue:
+	list_del(&list); /* remove temporary head */
+	return htree_node_lock_enqueue(lck, tmp2, dep, wait, event);
+}
+
+/*
+ * release the key of @lck at level @dep, and grant any blocked locks.
+ * caller will still listen on @key if @event is not NULL, which means
+ * caller can see an event (via event_cb) while granting any lock with
+ * the same key at level @dep.
+ * NB: ALWAYS called holding lhead::lh_lock
+ * NB: listener will not block anyone because listening mode is HTREE_LOCK_NL
+ */
+static void
+htree_node_unlock_internal(struct htree_lock_head *lhead,
+			   struct htree_lock *curlk, unsigned dep, void *event)
+{
+	struct htree_lock_node	*curln = &curlk->lk_nodes[dep];
+	struct htree_lock	*grtlk = NULL;
+	struct htree_lock_node	*grtln;
+	struct htree_lock	*poslk;
+	struct htree_lock	*tmplk;
+
+	if (!htree_node_is_granted(curlk, dep))
+		return;
+
+	if (!list_empty(&curln->ln_granted_list)) {
+		/* there is another granted lock */
+		grtlk = list_entry(curln->ln_granted_list.next,
+				   struct htree_lock,
+				   lk_nodes[dep].ln_granted_list);
+		list_del_init(&curln->ln_granted_list);
+	}
+
+	if (grtlk == NULL && !list_empty(&curln->ln_blocked_list)) {
+		/*
+		 * @curlk is the only granted lock, so we confirmed:
+		 * a) curln is key owner (attached on major/minor_list),
+		 *    so if there is any blocked lock, it should be attached
+		 *    on curln->ln_blocked_list
+		 * b) we always can grant the first blocked lock
+		 * b) we can always grant the first blocked lock
+		grtlk = list_entry(curln->ln_blocked_list.next,
+				   struct htree_lock,
+				   lk_nodes[dep].ln_blocked_list);
+		BUG_ON(grtlk->lk_task == NULL);
+		wake_up_process(grtlk->lk_task);
+	}
+
+	if (event != NULL &&
+	    lhead->lh_children[dep].lc_events != HTREE_EVENT_DISABLE) {
+		curln->ln_ev_target = event;
+		curln->ln_mode = HTREE_LOCK_NL; /* listen! */
+	} else {
+		curln->ln_mode = HTREE_LOCK_INVAL;
+	}
+
+	if (grtlk == NULL) { /* I must be the only one locking this key */
+		struct htree_lock_node *tmpln;
+
+		BUG_ON(htree_key_list_empty(curln));
+
+		if (curln->ln_mode == HTREE_LOCK_NL) /* listening */
+			return;
+
+		/* not listening */
+		if (list_empty(&curln->ln_alive_list)) { /* no more listener */
+			htree_key_list_del_init(curln);
+			return;
+		}
+
+		tmpln = list_entry(curln->ln_alive_list.next,
+				   struct htree_lock_node, ln_alive_list);
+
+		BUG_ON(tmpln->ln_mode != HTREE_LOCK_NL);
+
+		htree_key_list_replace_init(curln, tmpln);
+		list_del_init(&curln->ln_alive_list);
+
+		return;
+	}
+
+	/* have a granted lock */
+	grtln = &grtlk->lk_nodes[dep];
+	if (!list_empty(&curln->ln_blocked_list)) {
+		/* only key owner can be on both lists */
+		BUG_ON(htree_key_list_empty(curln));
+
+		if (list_empty(&grtln->ln_blocked_list)) {
+			list_add(&grtln->ln_blocked_list,
+				 &curln->ln_blocked_list);
+		}
+		list_del_init(&curln->ln_blocked_list);
+	}
+	/*
+	 * NB: this is the tricky part:
+	 * We have only two modes for child-lock (PR and PW), also,
+	 * only owner of the key (attached on major/minor_list) can be on
+	 * both blocked_list and granted_list, so @grtlk must be one
+	 * of these two cases:
+	 *
+	 * a) @grtlk is taken from the granted_list, which means we've granted
+	 *    more than one lock, so @grtlk has to be PR and the first blocked
+	 *    lock must be PW, which we can't grant at all.
+	 *    So even if @grtlk is not the owner of the key (empty
+	 *    blocked_list), we don't care because we can't grant any lock.
+	 * b) we just granted a new lock taken from the head of the blocked
+	 *    list; it should be the first granted lock and also the first
+	 *    one linked on the blocked_list.
+	 *
+	 * Either way, we get the correct result by iterating the blocked_list
+	 * of @grtlk, without having to work out who owns the current key.
+	 */
+	list_for_each_entry_safe(poslk, tmplk, &grtln->ln_blocked_list,
+				 lk_nodes[dep].ln_blocked_list) {
+		if (grtlk->lk_nodes[dep].ln_mode == HTREE_LOCK_PW ||
+		    poslk->lk_nodes[dep].ln_mode == HTREE_LOCK_PW)
+			break;
+		/* grant all readers */
+		list_del_init(&poslk->lk_nodes[dep].ln_blocked_list);
+		list_add(&poslk->lk_nodes[dep].ln_granted_list,
+			 &grtln->ln_granted_list);
+
+		BUG_ON(poslk->lk_task == NULL);
+		wake_up_process(poslk->lk_task);
+	}
+
+	/* if @curln is the owner of this key, replace it with @grtln */
+	if (!htree_key_list_empty(curln))
+		htree_key_list_replace_init(curln, grtln);
+
+	if (curln->ln_mode == HTREE_LOCK_INVAL)
+		list_del_init(&curln->ln_alive_list);
+}
+
+/*
+ * it's just a wrapper of htree_node_lock_internal; it returns 1 when granted
+ * and 0 only if @wait is false and the lock can't be granted immediately
+ */
+int
+htree_node_lock_try(struct htree_lock *lck, htree_lock_mode_t mode,
+		    u32 key, unsigned dep, int wait, void *event)
+{
+	struct htree_lock_head *lhead = lck->lk_head;
+	int rc;
+
+	BUG_ON(dep >= lck->lk_depth);
+	BUG_ON(lck->lk_mode == HTREE_LOCK_INVAL);
+
+	htree_spin_lock(lhead, dep);
+	rc = htree_node_lock_internal(lhead, lck, mode, key, dep, wait, event);
+	if (rc != 0)
+		htree_spin_unlock(lhead, dep);
+	return rc >= 0;
+}
+EXPORT_SYMBOL(htree_node_lock_try);
+
+/* it's a wrapper of htree_node_unlock_internal */
+void
+htree_node_unlock(struct htree_lock *lck, unsigned dep, void *event)
+{
+	struct htree_lock_head *lhead = lck->lk_head;
+
+	BUG_ON(dep >= lck->lk_depth);
+	BUG_ON(lck->lk_mode == HTREE_LOCK_INVAL);
+
+	htree_spin_lock(lhead, dep);
+	htree_node_unlock_internal(lhead, lck, dep, event);
+	htree_spin_unlock(lhead, dep);
+}
+EXPORT_SYMBOL(htree_node_unlock);
+
+/* stop listening on child-lock level @dep */
+void
+htree_node_stop_listen(struct htree_lock *lck, unsigned dep)
+{
+	struct htree_lock_node *ln = &lck->lk_nodes[dep];
+	struct htree_lock_node *tmp;
+
+	BUG_ON(htree_node_is_granted(lck, dep));
+	BUG_ON(!list_empty(&ln->ln_blocked_list));
+	BUG_ON(!list_empty(&ln->ln_granted_list));
+
+	if (!htree_node_is_listening(lck, dep))
+		return;
+
+	htree_spin_lock(lck->lk_head, dep);
+	ln->ln_mode = HTREE_LOCK_INVAL;
+	ln->ln_ev_target = NULL;
+
+	if (htree_key_list_empty(ln)) { /* not owner */
+		list_del_init(&ln->ln_alive_list);
+		goto out;
+	}
+
+	/* I'm the owner... */
+	if (list_empty(&ln->ln_alive_list)) { /* no more listener */
+		htree_key_list_del_init(ln);
+		goto out;
+	}
+
+	tmp = list_entry(ln->ln_alive_list.next,
+			 struct htree_lock_node, ln_alive_list);
+
+	BUG_ON(tmp->ln_mode != HTREE_LOCK_NL);
+	htree_key_list_replace_init(ln, tmp);
+	list_del_init(&ln->ln_alive_list);
+ out:
+	htree_spin_unlock(lck->lk_head, dep);
+}
+EXPORT_SYMBOL(htree_node_stop_listen);
+
+/* release all child-locks if we have any */
+static void
+htree_node_release_all(struct htree_lock *lck)
+{
+	int	i;
+
+	for (i = 0; i < lck->lk_depth; i++) {
+		if (htree_node_is_granted(lck, i))
+			htree_node_unlock(lck, i, NULL);
+		else if (htree_node_is_listening(lck, i))
+			htree_node_stop_listen(lck, i);
+	}
+}
+
+/*
+ * obtain htree lock, it could be blocked inside if there's conflict
+ * with any granted or blocked lock and @wait is true.
+ * NB: ALWAYS called holding lhead::lh_lock
+ */
+static int
+htree_lock_internal(struct htree_lock *lck, int wait)
+{
+	struct htree_lock_head *lhead = lck->lk_head;
+	int	granted = 0;
+	int	blocked = 0;
+	int	i;
+
+	for (i = 0; i < HTREE_LOCK_MAX; i++) {
+		if (lhead->lh_ngranted[i] != 0)
+			granted |= 1 << i;
+		if (lhead->lh_nblocked[i] != 0)
+			blocked |= 1 << i;
+	}
+	if ((htree_lock_compat[lck->lk_mode] & granted) != granted ||
+	    (htree_lock_compat[lck->lk_mode] & blocked) != blocked) {
+		/* block the current lock even if it only conflicts with another
+		 * blocked lock, so locks like EX won't starve */
+		if (!wait)
+			return -1;
+		lhead->lh_nblocked[lck->lk_mode]++;
+		lk_block_inc(lck->lk_mode);
+
+		lck->lk_task = current;
+		list_add_tail(&lck->lk_blocked_list, &lhead->lh_blocked_list);
+
+retry:
+		set_current_state(TASK_UNINTERRUPTIBLE);
+		htree_spin_unlock(lhead, HTREE_DEP_ROOT);
+		/* wait to be given the lock */
+		if (lck->lk_task != NULL)
+			schedule();
+		/* granted, no doubt. wake up will set me RUNNING.
+		 * Since the thread could be woken up spuriously, we need to
+		 * check again whether the lock was really granted. */
+		if (!list_empty(&lck->lk_blocked_list)) {
+			htree_spin_lock(lhead, HTREE_DEP_ROOT);
+			if (list_empty(&lck->lk_blocked_list)) {
+				htree_spin_unlock(lhead, HTREE_DEP_ROOT);
+				return 0;
+			}
+			goto retry;
+		}
+		return 0; /* without lh_lock */
+	}
+	lhead->lh_ngranted[lck->lk_mode]++;
+	lk_grant_inc(lck->lk_mode);
+	return 1;
+}
+
+/* release htree lock. NB: ALWAYS called holding lhead::lh_lock */
+static void
+htree_unlock_internal(struct htree_lock *lck)
+{
+	struct htree_lock_head *lhead = lck->lk_head;
+	struct htree_lock *tmp;
+	struct htree_lock *tmp2;
+	int granted = 0;
+	int i;
+
+	BUG_ON(lhead->lh_ngranted[lck->lk_mode] == 0);
+
+	lhead->lh_ngranted[lck->lk_mode]--;
+	lck->lk_mode = HTREE_LOCK_INVAL;
+
+	for (i = 0; i < HTREE_LOCK_MAX; i++) {
+		if (lhead->lh_ngranted[i] != 0)
+			granted |= 1 << i;
+	}
+	list_for_each_entry_safe(tmp, tmp2,
+				 &lhead->lh_blocked_list, lk_blocked_list) {
+		/* conflict with any granted lock? */
+		if ((htree_lock_compat[tmp->lk_mode] & granted) != granted)
+			break;
+
+		list_del_init(&tmp->lk_blocked_list);
+
+		BUG_ON(lhead->lh_nblocked[tmp->lk_mode] == 0);
+
+		lhead->lh_nblocked[tmp->lk_mode]--;
+		lhead->lh_ngranted[tmp->lk_mode]++;
+		granted |= 1 << tmp->lk_mode;
+
+		BUG_ON(tmp->lk_task == NULL);
+		wake_up_process(tmp->lk_task);
+	}
+}
+
+/* it's a wrapper of htree_lock_internal and the exported interface.
+ * It always returns 1 with the lock granted if @wait is true; it can return 0
+ * if @wait is false and the locking request can't be granted immediately */
+int
+htree_lock_try(struct htree_lock *lck, struct htree_lock_head *lhead,
+	       htree_lock_mode_t mode, int wait)
+{
+	int	rc;
+
+	BUG_ON(lck->lk_depth > lhead->lh_depth);
+	BUG_ON(lck->lk_head != NULL);
+	BUG_ON(lck->lk_task != NULL);
+
+	lck->lk_head = lhead;
+	lck->lk_mode = mode;
+
+	htree_spin_lock(lhead, HTREE_DEP_ROOT);
+	rc = htree_lock_internal(lck, wait);
+	if (rc != 0)
+		htree_spin_unlock(lhead, HTREE_DEP_ROOT);
+	return rc >= 0;
+}
+EXPORT_SYMBOL(htree_lock_try);
+
+/* it's a wrapper of htree_unlock_internal and the exported interface.
+ * It releases all htree_node_locks as well as the htree_lock itself */
+void
+htree_unlock(struct htree_lock *lck)
+{
+	BUG_ON(lck->lk_head == NULL);
+	BUG_ON(lck->lk_mode == HTREE_LOCK_INVAL);
+
+	htree_node_release_all(lck);
+
+	htree_spin_lock(lck->lk_head, HTREE_DEP_ROOT);
+	htree_unlock_internal(lck);
+	htree_spin_unlock(lck->lk_head, HTREE_DEP_ROOT);
+	lck->lk_head = NULL;
+	lck->lk_task = NULL;
+}
+EXPORT_SYMBOL(htree_unlock);
+
+/* change lock mode */
+void
+htree_change_mode(struct htree_lock *lck, htree_lock_mode_t mode)
+{
+	BUG_ON(lck->lk_mode == HTREE_LOCK_INVAL);
+	lck->lk_mode = mode;
+}
+EXPORT_SYMBOL(htree_change_mode);
+
+/* release the htree lock, and lock it again with a new mode.
+ * This function first releases all htree_node_locks and the htree_lock,
+ * then tries to acquire the htree_lock with the new @mode.
+ * It always returns 1 with the lock granted if @wait is true; it can return 0
+ * if @wait is false and the locking request can't be granted immediately */
+int
+htree_change_lock_try(struct htree_lock *lck, htree_lock_mode_t mode, int wait)
+{
+	struct htree_lock_head *lhead = lck->lk_head;
+	int rc;
+
+	BUG_ON(lhead == NULL);
+	BUG_ON(lck->lk_mode == mode);
+	BUG_ON(lck->lk_mode == HTREE_LOCK_INVAL || mode == HTREE_LOCK_INVAL);
+
+	htree_node_release_all(lck);
+
+	htree_spin_lock(lhead, HTREE_DEP_ROOT);
+	htree_unlock_internal(lck);
+	lck->lk_mode = mode;
+	rc = htree_lock_internal(lck, wait);
+	if (rc != 0)
+		htree_spin_unlock(lhead, HTREE_DEP_ROOT);
+	return rc >= 0;
+}
+EXPORT_SYMBOL(htree_change_lock_try);
+
+/* create an htree_lock head with @depth levels (number of child-locks),
+ * it is a per-resource structure */
+struct htree_lock_head *
+htree_lock_head_alloc(unsigned depth, unsigned hbits, unsigned priv)
+{
+	struct htree_lock_head *lhead;
+	int  i;
+
+	if (depth > HTREE_LOCK_DEP_MAX) {
+		printk(KERN_ERR "%d is larger than max htree_lock depth %d\n",
+			depth, HTREE_LOCK_DEP_MAX);
+		return NULL;
+	}
+
+	lhead = kzalloc(offsetof(struct htree_lock_head,
+				 lh_children[depth]) + priv, GFP_NOFS);
+	if (lhead == NULL)
+		return NULL;
+
+	/* clamp hbits into the supported range */
+	if (hbits < HTREE_HBITS_MIN)
+		lhead->lh_hbits = HTREE_HBITS_MIN;
+	else if (hbits > HTREE_HBITS_MAX)
+		lhead->lh_hbits = HTREE_HBITS_MAX;
+	else
+		lhead->lh_hbits = hbits;
+
+	lhead->lh_lock = 0;
+	lhead->lh_depth = depth;
+	INIT_LIST_HEAD(&lhead->lh_blocked_list);
+	if (priv > 0) {
+		lhead->lh_private = (void *)lhead +
+			offsetof(struct htree_lock_head, lh_children[depth]);
+	}
+
+	for (i = 0; i < depth; i++) {
+		INIT_LIST_HEAD(&lhead->lh_children[i].lc_list);
+		lhead->lh_children[i].lc_events = HTREE_EVENT_DISABLE;
+	}
+	return lhead;
+}
+EXPORT_SYMBOL(htree_lock_head_alloc);
+
+/* free the htree_lock head */
+void
+htree_lock_head_free(struct htree_lock_head *lhead)
+{
+	int     i;
+
+	BUG_ON(!list_empty(&lhead->lh_blocked_list));
+	for (i = 0; i < lhead->lh_depth; i++)
+		BUG_ON(!list_empty(&lhead->lh_children[i].lc_list));
+	kfree(lhead);
+}
+EXPORT_SYMBOL(htree_lock_head_free);
+
+/* register event callback for @events of child-lock at level @dep */
+void
+htree_lock_event_attach(struct htree_lock_head *lhead, unsigned dep,
+			unsigned events, htree_event_cb_t callback)
+{
+	BUG_ON(lhead->lh_depth <= dep);
+	lhead->lh_children[dep].lc_events = events;
+	lhead->lh_children[dep].lc_callback = callback;
+}
+EXPORT_SYMBOL(htree_lock_event_attach);
+
+/* allocate an htree_lock, which is a per-thread structure; @pbytes is the
+ * number of extra bytes reserved as private data for the caller */
+struct htree_lock *
+htree_lock_alloc(unsigned depth, unsigned pbytes)
+{
+	struct htree_lock *lck;
+	int i = offsetof(struct htree_lock, lk_nodes[depth]);
+
+	if (depth > HTREE_LOCK_DEP_MAX) {
+		printk(KERN_ERR "%d is larger than max htree_lock depth %d\n",
+			depth, HTREE_LOCK_DEP_MAX);
+		return NULL;
+	}
+	lck = kzalloc(i + pbytes, GFP_NOFS);
+	if (lck == NULL)
+		return NULL;
+
+	if (pbytes != 0)
+		lck->lk_private = (void *)lck + i;
+	lck->lk_mode = HTREE_LOCK_INVAL;
+	lck->lk_depth = depth;
+	INIT_LIST_HEAD(&lck->lk_blocked_list);
+
+	for (i = 0; i < depth; i++) {
+		struct htree_lock_node *node = &lck->lk_nodes[i];
+
+		node->ln_mode = HTREE_LOCK_INVAL;
+		INIT_LIST_HEAD(&node->ln_major_list);
+		INIT_LIST_HEAD(&node->ln_minor_list);
+		INIT_LIST_HEAD(&node->ln_alive_list);
+		INIT_LIST_HEAD(&node->ln_blocked_list);
+		INIT_LIST_HEAD(&node->ln_granted_list);
+	}
+
+	return lck;
+}
+EXPORT_SYMBOL(htree_lock_alloc);
+
+/* free htree_lock node */
+void
+htree_lock_free(struct htree_lock *lck)
+{
+	BUG_ON(lck->lk_mode != HTREE_LOCK_INVAL);
+	kfree(lck);
+}
+EXPORT_SYMBOL(htree_lock_free);
diff --git a/fs/ext4/htree_lock.h b/fs/ext4/htree_lock.h
new file mode 100644
index 0000000..9dc7788
--- /dev/null
+++ b/fs/ext4/htree_lock.h
@@ -0,0 +1,187 @@
+/*
+ * include/linux/htree_lock.h
+ *
+ * Copyright (c) 2011, 2012, Intel Corporation.
+ *
+ * Author: Liang Zhen <liang@whamcloud.com>
+ */
+
+/*
+ * htree lock
+ *
+ * htree_lock is an advanced lock; it supports five lock modes (the concept
+ * is taken from DLM) and it is a sleeping lock.
+ *
+ * the most common use case is:
+ * - create a htree_lock_head for the data
+ * - each thread (contender) creates its own htree_lock
+ * - a contender calls htree_lock(lock_node, mode) to protect the data and
+ *   htree_unlock to release the lock
+ *
+ * There is also a more complex, advanced use-case: a user can take a PW/PR
+ * lock on a particular key; this is mostly used while the user is holding a
+ * shared lock on the htree (CW, CR)
+ *
+ * htree_lock(lock_node, HTREE_LOCK_CR); lock the htree with CR
+ * htree_node_lock(lock_node, HTREE_LOCK_PR, key...); lock @key with PR
+ * ...
+ * htree_node_unlock(lock_node); unlock the key
+ *
+ * Another tip: we can have N levels of this kind of key; all we need to do
+ * is specify N levels while creating the htree_lock_head, then we can
+ * lock/unlock a specific level by:
+ * htree_node_lock(lock_node, mode1, key1, level1...);
+ * do something;
+ * htree_node_lock(lock_node, mode1, key2, level2...);
+ * do something;
+ * htree_node_unlock(lock_node, level2);
+ * htree_node_unlock(lock_node, level1);
+ *
+ * NB: for multi-level locking, be careful about the locking order to avoid deadlocks
+ */
+
+#ifndef _LINUX_HTREE_LOCK_H
+#define _LINUX_HTREE_LOCK_H
+
+#include <linux/list.h>
+#include <linux/spinlock.h>
+#include <linux/sched.h>
+
+/*
+ * Lock Modes
+ * more details can be found here:
+ * http://en.wikipedia.org/wiki/Distributed_lock_manager
+ */
+typedef enum {
+	HTREE_LOCK_EX	= 0, /* exclusive lock: incompatible with all others */
+	HTREE_LOCK_PW,	     /* protected write: allows only CR users */
+	HTREE_LOCK_PR,	     /* protected read: allow PR, CR users */
+	HTREE_LOCK_CW,	     /* concurrent write: allow CR, CW users */
+	HTREE_LOCK_CR,	     /* concurrent read: allow all but EX users */
+	HTREE_LOCK_MAX,	     /* number of lock modes */
+} htree_lock_mode_t;
+
+#define HTREE_LOCK_NL		HTREE_LOCK_MAX
+#define HTREE_LOCK_INVAL	0xdead10c
+
+enum {
+	HTREE_HBITS_MIN		= 2,
+	HTREE_HBITS_DEF		= 14,
+	HTREE_HBITS_MAX		= 32,
+};
+
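+/*
+ * per-level event mask: when a lock whose mode is selected here (e.g. PW
+ * for HTREE_EVENT_WR) is granted on a key, listeners (HTREE_LOCK_NL) on
+ * the same key at that level are notified through the registered callback
+ * with the @event passed in by the new locker.
+ */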
+enum {
+	HTREE_EVENT_DISABLE	= (0),
+	HTREE_EVENT_RD		= (1 << HTREE_LOCK_PR),
+	HTREE_EVENT_WR		= (1 << HTREE_LOCK_PW),
+	HTREE_EVENT_RDWR	= (HTREE_EVENT_RD | HTREE_EVENT_WR),
+};
+
+struct htree_lock;
+
+typedef void (*htree_event_cb_t)(void *target, void *event);
+
+struct htree_lock_child {
+	struct list_head	lc_list;	/* granted list */
+	htree_event_cb_t	lc_callback;	/* event callback */
+	unsigned		lc_events;	/* event types */
+};
+
+struct htree_lock_head {
+	unsigned long		lh_lock;	/* bits lock */
+	/* blocked lock list (htree_lock) */
+	struct list_head	lh_blocked_list;
+	/* # key levels */
+	u16			lh_depth;
+	/* hash bits for key and limit number of locks */
+	u16			lh_hbits;
+	/* counters for blocked locks */
+	u16			lh_nblocked[HTREE_LOCK_MAX];
+	/* counters for granted locks */
+	u16			lh_ngranted[HTREE_LOCK_MAX];
+	/* private data */
+	void			*lh_private;
+	/* array of children locks */
+	struct htree_lock_child	lh_children[0];
+};
+
+/* htree_lock_node is the child-lock for a specific key (ln_major_key/ln_minor_key) */
+struct htree_lock_node {
+	htree_lock_mode_t	ln_mode;
+	/* major hash key */
+	u16			ln_major_key;
+	/* minor hash key */
+	u16			ln_minor_key;
+	struct list_head	ln_major_list;
+	struct list_head	ln_minor_list;
+	/* alive list, all locks (granted, blocked, listening) are on it */
+	struct list_head	ln_alive_list;
+	/* blocked list */
+	struct list_head	ln_blocked_list;
+	/* granted list */
+	struct list_head	ln_granted_list;
+	void			*ln_ev_target;
+};
+
+struct htree_lock {
+	struct task_struct	*lk_task;
+	struct htree_lock_head	*lk_head;
+	void			*lk_private;
+	unsigned		lk_depth;
+	htree_lock_mode_t	lk_mode;
+	struct list_head	lk_blocked_list;
+	struct htree_lock_node	lk_nodes[0];
+};
+
+/* create a lock head, which stands for a resource */
+struct htree_lock_head *htree_lock_head_alloc(unsigned depth,
+					      unsigned hbits, unsigned priv);
+/* free a lock head */
+void htree_lock_head_free(struct htree_lock_head *lhead);
+/* register event callback for child lock at level @depth */
+void htree_lock_event_attach(struct htree_lock_head *lhead, unsigned depth,
+			     unsigned events, htree_event_cb_t callback);
+/* create a lock handle, which stands for a thread */
+struct htree_lock *htree_lock_alloc(unsigned depth, unsigned pbytes);
+/* free a lock handle */
+void htree_lock_free(struct htree_lock *lck);
+/* lock htree; when @wait is false, 0 is returned if the lock can't
+ * be granted immediately */
+int htree_lock_try(struct htree_lock *lck, struct htree_lock_head *lhead,
+		   htree_lock_mode_t mode, int wait);
+/* unlock htree */
+void htree_unlock(struct htree_lock *lck);
+/* unlock and relock htree with @new_mode */
+int htree_change_lock_try(struct htree_lock *lck,
+			  htree_lock_mode_t new_mode, int wait);
+void htree_change_mode(struct htree_lock *lck, htree_lock_mode_t mode);
+/* acquire the child lock (key) of the htree at level @dep; @event will be
+ * sent to all listeners on this @key while the lock is being granted */
+int htree_node_lock_try(struct htree_lock *lck, htree_lock_mode_t mode,
+			u32 key, unsigned dep, int wait, void *event);
+/* release the child lock at level @dep; this lock will keep listening on its
+ * key if @event isn't NULL, and event_cb will be called against @lck while
+ * granting any other lock at level @dep with the same key */
+void htree_node_unlock(struct htree_lock *lck, unsigned dep, void *event);
+/* stop listening on the child lock at level @dep */
+void htree_node_stop_listen(struct htree_lock *lck, unsigned dep);
+/* for debug */
+void htree_lock_stat_print(int depth);
+void htree_lock_stat_reset(void);
+
+#define htree_lock(lck, lh, mode)	htree_lock_try(lck, lh, mode, 1)
+#define htree_change_lock(lck, mode)	htree_change_lock_try(lck, mode, 1)
+
+#define htree_lock_mode(lck)		((lck)->lk_mode)
+
+#define htree_node_lock(lck, mode, key, dep)	\
+	htree_node_lock_try(lck, mode, key, dep, 1, NULL)
+/* this is only safe in thread context of lock owner */
+#define htree_node_is_granted(lck, dep)		\
+	((lck)->lk_nodes[dep].ln_mode != HTREE_LOCK_INVAL && \
+	 (lck)->lk_nodes[dep].ln_mode != HTREE_LOCK_NL)
+/* this is only safe in thread context of lock owner */
+#define htree_node_is_listening(lck, dep)	\
+	((lck)->lk_nodes[dep].ln_mode == HTREE_LOCK_NL)
+
+#endif
diff --git a/fs/ext4/namei.c b/fs/ext4/namei.c
index 9cb86e4..0153c4d 100644
--- a/fs/ext4/namei.c
+++ b/fs/ext4/namei.c
@@ -54,6 +54,7 @@ struct buffer_head *ext4_append(handle_t *handle,
 				struct inode *inode,
 				ext4_lblk_t *block)
 {
+	struct ext4_inode_info *ei = EXT4_I(inode);
 	struct buffer_head *bh;
 	int err;
 
@@ -62,15 +63,24 @@ struct buffer_head *ext4_append(handle_t *handle,
 		      EXT4_SB(inode->i_sb)->s_max_dir_size_kb)))
 		return ERR_PTR(-ENOSPC);
 
+	/* with parallel dir operations all appends
+	 * have to be serialized -bzzz
+	 */
+	down(&ei->i_append_sem);
+
 	*block = inode->i_size >> inode->i_sb->s_blocksize_bits;
 
 	bh = ext4_bread(handle, inode, *block, EXT4_GET_BLOCKS_CREATE);
-	if (IS_ERR(bh))
+	if (IS_ERR(bh)) {
+		up(&ei->i_append_sem);
 		return bh;
+	}
+
 	inode->i_size += inode->i_sb->s_blocksize;
 	EXT4_I(inode)->i_disksize = inode->i_size;
 	BUFFER_TRACE(bh, "get_write_access");
 	err = ext4_journal_get_write_access(handle, bh);
+	up(&ei->i_append_sem);
 	if (err) {
 		brelse(bh);
 		ext4_std_error(inode->i_sb, err);
@@ -250,7 +260,8 @@ static unsigned int inline dx_root_limit(struct inode *dir,
 static struct dx_frame *dx_probe(struct ext4_filename *fname,
 				 struct inode *dir,
 				 struct dx_hash_info *hinfo,
-				 struct dx_frame *frame);
+				 struct dx_frame *frame,
+				 struct htree_lock *lck);
 static void dx_release(struct dx_frame *frames);
 static int dx_make_map(struct inode *dir, struct ext4_dir_entry_2 *de,
 		       unsigned blocksize, struct dx_hash_info *hinfo,
@@ -264,12 +275,13 @@ static void dx_insert_block(struct dx_frame *frame,
 static int ext4_htree_next_block(struct inode *dir, __u32 hash,
 				 struct dx_frame *frame,
 				 struct dx_frame *frames,
-				 __u32 *start_hash);
+				 __u32 *start_hash, struct htree_lock *lck);
 static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
 		struct ext4_filename *fname,
-		struct ext4_dir_entry_2 **res_dir);
+		struct ext4_dir_entry_2 **res_dir, struct htree_lock *lck);
 static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
-			     struct inode *dir, struct inode *inode);
+			     struct inode *dir, struct inode *inode,
+			     struct htree_lock *lck);
 
 /* checksumming functions */
 void initialize_dirent_tail(struct ext4_dir_entry_tail *t,
@@ -734,6 +746,226 @@ struct stats dx_show_entries(struct dx_hash_info *hinfo, struct inode *dir,
 }
 #endif /* DX_DEBUG */
 
+/* private data for htree_lock */
+struct ext4_dir_lock_data {
+	unsigned int		ld_flags;	/* bits-map for lock types */
+	unsigned int		ld_count;	/* # entries of the last DX block */
+	struct dx_entry		ld_at_entry;	/* copy of leaf dx_entry */
+	struct dx_entry		*ld_at;		/* position of leaf dx_entry */
+};
+
+#define ext4_htree_lock_data(l)	((struct ext4_dir_lock_data *)(l)->lk_private)
+#define ext4_find_entry(dir, name, dirent, inline) \
+			ext4_find_entry_locked(dir, name, dirent, inline, NULL)
+#define ext4_add_entry(handle, dentry, inode) \
+		       ext4_add_entry_locked(handle, dentry, inode, NULL)
+
+/* NB: ext4_lblk_t is 32 bits so we use high bits to identify invalid blk */
+#define EXT4_HTREE_NODE_CHANGED		(0xcafeULL << 32)
+
+static void ext4_htree_event_cb(void *target, void *event)
+{
+	u64 *block = (u64 *)target;
+
+	if (*block == dx_get_block((struct dx_entry *)event))
+		*block = EXT4_HTREE_NODE_CHANGED;
+}
+
+struct htree_lock_head *ext4_htree_lock_head_alloc(unsigned hbits)
+{
+	struct htree_lock_head *lhead;
+
+	lhead = htree_lock_head_alloc(EXT4_LK_MAX, hbits, 0);
+	if (lhead) {
+		htree_lock_event_attach(lhead, EXT4_LK_SPIN, HTREE_EVENT_WR,
+					ext4_htree_event_cb);
+	}
+	return lhead;
+}
+EXPORT_SYMBOL(ext4_htree_lock_head_alloc);
+
+struct htree_lock *ext4_htree_lock_alloc(void)
+{
+	return htree_lock_alloc(EXT4_LK_MAX,
+				sizeof(struct ext4_dir_lock_data));
+}
+EXPORT_SYMBOL(ext4_htree_lock_alloc);
+
+static htree_lock_mode_t ext4_htree_mode(unsigned flags)
+{
+	switch (flags) {
+	default: /* 0 or unknown flags require EX lock */
+		return HTREE_LOCK_EX;
+	case EXT4_HLOCK_READDIR:
+		return HTREE_LOCK_PR;
+	case EXT4_HLOCK_LOOKUP:
+		return HTREE_LOCK_CR;
+	case EXT4_HLOCK_DEL:
+	case EXT4_HLOCK_ADD:
+		return HTREE_LOCK_CW;
+	}
+}
+
+/* return PR for read-only operations, otherwise return EX */
+static inline htree_lock_mode_t ext4_htree_safe_mode(unsigned flags)
+{
+	int writer = (flags & EXT4_LB_DE) == EXT4_LB_DE;
+
+	/* 0 requires EX lock */
+	return (flags == 0 || writer) ? HTREE_LOCK_EX : HTREE_LOCK_PR;
+}
+
+static int ext4_htree_safe_locked(struct htree_lock *lck)
+{
+	int writer;
+
+	if (!lck || lck->lk_mode == HTREE_LOCK_EX)
+		return 1;
+
+	writer = (ext4_htree_lock_data(lck)->ld_flags & EXT4_LB_DE) ==
+		  EXT4_LB_DE;
+	if (writer) /* all readers & writers are excluded? */
+		return lck->lk_mode == HTREE_LOCK_EX;
+
+	/* all writers are excluded? */
+	return lck->lk_mode == HTREE_LOCK_PR ||
+	       lck->lk_mode == HTREE_LOCK_PW ||
+	       lck->lk_mode == HTREE_LOCK_EX;
+}
+
+/* relock htree_lock with EX mode if it's a change operation, otherwise
+ * relock it with PR mode. It's a noop if PDO is disabled.
+ */
+static void ext4_htree_safe_relock(struct htree_lock *lck)
+{
+	if (!ext4_htree_safe_locked(lck)) {
+		unsigned int flags = ext4_htree_lock_data(lck)->ld_flags;
+
+		htree_change_lock(lck, ext4_htree_safe_mode(flags));
+	}
+}
+
+void ext4_htree_lock(struct htree_lock *lck, struct htree_lock_head *lhead,
+		     struct inode *dir, unsigned flags)
+{
+	htree_lock_mode_t mode = is_dx(dir) ? ext4_htree_mode(flags) :
+					      ext4_htree_safe_mode(flags);
+
+	ext4_htree_lock_data(lck)->ld_flags = flags;
+	htree_lock(lck, lhead, mode);
+	if (!is_dx(dir))
+		ext4_htree_safe_relock(lck); /* make sure it's safe locked */
+}
+EXPORT_SYMBOL(ext4_htree_lock);
+
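+/* in the helpers below @lmask is one of the EXT4_LB_* masks; ffz(~lmask)
+ * yields its lowest set bit, i.e. the EXT4_LK_* level used as the
+ * child-lock depth.
+ */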
+static int ext4_htree_node_lock(struct htree_lock *lck, struct dx_entry *at,
+				unsigned int lmask, int wait, void *ev)
+{
+	u32 key = (at == NULL) ? 0 : dx_get_block(at);
+	u32 mode;
+
+	/* NOOP if htree is well protected or caller doesn't require the lock */
+	if (ext4_htree_safe_locked(lck) ||
+	    !(ext4_htree_lock_data(lck)->ld_flags & lmask))
+		return 1;
+
+	mode = (ext4_htree_lock_data(lck)->ld_flags & lmask) == lmask ?
+		HTREE_LOCK_PW : HTREE_LOCK_PR;
+	while (1) {
+		if (htree_node_lock_try(lck, mode, key, ffz(~lmask), wait, ev))
+			return 1;
+		if (!(lmask & EXT4_LB_SPIN)) /* not a spinlock */
+			return 0;
+		cpu_relax(); /* spin until granted */
+	}
+}
+
+static int ext4_htree_node_locked(struct htree_lock *lck, unsigned lmask)
+{
+	return ext4_htree_safe_locked(lck) ||
+	       htree_node_is_granted(lck, ffz(~lmask));
+}
+
+static void ext4_htree_node_unlock(struct htree_lock *lck,
+				   unsigned int lmask, void *buf)
+{
+	/* NB: safe to call multiple times, even if it's not locked */
+	if (!ext4_htree_safe_locked(lck) &&
+	    htree_node_is_granted(lck, ffz(~lmask)))
+		htree_node_unlock(lck, ffz(~lmask), buf);
+}
+
+#define ext4_htree_dx_lock(lck, key)		\
+	ext4_htree_node_lock(lck, key, EXT4_LB_DX, 1, NULL)
+#define ext4_htree_dx_lock_try(lck, key)	\
+	ext4_htree_node_lock(lck, key, EXT4_LB_DX, 0, NULL)
+#define ext4_htree_dx_unlock(lck)		\
+	ext4_htree_node_unlock(lck, EXT4_LB_DX, NULL)
+#define ext4_htree_dx_locked(lck)		\
+	ext4_htree_node_locked(lck, EXT4_LB_DX)
+
+static void ext4_htree_dx_need_lock(struct htree_lock *lck)
+{
+	struct ext4_dir_lock_data *ld;
+
+	if (ext4_htree_safe_locked(lck))
+		return;
+
+	ld = ext4_htree_lock_data(lck);
+	switch (ld->ld_flags) {
+	default:
+		return;
+	case EXT4_HLOCK_LOOKUP:
+		ld->ld_flags = EXT4_HLOCK_LOOKUP_SAFE;
+		return;
+	case EXT4_HLOCK_DEL:
+		ld->ld_flags = EXT4_HLOCK_DEL_SAFE;
+		return;
+	case EXT4_HLOCK_ADD:
+		ld->ld_flags = EXT4_HLOCK_SPLIT;
+		return;
+	}
+}
+
+#define ext4_htree_de_lock(lck, key)		\
+	ext4_htree_node_lock(lck, key, EXT4_LB_DE, 1, NULL)
+#define ext4_htree_de_unlock(lck)		\
+	ext4_htree_node_unlock(lck, EXT4_LB_DE, NULL)
+
+#define ext4_htree_spin_lock(lck, key, event)	\
+	ext4_htree_node_lock(lck, key, EXT4_LB_SPIN, 0, event)
+#define ext4_htree_spin_unlock(lck)		\
+	ext4_htree_node_unlock(lck, EXT4_LB_SPIN, NULL)
+#define ext4_htree_spin_unlock_listen(lck, p)	\
+	ext4_htree_node_unlock(lck, EXT4_LB_SPIN, p)
+
+static void ext4_htree_spin_stop_listen(struct htree_lock *lck)
+{
+	if (!ext4_htree_safe_locked(lck) &&
+	    htree_node_is_listening(lck, ffz(~EXT4_LB_SPIN)))
+		htree_node_stop_listen(lck, ffz(~EXT4_LB_SPIN));
+}
+
+enum {
+	DX_HASH_COL_IGNORE,	/* ignore collision while probing frames */
+	DX_HASH_COL_YES,	/* there is collision and it does matter */
+	DX_HASH_COL_NO,	/* there is no collision */
+};
+
+static int dx_probe_hash_collision(struct htree_lock *lck,
+				   struct dx_entry *entries,
+				   struct dx_entry *at, u32 hash)
+{
+	if (!(lck && ext4_htree_lock_data(lck)->ld_flags & EXT4_LB_EXACT)) {
+		return DX_HASH_COL_IGNORE; /* don't care about collision */
+	} else if (at == entries + dx_get_count(entries) - 1) {
+		return DX_HASH_COL_IGNORE; /* not in any leaf of this DX */
+	} else { /* hash collision? */
+		return ((dx_get_hash(at + 1) & ~1) == hash) ?
+			DX_HASH_COL_YES : DX_HASH_COL_NO;
+	}
+}
+
 /*
  * Probe for a directory leaf block to search.
  *
@@ -745,10 +977,11 @@ struct stats dx_show_entries(struct dx_hash_info *hinfo, struct inode *dir,
  */
 static struct dx_frame *
 dx_probe(struct ext4_filename *fname, struct inode *dir,
-	 struct dx_hash_info *hinfo, struct dx_frame *frame_in)
+	 struct dx_hash_info *hinfo, struct dx_frame *frame_in,
+	 struct htree_lock *lck)
 {
 	unsigned count, indirect;
-	struct dx_entry *at, *entries, *p, *q, *m;
+	struct dx_entry *at, *entries, *p, *q, *m, *dx = NULL;
 	struct dx_root_info *info;
 	struct dx_frame *frame = frame_in;
 	struct dx_frame *ret_err = ERR_PTR(ERR_BAD_DX_DIR);
@@ -811,6 +1044,13 @@ struct stats dx_show_entries(struct dx_hash_info *hinfo, struct inode *dir,
 
 	dxtrace(printk("Look up %x", hash));
 	while (1) {
+		if (indirect == 0) { /* the last index level */
+			/* NB: ext4_htree_dx_lock() could be noop if
+			 * DX-lock flag is not set for current operation
+			 */
+			ext4_htree_dx_lock(lck, dx);
+			ext4_htree_spin_lock(lck, dx, NULL);
+		}
 		count = dx_get_count(entries);
 		if (!count || count > dx_get_limit(entries)) {
 			ext4_warning_inode(dir,
@@ -851,8 +1091,75 @@ struct stats dx_show_entries(struct dx_hash_info *hinfo, struct inode *dir,
 			       dx_get_block(at)));
 		frame->entries = entries;
 		frame->at = at;
-		if (!indirect--)
+
+		if (indirect == 0) { /* the last index level */
+			struct ext4_dir_lock_data *ld;
+			u64 myblock;
+
+			/* By default we only lock DE-block, however, we will
+			 * also lock the last level DX-block if:
+			 * a) there is hash collision
+			 * a) there is a hash collision:
+			 *    we will set the DX-lock flag (a few lines below)
+			 *    and retry to lock the DX-block,
+			 * b) it's a retry from splitting
+			 *    we need to lock the last level DX-block so nobody
+			 *    else can split any leaf blocks under the same
+			 *    DX-block, see detail in ext4_dx_add_entry()
+			 */
+			if (ext4_htree_dx_locked(lck)) {
+				/* DX-block is locked, just lock DE-block
+				 * and return
+				 */
+				ext4_htree_spin_unlock(lck);
+				if (!ext4_htree_safe_locked(lck))
+					ext4_htree_de_lock(lck, frame->at);
+				return frame;
+			}
+			/* it's pdirop and no DX lock */
+			if (dx_probe_hash_collision(lck, entries, at, hash) ==
+			    DX_HASH_COL_YES) {
+				/* found hash collision, set DX-lock flag
+				 * and retry to obtain the DX-lock
+				 */
+				ext4_htree_spin_unlock(lck);
+				ext4_htree_dx_need_lock(lck);
+				continue;
+			}
+			ld = ext4_htree_lock_data(lck);
+			/* because I don't hold the DX lock, @at can't be
+			 * trusted after I release the spinlock, so save a copy
+			 */
+			ld->ld_at = at;
+			ld->ld_at_entry = *at;
+			ld->ld_count = dx_get_count(entries);
+
+			frame->at = &ld->ld_at_entry;
+			myblock = dx_get_block(at);
+
+			/* NB: ordering locking */
+			ext4_htree_spin_unlock_listen(lck, &myblock);
+			/* another thread can split this DE-block because:
+			 * a) I don't have the lock for the DE-block yet
+			 * b) I released the spinlock on the DX-block
+			 * if that happens I can detect it by listening for a
+			 * split event on this DE-block
+			 */
+			ext4_htree_de_lock(lck, frame->at);
+			ext4_htree_spin_stop_listen(lck);
+
+			if (myblock == EXT4_HTREE_NODE_CHANGED) {
+				/* someone split this DE-block before
+				 * I locked it, I need to retry and lock
+				 * valid DE-block
+				 */
+				ext4_htree_de_unlock(lck);
+				continue;
+			}
 			return frame;
+		}
+		dx = at;
+		indirect--;
 		frame++;
 		frame->bh = ext4_read_dirblock(dir, dx_get_block(at), INDEX);
 		if (IS_ERR(frame->bh)) {
@@ -921,7 +1228,7 @@ static void dx_release(struct dx_frame *frames)
 static int ext4_htree_next_block(struct inode *dir, __u32 hash,
 				 struct dx_frame *frame,
 				 struct dx_frame *frames,
-				 __u32 *start_hash)
+				 __u32 *start_hash, struct htree_lock *lck)
 {
 	struct dx_frame *p;
 	struct buffer_head *bh;
@@ -936,12 +1243,23 @@ static int ext4_htree_next_block(struct inode *dir, __u32 hash,
 	 * this loop, num_frames indicates the number of interior
 	 * nodes need to be read.
 	 */
+	ext4_htree_de_unlock(lck);
 	while (1) {
-		if (++(p->at) < p->entries + dx_get_count(p->entries))
-			break;
+		if (num_frames > 0 || ext4_htree_dx_locked(lck)) {
+			/* num_frames > 0 :
+			 *   DX block
+			 * ext4_htree_dx_locked:
+			 *   frame->at is a reliable pointer returned by dx_probe,
+			 *   otherwise dx_probe already knew there was no collision
+			 */
+			if (++(p->at) < p->entries + dx_get_count(p->entries))
+				break;
+		}
 		if (p == frames)
 			return 0;
 		num_frames++;
+		if (num_frames == 1)
+			ext4_htree_dx_unlock(lck);
 		p--;
 	}
 
@@ -964,6 +1282,14 @@ static int ext4_htree_next_block(struct inode *dir, __u32 hash,
 	 * block so no check is necessary
 	 */
 	while (num_frames--) {
+		if (num_frames == 0) {
+			/* it's not always necessary, we just don't want to
+			 * detect hash collision again
+			 */
+			ext4_htree_dx_need_lock(lck);
+			ext4_htree_dx_lock(lck, p->at);
+		}
+
 		bh = ext4_read_dirblock(dir, dx_get_block(p->at), INDEX);
 		if (IS_ERR(bh))
 			return PTR_ERR(bh);
@@ -972,10 +1298,10 @@ static int ext4_htree_next_block(struct inode *dir, __u32 hash,
 		p->bh = bh;
 		p->at = p->entries = ((struct dx_node *) bh->b_data)->entries;
 	}
+	ext4_htree_de_lock(lck, p->at);
 	return 1;
 }
 
-
 /*
  * This function fills a red-black tree with information from a
  * directory block.  It returns the number directory entries loaded
@@ -1119,7 +1445,8 @@ int ext4_htree_fill_tree(struct file *dir_file, __u32 start_hash,
 	}
 	hinfo.hash = start_hash;
 	hinfo.minor_hash = 0;
-	frame = dx_probe(NULL, dir, &hinfo, frames);
+	/* assume it's PR locked */
+	frame = dx_probe(NULL, dir, &hinfo, frames, NULL);
 	if (IS_ERR(frame))
 		return PTR_ERR(frame);
 
@@ -1162,7 +1489,7 @@ int ext4_htree_fill_tree(struct file *dir_file, __u32 start_hash,
 		count += ret;
 		hashval = ~0;
 		ret = ext4_htree_next_block(dir, HASH_NB_ALWAYS,
-					    frame, frames, &hashval);
+					    frame, frames, &hashval, NULL);
 		*next_hash = hashval;
 		if (ret < 0) {
 			err = ret;
@@ -1400,7 +1727,7 @@ static int is_dx_internal_node(struct inode *dir, ext4_lblk_t block,
 static struct buffer_head *__ext4_find_entry(struct inode *dir,
 					     struct ext4_filename *fname,
 					     struct ext4_dir_entry_2 **res_dir,
-					     int *inlined)
+					     int *inlined, struct htree_lock *lck)
 {
 	struct super_block *sb;
 	struct buffer_head *bh_use[NAMEI_RA_SIZE];
@@ -1442,7 +1769,7 @@ static struct buffer_head *__ext4_find_entry(struct inode *dir,
 		goto restart;
 	}
 	if (is_dx(dir)) {
-		ret = ext4_dx_find_entry(dir, fname, res_dir);
+		ret = ext4_dx_find_entry(dir, fname, res_dir, lck);
 		/*
 		 * On success, or if the error was file not found,
 		 * return.  Otherwise, fall back to doing a search the
@@ -1452,6 +1779,7 @@ static struct buffer_head *__ext4_find_entry(struct inode *dir,
 			goto cleanup_and_exit;
 		dxtrace(printk(KERN_DEBUG "ext4_find_entry: dx failed, "
 			       "falling back\n"));
+		ext4_htree_safe_relock(lck);
 		ret = NULL;
 	}
 	nblocks = dir->i_size >> EXT4_BLOCK_SIZE_BITS(sb);
@@ -1540,10 +1868,10 @@ static struct buffer_head *__ext4_find_entry(struct inode *dir,
 	return ret;
 }
 
-static struct buffer_head *ext4_find_entry(struct inode *dir,
+struct buffer_head *ext4_find_entry_locked(struct inode *dir,
 					   const struct qstr *d_name,
 					   struct ext4_dir_entry_2 **res_dir,
-					   int *inlined)
+					   int *inlined, struct htree_lock *lck)
 {
 	int err;
 	struct ext4_filename fname;
@@ -1555,11 +1883,12 @@ static struct buffer_head *ext4_find_entry(struct inode *dir,
 	if (err)
 		return ERR_PTR(err);
 
-	bh = __ext4_find_entry(dir, &fname, res_dir, inlined);
+	bh = __ext4_find_entry(dir, &fname, res_dir, inlined, lck);
 
 	ext4_fname_free_filename(&fname);
 	return bh;
 }
+EXPORT_SYMBOL(ext4_find_entry_locked);
 
 static struct buffer_head *ext4_lookup_entry(struct inode *dir,
 					     struct dentry *dentry,
@@ -1575,7 +1904,7 @@ static struct buffer_head *ext4_lookup_entry(struct inode *dir,
 	if (err)
 		return ERR_PTR(err);
 
-	bh = __ext4_find_entry(dir, &fname, res_dir, NULL);
+	bh = __ext4_find_entry(dir, &fname, res_dir, NULL, NULL);
 
 	ext4_fname_free_filename(&fname);
 	return bh;
@@ -1583,7 +1912,8 @@ static struct buffer_head *ext4_lookup_entry(struct inode *dir,
 
 static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
 			struct ext4_filename *fname,
-			struct ext4_dir_entry_2 **res_dir)
+			struct ext4_dir_entry_2 **res_dir,
+			struct htree_lock *lck)
 {
 	struct super_block * sb = dir->i_sb;
 	struct dx_frame frames[EXT4_HTREE_LEVEL], *frame;
@@ -1594,7 +1924,7 @@ static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
 #ifdef CONFIG_FS_ENCRYPTION
 	*res_dir = NULL;
 #endif
-	frame = dx_probe(fname, dir, NULL, frames);
+	frame = dx_probe(fname, dir, NULL, frames, lck);
 	if (IS_ERR(frame))
 		return (struct buffer_head *) frame;
 	do {
@@ -1616,7 +1946,7 @@ static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
 
 		/* Check to see if we should continue to search */
 		retval = ext4_htree_next_block(dir, fname->hinfo.hash, frame,
-					       frames, NULL);
+					       frames, NULL, lck);
 		if (retval < 0) {
 			ext4_warning_inode(dir,
 				"error %d reading directory index block",
@@ -1799,8 +2129,9 @@ static struct ext4_dir_entry_2* dx_pack_dirents(char *base, unsigned blocksize)
  * Returns pointer to de in block into which the new entry will be inserted.
  */
 static struct ext4_dir_entry_2 *do_split(handle_t *handle, struct inode *dir,
-			struct buffer_head **bh,struct dx_frame *frame,
-			struct dx_hash_info *hinfo)
+			struct buffer_head **bh, struct dx_frame *frames,
+			struct dx_frame *frame, struct dx_hash_info *hinfo,
+			struct htree_lock *lck)
 {
 	unsigned blocksize = dir->i_sb->s_blocksize;
 	unsigned count, continued;
@@ -1862,8 +2193,15 @@ static struct ext4_dir_entry_2 *do_split(handle_t *handle, struct inode *dir,
 					hash2, split, count-split));
 
 	/* Fancy dance to stay within two buffers */
-	de2 = dx_move_dirents(data1, data2, map + split, count - split,
-			      blocksize);
+	if (hinfo->hash < hash2) {
+		de2 = dx_move_dirents(data1, data2, map + split,
+				      count - split, blocksize);
+	} else {
+		/* make sure we will add the entry to the same block
+		 * that we have already locked
+		 */
+		de2 = dx_move_dirents(data1, data2, map, split, blocksize);
+	}
 	de = dx_pack_dirents(data1, blocksize);
 	de->rec_len = ext4_rec_len_to_disk(data1 + (blocksize - csum_size) -
 					   (char *) de,
@@ -1884,12 +2222,20 @@ static struct ext4_dir_entry_2 *do_split(handle_t *handle, struct inode *dir,
 	dxtrace(dx_show_leaf(dir, hinfo, (struct ext4_dir_entry_2 *) data2,
 			blocksize, 1));
 
-	/* Which block gets the new entry? */
-	if (hinfo->hash >= hash2) {
-		swap(*bh, bh2);
-		de = de2;
+	ext4_htree_spin_lock(lck, frame > frames ? (frame - 1)->at : NULL,
+			     frame->at); /* notify block is being split */
+	if (hinfo->hash < hash2) {
+		dx_insert_block(frame, hash2 + continued, newblock);
+	} else {
+		/* switch block number */
+		dx_insert_block(frame, hash2 + continued,
+				dx_get_block(frame->at));
+		dx_set_block(frame->at, newblock);
+		(frame->at)++;
 	}
-	dx_insert_block(frame, hash2 + continued, newblock);
+	ext4_htree_spin_unlock(lck);
+	ext4_htree_dx_unlock(lck);
+
 	err = ext4_handle_dirty_dirent_node(handle, dir, bh2);
 	if (err)
 		goto journal_error;
@@ -2167,7 +2513,7 @@ static int make_indexed_dir(handle_t *handle, struct ext4_filename *fname,
 	if (retval)
 		goto out_frames;	
 
-	de = do_split(handle,dir, &bh2, frame, &fname->hinfo);
+	de = do_split(handle, dir, &bh2, frames, frame, &fname->hinfo, NULL);
 	if (IS_ERR(de)) {
 		retval = PTR_ERR(de);
 		goto out_frames;
@@ -2276,8 +2622,8 @@ static int ext4_update_dotdot(handle_t *handle, struct dentry *dentry,
  * may not sleep between calling this and putting something into
  * the entry, as someone else might have used it while you slept.
  */
-static int ext4_add_entry(handle_t *handle, struct dentry *dentry,
-			  struct inode *inode)
+int ext4_add_entry_locked(handle_t *handle, struct dentry *dentry,
+			  struct inode *inode, struct htree_lock *lck)
 {
 	struct inode *dir = d_inode(dentry->d_parent);
 	struct buffer_head *bh = NULL;
@@ -2326,9 +2672,10 @@ static int ext4_add_entry(handle_t *handle, struct dentry *dentry,
 		if (dentry->d_name.len == 2 &&
 		    memcmp(dentry->d_name.name, "..", 2) == 0)
 			return ext4_update_dotdot(handle, dentry, inode);
-		retval = ext4_dx_add_entry(handle, &fname, dir, inode);
+		retval = ext4_dx_add_entry(handle, &fname, dir, inode, lck);
 		if (!retval || (retval != ERR_BAD_DX_DIR))
 			goto out;
+		ext4_htree_safe_relock(lck);
 		ext4_clear_inode_flag(dir, EXT4_INODE_INDEX);
 		dx_fallback++;
 		ext4_mark_inode_dirty(handle, dir);
@@ -2378,12 +2725,14 @@ static int ext4_add_entry(handle_t *handle, struct dentry *dentry,
 		ext4_set_inode_state(inode, EXT4_STATE_NEWENTRY);
 	return retval;
 }
+EXPORT_SYMBOL(ext4_add_entry_locked);
 
 /*
  * Returns 0 for success, or a negative error value
  */
 static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
-			     struct inode *dir, struct inode *inode)
+			     struct inode *dir, struct inode *inode,
+			     struct htree_lock *lck)
 {
 	struct dx_frame frames[EXT4_HTREE_LEVEL], *frame;
 	struct dx_entry *entries, *at;
@@ -2395,7 +2744,7 @@ static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
 
 again:
 	restart = 0;
-	frame = dx_probe(fname, dir, NULL, frames);
+	frame = dx_probe(fname, dir, NULL, frames, lck);
 	if (IS_ERR(frame))
 		return PTR_ERR(frame);
 	entries = frame->entries;
@@ -2430,6 +2779,12 @@ static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
 		struct dx_node *node2;
 		struct buffer_head *bh2;
 
+		if (!ext4_htree_safe_locked(lck)) { /* retry with EX lock */
+			ext4_htree_safe_relock(lck);
+			restart = 1;
+			goto cleanup;
+		}
+
 		while (frame > frames) {
 			if (dx_get_count((frame - 1)->entries) <
 			    dx_get_limit((frame - 1)->entries)) {
@@ -2533,8 +2888,34 @@ static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
 			restart = 1;
 			goto journal_error;
 		}
+	} else if (!ext4_htree_dx_locked(lck)) {
+		struct ext4_dir_lock_data *ld = ext4_htree_lock_data(lck);
+
+		/* not well protected, require DX lock */
+		ext4_htree_dx_need_lock(lck);
+		at = frame > frames ? (frame - 1)->at : NULL;
+
+		/* NB: no risk of deadlock because it's just a try.
+		 *
+		 * NB: we check ld_count twice: the first time before
+		 * taking the DX lock, the second time after holding it.
+		 *
+		 * NB: we never free directory blocks so far, so the value
+		 * returned by dx_get_count() should equal ld->ld_count if
+		 * nobody split any DE-block under @at, and ld->ld_at still
+		 * points to a valid dx_entry.
+		 */
+		if ((ld->ld_count != dx_get_count(entries)) ||
+		    !ext4_htree_dx_lock_try(lck, at) ||
+		   (ld->ld_count != dx_get_count(entries))) {
+			restart = 1;
+			goto cleanup;
+		}
+		/* OK, I've got DX lock and nothing changed */
+		frame->at = ld->ld_at;
 	}
-	de = do_split(handle, dir, &bh, frame, &fname->hinfo);
+
+	de = do_split(handle, dir, &bh, frames, frame, &fname->hinfo, lck);
 	if (IS_ERR(de)) {
 		err = PTR_ERR(de);
 		goto cleanup;
@@ -2545,6 +2926,8 @@ static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
 journal_error:
 	ext4_std_error(dir->i_sb, err); /* this is a no-op if err == 0 */
 cleanup:
+	ext4_htree_dx_unlock(lck);
+	ext4_htree_de_unlock(lck);
 	brelse(bh);
 	dx_release(frames);
 	/* @restart is true means htree-path has been changed, we need to
diff --git a/fs/ext4/super.c b/fs/ext4/super.c
index 564eb35..07242d7 100644
--- a/fs/ext4/super.c
+++ b/fs/ext4/super.c
@@ -1077,6 +1077,7 @@ static struct inode *ext4_alloc_inode(struct super_block *sb)
 		return NULL;
 
 	inode_set_iversion(&ei->vfs_inode, 1);
+	sema_init(&ei->i_append_sem, 1);
 	spin_lock_init(&ei->i_raw_lock);
 	INIT_LIST_HEAD(&ei->i_prealloc_list);
 	spin_lock_init(&ei->i_prealloc_lock);
-- 
1.8.3.1
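
[Editorial note, not part of the patch] For readers who want to see how the
newly exported entry points are meant to be driven, here is a minimal caller
sketch. Only the ext4_find_entry_locked() and ext4_add_entry_locked()
prototypes above come from the patch; the example_* allocate/lock/unlock
helpers and their signatures below are assumed stand-ins for the API declared
in fs/ext4/htree_lock.h (an out-of-tree caller such as Lustre's osd-ldiskfs
would use the real ones). Passing a NULL struct htree_lock keeps the existing
serialized behaviour, which is what the unchanged in-tree callers do.

/* Caller sketch -- illustrative only.  The example_* helpers are assumed
 * stand-ins for the htree_lock.h allocation/lock/unlock API; only
 * ext4_find_entry_locked() below matches a prototype actually added by
 * this patch.
 */
#include "ext4.h"		/* EXT4_HLOCK_* opcodes added by the patch */

struct htree_lock;		/* opaque, from htree_lock.h */
struct htree_lock_head;		/* opaque, from htree_lock.h */

/* assumed helper prototypes (illustrative names, not the patch's) */
struct htree_lock *example_htree_lock_alloc(void);
void example_htree_lock_free(struct htree_lock *lck);
void example_htree_lock(struct htree_lock *lck,
			struct htree_lock_head *lhead, unsigned int flags);
void example_htree_unlock(struct htree_lock *lck);

/* Shared-mode lookup: several of these may run against one directory. */
static struct buffer_head *
pdo_lookup_sketch(struct inode *dir, const struct qstr *name,
		  struct htree_lock_head *lhead)
{
	struct ext4_dir_entry_2 *de;
	struct buffer_head *bh;
	struct htree_lock *lck;

	lck = example_htree_lock_alloc();
	if (!lck)
		return ERR_PTR(-ENOMEM);

	/* shared lookup opcode defined by the patch in ext4.h */
	example_htree_lock(lck, lhead, EXT4_HLOCK_LOOKUP);
	bh = ext4_find_entry_locked(dir, name, &de, NULL, lck);
	example_htree_unlock(lck);

	example_htree_lock_free(lck);
	return bh;
}

An insert would follow the same pattern with EXT4_HLOCK_ADD around
ext4_add_entry_locked(handle, dentry, inode, lck).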
