Parallel directory operations for EXT3. An htree consists of index blocks and leaves. Leaves are protected against concurrent modifications and lookups, so any process that wants to operate on a leaf has to lock that leaf first. To decrease lock collisions while splitting, we use a bit-based spinlock to protect index blocks. This means we can't sleep in the middle of an index block modification, and we have to make modifications in a strict order so that any user walking through the htree will not lose any leaf and will not select the wrong leaf. diff -puN fs/ext3/namei.c~ext3-pdirops fs/ext3/namei.c --- linux-2.5.73/fs/ext3/namei.c~ext3-pdirops Mon Jul 7 00:00:24 2003 +++ linux-2.5.73-alexey/fs/ext3/namei.c Mon Jul 7 00:10:33 2003 @@ -53,6 +53,9 @@ static struct buffer_head *ext3_append(h { struct buffer_head *bh; + /* with parallel dir operations all appends + * have to be serialized -bzzz */ + down(&EXT3_I(inode)->i_append_sem); *block = inode->i_size >> inode->i_sb->s_blocksize_bits; if ((bh = ext3_bread(handle, inode, *block, 1, err))) { @@ -60,6 +63,8 @@ static struct buffer_head *ext3_append(h EXT3_I(inode)->i_disksize = inode->i_size; ext3_journal_get_write_access(handle,bh); } + up(&EXT3_I(inode)->i_append_sem); + return bh; } @@ -136,6 +141,8 @@ struct dx_frame struct buffer_head *bh; struct dx_entry *entries; struct dx_entry *at; + unsigned long leaf; + unsigned int curidx; }; struct dx_map_entry @@ -144,6 +151,32 @@ struct dx_map_entry u32 offs; }; +/* FIXME: this should be reworked using bb_spin_lock + * introduced in -mm tree + */ +#define BH_DXLock 25 + +static inline void dx_lock_bh(struct buffer_head volatile *bh) +{ + preempt_disable(); +#ifdef CONFIG_SMP + while (test_and_set_bit(BH_DXLock, &bh->b_state)) { + while (test_bit(BH_DXLock, &bh->b_state)) + cpu_relax(); + } +#endif +} + +static inline void dx_unlock_bh(struct buffer_head *bh) +{ +#ifdef CONFIG_SMP + smp_mb__before_clear_bit(); + clear_bit(BH_DXLock, &bh->b_state); +#endif + preempt_enable(); +} + + 
#ifdef CONFIG_EXT3_INDEX static inline unsigned dx_get_block (struct dx_entry *entry); static void dx_set_block (struct dx_entry *entry, unsigned value); @@ -155,7 +188,7 @@ static void dx_set_count (struct dx_entr static void dx_set_limit (struct dx_entry *entries, unsigned value); static unsigned dx_root_limit (struct inode *dir, unsigned infosize); static unsigned dx_node_limit (struct inode *dir); -static struct dx_frame *dx_probe(struct dentry *dentry, +static struct dx_frame *dx_probe(struct qstr *name, struct inode *dir, struct dx_hash_info *hinfo, struct dx_frame *frame, @@ -167,15 +200,19 @@ static void dx_sort_map(struct dx_map_en static struct ext3_dir_entry_2 *dx_move_dirents (char *from, char *to, struct dx_map_entry *offsets, int count); static struct ext3_dir_entry_2* dx_pack_dirents (char *base, int size); -static void dx_insert_block (struct dx_frame *frame, u32 hash, u32 block); +static void dx_insert_block (struct inode *, struct dx_frame *, u32, u32, u32); static int ext3_htree_next_block(struct inode *dir, __u32 hash, struct dx_frame *frame, struct dx_frame *frames, __u32 *start_hash); static struct buffer_head * ext3_dx_find_entry(struct dentry *dentry, - struct ext3_dir_entry_2 **res_dir, int *err); + struct ext3_dir_entry_2 **res_dir, int *err, + int rwlock, void **lock); static int ext3_dx_add_entry(handle_t *handle, struct dentry *dentry, struct inode *inode); +static inline void *ext3_lock_htree(struct inode *, unsigned long, int); +static inline void ext3_unlock_htree(struct inode *, void *); + /* * Future: use high four bits of block for coalesce-on-delete flags @@ -319,6 +356,94 @@ struct stats dx_show_entries(struct dx_h #endif /* DX_DEBUG */ /* + * dx_find_position + * + * search position of specified hash in index + * + */ + +struct dx_entry * dx_find_position(struct dx_entry * entries, u32 hash) +{ + struct dx_entry *p, *q, *m; + int count; + + count = dx_get_count(entries); + p = entries + 1; + q = entries + count - 1; + while (p 
<= q) + { + m = p + (q - p)/2; + if (dx_get_hash(m) > hash) + q = m - 1; + else + p = m + 1; + } + return p - 1; +} + +/* + * returns 1 if path is unchanged + */ +int dx_check_path(struct dx_frame *frame, u32 hash) +{ + struct dx_entry *p; + int ret = 1; + + dx_lock_bh(frame->bh); + p = dx_find_position(frame->entries, hash); + if (frame->leaf != dx_get_block(p)) + ret = 0; + dx_unlock_bh(frame->bh); + + return ret; +} + +/* + * 0 - changed + * 1 - hasn't changed + */ +static int +dx_check_full_path(struct dx_frame *frames, struct dx_hash_info *hinfo) +{ + struct dx_entry *p; + struct dx_frame *frame = frames; + u32 leaf; + + /* check first level */ + dx_lock_bh(frame->bh); + p = dx_find_position(frame->entries, hinfo->hash); + leaf = dx_get_block(p); + dx_unlock_bh(frame->bh); + + if (leaf != frame->leaf) + return 0; + + /* is there 2nd level? */ + frame++; + if (frame->bh == NULL) + return 1; + + /* check second level */ + dx_lock_bh(frame->bh); + + /* probably 1st level got changed, check it */ + if (!dx_check_path(frames, hinfo->hash)) { + /* path changed */ + dx_unlock_bh(frame->bh); + return 0; + } + + p = dx_find_position(frame->entries, hinfo->hash); + leaf = dx_get_block(p); + dx_unlock_bh(frame->bh); + + if (leaf != frame->leaf) + return 0; + + return 1; +} + +/* * Probe for a directory leaf block to search. * * dx_probe can return ERR_BAD_DX_DIR, which means there was a format @@ -328,19 +453,20 @@ struct stats dx_show_entries(struct dx_h * back to userspace. 
*/ static struct dx_frame * -dx_probe(struct dentry *dentry, struct inode *dir, +dx_probe(struct qstr *name, struct inode *dir, struct dx_hash_info *hinfo, struct dx_frame *frame_in, int *err) { - unsigned count, indirect; - struct dx_entry *at, *entries, *p, *q, *m; + unsigned indirect; + struct dx_entry *at, *entries; struct dx_root *root; struct buffer_head *bh; struct dx_frame *frame = frame_in; u32 hash; + unsigned int curidx; frame->bh = NULL; - if (dentry) - dir = dentry->d_parent->d_inode; + frame[1].bh = NULL; + if (!(bh = ext3_bread (NULL,dir, 0, 0, err))) goto fail; root = (struct dx_root *) bh->b_data; @@ -356,8 +482,8 @@ dx_probe(struct dentry *dentry, struct i } hinfo->hash_version = root->info.hash_version; hinfo->seed = EXT3_SB(dir->i_sb)->s_hash_seed; - if (dentry) - ext3fs_dirhash(dentry->d_name.name, dentry->d_name.len, hinfo); + if (name) + ext3fs_dirhash(name->name, name->len, hinfo); hash = hinfo->hash; if (root->info.unused_flags & 1) { @@ -369,7 +495,19 @@ dx_probe(struct dentry *dentry, struct i goto fail; } +repeat: + curidx = 0; + entries = (struct dx_entry *) (((char *)&root->info) + + root->info.info_length); + assert(dx_get_limit(entries) == dx_root_limit(dir, + root->info.info_length)); + dxtrace (printk("Look up %x", hash)); + dx_lock_bh(bh); + /* indirect must be initialized under bh lock because + * 2nd level creation procedure may change it and dx_probe() + * will suggest htree is still single-level -bzzz */ if ((indirect = root->info.indirect_levels) > 1) { + dx_unlock_bh(bh); ext3_warning(dir->i_sb, __FUNCTION__, "Unimplemented inode hash depth: %#06x", root->info.indirect_levels); @@ -377,56 +515,46 @@ dx_probe(struct dentry *dentry, struct i *err = ERR_BAD_DX_DIR; goto fail; } - - entries = (struct dx_entry *) (((char *)&root->info) + - root->info.info_length); - assert(dx_get_limit(entries) == dx_root_limit(dir, - root->info.info_length)); - dxtrace (printk("Look up %x", hash)); + while (1) { - count = dx_get_count(entries); 
- assert (count && count <= dx_get_limit(entries)); - p = entries + 1; - q = entries + count - 1; - while (p <= q) - { - m = p + (q - p)/2; - dxtrace(printk(".")); - if (dx_get_hash(m) > hash) - q = m - 1; - else - p = m + 1; - } - - if (0) // linear search cross check - { - unsigned n = count - 1; - at = entries; - while (n--) - { - dxtrace(printk(",")); - if (dx_get_hash(++at) > hash) - { - at--; - break; - } - } - assert (at == p - 1); - } - - at = p - 1; - dxtrace(printk(" %x->%u\n", at == entries? 0: dx_get_hash(at), dx_get_block(at))); + at = dx_find_position(entries, hinfo->hash); + dxtrace(printk(" %x->%u\n", + at == entries? 0: dx_get_hash(at), + dx_get_block(at))); frame->bh = bh; frame->entries = entries; frame->at = at; - if (!indirect--) return frame; - if (!(bh = ext3_bread (NULL,dir, dx_get_block(at), 0, err))) + frame->curidx = curidx; + frame->leaf = dx_get_block(at); + if (!indirect--) { + dx_unlock_bh(bh); + return frame; + } + + /* step into next htree level */ + curidx = dx_get_block(at); + dx_unlock_bh(bh); + if (!(bh = ext3_bread (NULL,dir, frame->leaf, 0, err))) goto fail2; + + dx_lock_bh(bh); + /* splitting may change root index block and move + * hash we're looking for into another index block + * so, we have to check this situation, release the + * stale 2nd level bh and repeat from beginning -bzzz */ + if (!dx_check_path(frame, hash)) { + dx_unlock_bh(bh); + brelse(bh); + bh = frame->bh; + goto repeat; + } + at = entries = ((struct dx_node *) bh->b_data)->entries; assert (dx_get_limit(entries) == dx_node_limit (dir)); frame++; } + dx_unlock_bh(bh); fail2: while (frame >= frame_in) { brelse(frame->bh); @@ -440,8 +568,7 @@ static void dx_release (struct dx_frame { if (frames[0].bh == NULL) return; - - if (((struct dx_root *) frames[0].bh->b_data)->info.indirect_levels) + if (frames[1].bh != NULL) brelse(frames[1].bh); brelse(frames[0].bh); } @@ -482,8 +609,10 @@ static int ext3_htree_next_block(struct * nodes need to be read. 
*/ while (1) { - if (++(p->at) < p->entries + dx_get_count(p->entries)) + if (++(p->at) < p->entries + dx_get_count(p->entries)) { + p->leaf = dx_get_block(p->at); break; + } if (p == frames) return 0; num_frames++; @@ -509,13 +638,18 @@ static int ext3_htree_next_block(struct * block so no check is necessary */ while (num_frames--) { - if (!(bh = ext3_bread(NULL, dir, dx_get_block(p->at), + u32 idx; + + idx = p->leaf = dx_get_block(p->at); + if (!(bh = ext3_bread(NULL, dir, p->leaf, 0, &err))) return err; /* Failure */ p++; brelse (p->bh); p->bh = bh; p->at = p->entries = ((struct dx_node *) bh->b_data)->entries; + p->curidx = idx; + p->leaf = dx_get_block(p->at); } return 1; } @@ -602,7 +736,7 @@ int ext3_htree_fill_tree(struct file *di } hinfo.hash = start_hash; hinfo.minor_hash = 0; - frame = dx_probe(0, dir_file->f_dentry->d_inode, &hinfo, frames, &err); + frame = dx_probe(NULL, dir_file->f_dentry->d_inode, &hinfo, frames, &err); if (!frame) return err; @@ -674,7 +808,8 @@ static int dx_make_map (struct ext3_dir_ count++; } /* XXX: do we need to check rec_len == 0 case? -Chris */ - de = (struct ext3_dir_entry_2 *) ((char *) de + le16_to_cpu(de->rec_len)); + de = (struct ext3_dir_entry_2 *)((char*)de + + le16_to_cpu(de->rec_len)); } return count; } @@ -707,7 +842,8 @@ static void dx_sort_map (struct dx_map_e } while(more); } -static void dx_insert_block(struct dx_frame *frame, u32 hash, u32 block) +static void dx_insert_block(struct inode *dir, struct dx_frame *frame, + u32 hash, u32 block, u32 idx) { struct dx_entry *entries = frame->entries; struct dx_entry *old = frame->at, *new = old + 1; @@ -719,6 +855,7 @@ static void dx_insert_block(struct dx_fr dx_set_hash(new, hash); dx_set_block(new, block); dx_set_count(entries, count + 1); + } #endif @@ -799,7 +936,8 @@ static inline int search_dirblock(struct * to brelse() it when appropriate. 
*/ static struct buffer_head * ext3_find_entry (struct dentry *dentry, - struct ext3_dir_entry_2 ** res_dir) + struct ext3_dir_entry_2 ** res_dir, + int rwlock, void **lock) { struct super_block * sb; struct buffer_head * bh_use[NAMEI_RA_SIZE]; @@ -815,6 +953,7 @@ static struct buffer_head * ext3_find_en int namelen; const u8 *name; unsigned blocksize; + int do_not_use_dx = 0; *res_dir = NULL; sb = dir->i_sb; @@ -823,9 +962,10 @@ static struct buffer_head * ext3_find_en name = dentry->d_name.name; if (namelen > EXT3_NAME_LEN) return NULL; +repeat: #ifdef CONFIG_EXT3_INDEX if (is_dx(dir)) { - bh = ext3_dx_find_entry(dentry, res_dir, &err); + bh = ext3_dx_find_entry(dentry, res_dir, &err, rwlock, lock); /* * On success, or if the error was file not found, * return. Otherwise, fall back to doing a search the @@ -834,8 +974,14 @@ static struct buffer_head * ext3_find_en if (bh || (err != ERR_BAD_DX_DIR)) return bh; dxtrace(printk("ext3_find_entry: dx failed, falling back\n")); + do_not_use_dx = 1; } #endif + *lock = ext3_lock_htree(dir, 0, rwlock); + if (is_dx(dir) && !do_not_use_dx) { + ext3_unlock_htree(dir, *lock); + goto repeat; + } nblocks = dir->i_size >> EXT3_BLOCK_SIZE_BITS(sb); start = EXT3_I(dir)->i_dir_start_lookup; if (start >= nblocks) @@ -906,12 +1052,17 @@ cleanup_and_exit: /* Clean up the read-ahead blocks */ for (; ra_ptr < ra_max; ra_ptr++) brelse (bh_use[ra_ptr]); + if (!ret) { + ext3_unlock_htree(dir, *lock); + *lock = NULL; + } return ret; } #ifdef CONFIG_EXT3_INDEX static struct buffer_head * ext3_dx_find_entry(struct dentry *dentry, - struct ext3_dir_entry_2 **res_dir, int *err) + struct ext3_dir_entry_2 **res_dir, int *err, + int rwlock, void **lock) { struct super_block * sb; struct dx_hash_info hinfo; @@ -920,34 +1071,47 @@ static struct buffer_head * ext3_dx_find struct ext3_dir_entry_2 *de, *top; struct buffer_head *bh; unsigned long block; - int retval; + int retval, offset; int namelen = dentry->d_name.len; const u8 *name = 
dentry->d_name.name; struct inode *dir = dentry->d_parent->d_inode; sb = dir->i_sb; - if (!(frame = dx_probe (dentry, 0, &hinfo, frames, err))) +repeat: + if (!(frame = dx_probe (&dentry->d_name, dir, &hinfo, frames, err))) return NULL; + + *lock = ext3_lock_htree(dir, frame->leaf, rwlock); + /* while locking leaf we just found may get splitted + * so, we need another leaf. check this */ + if (!dx_check_full_path(frames, &hinfo)) { + ext3_unlock_htree(dir, *lock); + dx_release(frames); + goto repeat; + } + hash = hinfo.hash; do { - block = dx_get_block(frame->at); + block = frame->leaf; if (!(bh = ext3_bread (NULL,dir, block, 0, err))) goto errout; de = (struct ext3_dir_entry_2 *) bh->b_data; top = (struct ext3_dir_entry_2 *) ((char *) de + sb->s_blocksize - EXT3_DIR_REC_LEN(0)); - for (; de < top; de = ext3_next_entry(de)) - if (ext3_match (namelen, name, de)) { - if (!ext3_check_dir_entry("ext3_find_entry", - dir, de, bh, - (block<<EXT3_BLOCK_SIZE_BITS(sb)) - +((char *)de - bh->b_data))) { - brelse (bh); - goto errout; + for (offset = 0; de < top; de = ext3_next_entry(de)) { + if (ext3_match (namelen, name, de)) { + if (!ext3_check_dir_entry("ext3_find_entry", + dir, de, bh, + (block<<EXT3_BLOCK_SIZE_BITS(sb)) + +((char *)de - bh->b_data))) { + brelse (bh); + goto errout; + } + *res_dir = de; + dx_release (frames); + return bh; } - *res_dir = de; - dx_release (frames); - return bh; + offset += le16_to_cpu(de->rec_len); } brelse (bh); /* Check to see if we should continue to search */ @@ -965,6 +1129,8 @@ static struct buffer_head * ext3_dx_find *err = -ENOENT; errout: dxtrace(printk("%s not found\n", name)); + ext3_unlock_htree(dir, *lock); + *lock = NULL; dx_release (frames); return NULL; } @@ -975,14 +1141,16 @@ static struct dentry *ext3_lookup(struct struct inode * inode; struct ext3_dir_entry_2 * de; struct buffer_head * bh; + void *lock; if (dentry->d_name.len > EXT3_NAME_LEN) return ERR_PTR(-ENAMETOOLONG); - bh = ext3_find_entry(dentry, &de); + bh = ext3_find_entry(dentry, &de, 0, &lock); inode = NULL; if (bh) { unsigned long ino = 
le32_to_cpu(de->inode); + ext3_unlock_htree(dir, lock); brelse (bh); inode = iget(dir->i_sb, ino); @@ -995,7 +1163,6 @@ static struct dentry *ext3_lookup(struct return NULL; } - struct dentry *ext3_get_parent(struct dentry *child) { unsigned long ino; @@ -1004,16 +1171,18 @@ struct dentry *ext3_get_parent(struct de struct dentry dotdot; struct ext3_dir_entry_2 * de; struct buffer_head *bh; + void *lock; dotdot.d_name.name = ".."; dotdot.d_name.len = 2; dotdot.d_parent = child; /* confusing, isn't it! */ - bh = ext3_find_entry(&dotdot, &de); + bh = ext3_find_entry(&dotdot, &de, 0, &lock); inode = NULL; if (!bh) return ERR_PTR(-ENOENT); ino = le32_to_cpu(de->inode); + ext3_unlock_htree(child->d_inode, lock); brelse(bh); inode = iget(child->d_inode->i_sb, ino); @@ -1053,7 +1222,8 @@ dx_move_dirents(char *from, char *to, st unsigned rec_len = 0; while (count--) { - struct ext3_dir_entry_2 *de = (struct ext3_dir_entry_2 *) (from + map->offs); + struct ext3_dir_entry_2 *de = + (struct ext3_dir_entry_2 *) (from + map->offs); rec_len = EXT3_DIR_REC_LEN(de->name_len); memcpy (to, de, rec_len); ((struct ext3_dir_entry_2 *) to)->rec_len = @@ -1067,7 +1237,8 @@ dx_move_dirents(char *from, char *to, st static struct ext3_dir_entry_2* dx_pack_dirents(char *base, int size) { - struct ext3_dir_entry_2 *next, *to, *prev, *de = (struct ext3_dir_entry_2 *) base; + struct ext3_dir_entry_2 *next, *to, *prev; + struct ext3_dir_entry_2 *de = (struct ext3_dir_entry_2 *) base; unsigned rec_len = 0; prev = to = de; @@ -1089,7 +1260,8 @@ static struct ext3_dir_entry_2* dx_pack_ static struct ext3_dir_entry_2 *do_split(handle_t *handle, struct inode *dir, struct buffer_head **bh,struct dx_frame *frame, - struct dx_hash_info *hinfo, int *error) + struct dx_hash_info *hinfo, void **target, + int *error) { unsigned blocksize = dir->i_sb->s_blocksize; unsigned count, continued; @@ -1136,23 +1308,30 @@ static struct ext3_dir_entry_2 *do_split hash2 = map[split].hash; continued = hash2 == map[split 
- 1].hash; dxtrace(printk("Split block %i at %x, %i/%i\n", - dx_get_block(frame->at), hash2, split, count-split)); - + frame->leaf, hash2, split, count-split)); + /* Fancy dance to stay within two buffers */ de2 = dx_move_dirents(data1, data2, map + split, count - split); de = dx_pack_dirents(data1,blocksize); de->rec_len = cpu_to_le16(data1 + blocksize - (char *) de); de2->rec_len = cpu_to_le16(data2 + blocksize - (char *) de2); - dxtrace(dx_show_leaf (hinfo, (struct ext3_dir_entry_2 *) data1, blocksize, 1)); - dxtrace(dx_show_leaf (hinfo, (struct ext3_dir_entry_2 *) data2, blocksize, 1)); + dxtrace(dx_show_leaf(hinfo,(struct ext3_dir_entry_2*) data1, blocksize, 1)); + dxtrace(dx_show_leaf(hinfo,(struct ext3_dir_entry_2*) data2, blocksize, 1)); /* Which block gets the new entry? */ + *target = NULL; if (hinfo->hash >= hash2) { swap(*bh, bh2); de = de2; - } - dx_insert_block (frame, hash2 + continued, newblock); + + /* entry will be stored into new block + * we have to lock it before add_dirent_to_buf */ + *target = ext3_lock_htree(dir, newblock, 1); + } + dx_lock_bh(frame->bh); + dx_insert_block (dir, frame, hash2 + continued, newblock, frame->curidx); + dx_unlock_bh(frame->bh); err = ext3_journal_dirty_metadata (handle, bh2); if (err) goto journal_error; @@ -1226,7 +1405,8 @@ static int add_dirent_to_buf(handle_t *h nlen = EXT3_DIR_REC_LEN(de->name_len); rlen = le16_to_cpu(de->rec_len); if (de->inode) { - struct ext3_dir_entry_2 *de1 = (struct ext3_dir_entry_2 *)((char *)de + nlen); + struct ext3_dir_entry_2 *de1 = + (struct ext3_dir_entry_2 *)((char *)de + nlen); de1->rec_len = cpu_to_le16(rlen - nlen); de->rec_len = cpu_to_le16(nlen); de = de1; @@ -1284,7 +1464,8 @@ static int make_indexed_dir(handle_t *ha unsigned blocksize; struct dx_hash_info hinfo; u32 block; - + void *lock, *new_lock = NULL; /* do_split() may fail before setting it */ + blocksize = dir->i_sb->s_blocksize; dxtrace(printk("Creating index\n")); retval = ext3_journal_get_write_access(handle, bh); @@ -1295,7 +1476,6 @@ static int 
make_indexed_dir(handle_t *ha } root = (struct dx_root *) bh->b_data; - EXT3_I(dir)->i_flags |= EXT3_INDEX_FL; bh2 = ext3_append (handle, dir, &block, &retval); if (!(bh2)) { brelse(bh); @@ -1303,6 +1483,8 @@ static int make_indexed_dir(handle_t *ha } data1 = bh2->b_data; + lock = ext3_lock_htree(dir, block, 1); + /* The 0th block becomes the root, move the dirents out */ de = (struct ext3_dir_entry_2 *) &root->info; len = ((char *) root) + blocksize - (char *) de; @@ -1331,13 +1513,25 @@ static int make_indexed_dir(handle_t *ha frame->entries = entries; frame->at = entries; frame->bh = bh; + frame->curidx = 0; + frame->leaf = 0; + frame[1].bh = NULL; bh = bh2; - de = do_split(handle,dir, &bh, frame, &hinfo, &retval); + de = do_split(handle,dir, &bh, frame, &hinfo, &new_lock, &retval); dx_release (frames); if (!(de)) - return retval; + goto cleanup; - return add_dirent_to_buf(handle, dentry, inode, de, bh); + retval = add_dirent_to_buf(handle, dentry, inode, de, bh); +cleanup: + if (new_lock) + ext3_unlock_htree(dir, new_lock); + /* we mark directory indexed in order to + * avoid races while htree being created -bzzz */ + EXT3_I(dir)->i_flags |= EXT3_INDEX_FL; + ext3_unlock_htree(dir, lock); + + return retval; } #endif @@ -1366,11 +1560,13 @@ static int ext3_add_entry (handle_t *han unsigned blocksize; unsigned nlen, rlen; u32 block, blocks; + void *lock; sb = dir->i_sb; blocksize = sb->s_blocksize; if (!dentry->d_name.len) return -EINVAL; +repeat: #ifdef CONFIG_EXT3_INDEX if (is_dx(dir)) { retval = ext3_dx_add_entry(handle, dentry, inode); @@ -1381,36 +1577,53 @@ static int ext3_add_entry (handle_t *han ext3_mark_inode_dirty(handle, dir); } #endif + lock = ext3_lock_htree(dir, 0, 1); + if (is_dx(dir)) { + /* we got lock for block 0 + * probably previous holder of the lock + * created htree -bzzz */ + ext3_unlock_htree(dir, lock); + goto repeat; + } + blocks = dir->i_size >> sb->s_blocksize_bits; for (block = 0, offset = 0; block < blocks; block++) { bh = 
ext3_bread(handle, dir, block, 0, &retval); - if(!bh) + if(!bh) { + ext3_unlock_htree(dir, lock); return retval; + } retval = add_dirent_to_buf(handle, dentry, inode, 0, bh); - if (retval != -ENOSPC) + if (retval != -ENOSPC) { + ext3_unlock_htree(dir, lock); return retval; + } #ifdef CONFIG_EXT3_INDEX if (blocks == 1 && !dx_fallback && - EXT3_HAS_COMPAT_FEATURE(sb, EXT3_FEATURE_COMPAT_DIR_INDEX)) - return make_indexed_dir(handle, dentry, inode, bh); + EXT3_HAS_COMPAT_FEATURE(sb, EXT3_FEATURE_COMPAT_DIR_INDEX)) { + retval = make_indexed_dir(handle, dentry, inode, bh); + ext3_unlock_htree(dir, lock); + return retval; + } #endif brelse(bh); } bh = ext3_append(handle, dir, &block, &retval); - if (!bh) + if (!bh) { + ext3_unlock_htree(dir, lock); return retval; + } de = (struct ext3_dir_entry_2 *) bh->b_data; de->inode = 0; de->rec_len = cpu_to_le16(rlen = blocksize); nlen = 0; - return add_dirent_to_buf(handle, dentry, inode, de, bh); + retval = add_dirent_to_buf(handle, dentry, inode, de, bh); + ext3_unlock_htree(dir, lock); + return retval; } #ifdef CONFIG_EXT3_INDEX -/* - * Returns 0 for success, or a negative error value - */ static int ext3_dx_add_entry(handle_t *handle, struct dentry *dentry, struct inode *inode) { @@ -1422,15 +1635,28 @@ static int ext3_dx_add_entry(handle_t *h struct super_block * sb = dir->i_sb; struct ext3_dir_entry_2 *de; int err; + int curidx; + void *idx_lock, *leaf_lock, *newleaf_lock; - frame = dx_probe(dentry, 0, &hinfo, frames, &err); +repeat: + frame = dx_probe(&dentry->d_name, dir, &hinfo, frames, &err); if (!frame) return err; - entries = frame->entries; - at = frame->at; - if (!(bh = ext3_bread(handle,dir, dx_get_block(frame->at), 0, &err))) + /* we're going to chage leaf, so lock it first */ + leaf_lock = ext3_lock_htree(dir, frame->leaf, 1); + + /* while locking leaf we just found may get splitted + * so we need to check this */ + if (!dx_check_full_path(frames, &hinfo)) { + ext3_unlock_htree(dir, leaf_lock); + 
dx_release(frames); + goto repeat; + } + if (!(bh = ext3_bread(handle,dir, frame->leaf, 0, &err))) { + printk("can't ext3_bread(%d) = %d\n", (int) frame->leaf, err); goto cleanup; + } BUFFER_TRACE(bh, "get_write_access"); err = ext3_journal_get_write_access(handle, bh); @@ -1443,6 +1669,35 @@ static int ext3_dx_add_entry(handle_t *h goto cleanup; } + /* our leaf has no enough space. hence, we have to + * split it. so lock index for this leaf first */ + curidx = frame->curidx; + idx_lock = ext3_lock_htree(dir, curidx, 1); + + /* now check did path get changed? */ + dx_release(frames); + + frame = dx_probe(&dentry->d_name, dentry->d_parent->d_inode, + &hinfo, frames, &err); + if (!frame) { + /* FIXME: error handling here */ + brelse(bh); + ext3_unlock_htree(dir, idx_lock); + return err; + } + + if (frame->curidx != curidx) { + /* path has been changed. we have to drop old lock + * and repeat */ + brelse(bh); + ext3_unlock_htree(dir, idx_lock); + ext3_unlock_htree(dir, leaf_lock); + dx_release(frames); + goto repeat; + } + entries = frame->entries; + at = frame->at; + /* Block full, should compress but for now just split */ dxtrace(printk("using %u of %u node entries\n", dx_get_count(entries), dx_get_limit(entries))); @@ -1454,7 +1709,8 @@ static int ext3_dx_add_entry(handle_t *h struct dx_entry *entries2; struct dx_node *node2; struct buffer_head *bh2; - + void *nb_lock; + if (levels && (dx_get_count(frames->entries) == dx_get_limit(frames->entries))) { ext3_warning(sb, __FUNCTION__, @@ -1465,6 +1721,7 @@ static int ext3_dx_add_entry(handle_t *h bh2 = ext3_append (handle, dir, &newblock, &err); if (!(bh2)) goto cleanup; + nb_lock = ext3_lock_htree(dir, newblock, 1); node2 = (struct dx_node *)(bh2->b_data); entries2 = node2->entries; node2->fake.rec_len = cpu_to_le16(sb->s_blocksize); @@ -1476,27 +1733,73 @@ static int ext3_dx_add_entry(handle_t *h if (levels) { unsigned icount1 = icount/2, icount2 = icount - icount1; unsigned hash2 = dx_get_hash(entries + icount1); - 
dxtrace(printk("Split index %i/%i\n", icount1, icount2)); + void *ri_lock; - BUFFER_TRACE(frame->bh, "get_write_access"); /* index root */ + /* we have to protect root htree index against + * another dx_add_entry() which would want to + * split it too -bzzz */ + ri_lock = ext3_lock_htree(dir, 0, 1); + + /* as root index block blocked we must repeat + * searching for current position of our 2nd index -bzzz */ + dx_lock_bh(frame->bh); + frames->at = dx_find_position(frames->entries, hinfo.hash); + dx_unlock_bh(frame->bh); + + dxtrace(printk("Split index %i/%i\n", icount1, icount2)); + + BUFFER_TRACE(frame->bh, "get_write_access"); err = ext3_journal_get_write_access(handle, frames[0].bh); if (err) goto journal_error; - + + /* copy index into new one */ memcpy ((char *) entries2, (char *) (entries + icount1), icount2 * sizeof(struct dx_entry)); - dx_set_count (entries, icount1); dx_set_count (entries2, icount2); dx_set_limit (entries2, dx_node_limit(dir)); /* Which index block gets the new entry? */ if (at - entries >= icount1) { + /* unlock index we won't use */ + ext3_unlock_htree(dir, idx_lock); + idx_lock = nb_lock; frame->at = at = at - entries - icount1 + entries2; - frame->entries = entries = entries2; + frame->entries = entries2; + frame->curidx = curidx = newblock; swap(frame->bh, bh2); + } else { + /* we'll use old index,so new one may be freed */ + ext3_unlock_htree(dir, nb_lock); } - dx_insert_block (frames + 0, hash2, newblock); + + /* NOTE: very subtle piece of code + * competing dx_probe() may find 2nd level index in root + * index, then we insert new index here and set new count + * in that 2nd level index. so, dx_probe() may see 2nd + * level index w/o hash it looks for. 
the solution is + * to check root index after we locked just founded 2nd + * level index -bzzz */ + dx_lock_bh(frames[0].bh); + dx_insert_block (dir, frames + 0, hash2, newblock, 0); + dx_unlock_bh(frames[0].bh); + + /* now old and new 2nd level index blocks contain + * all pointers, so dx_probe() may find it in the both. + * it's OK -bzzz */ + + dx_lock_bh(frame->bh); + dx_set_count(entries, icount1); + dx_unlock_bh(frame->bh); + + /* now old 2nd level index block points to first half + * of leafs. it's importand that dx_probe() must + * check root index block for changes under + * dx_lock_bh(frame->bh) -bzzz */ + + ext3_unlock_htree(dir, ri_lock); + dxtrace(dx_show_index ("node", frames[1].entries)); dxtrace(dx_show_index ("node", ((struct dx_node *) bh2->b_data)->entries)); @@ -1505,38 +1808,62 @@ static int ext3_dx_add_entry(handle_t *h goto journal_error; brelse (bh2); } else { + unsigned long leaf = frame->leaf; + dxtrace(printk("Creating second level index...\n")); memcpy((char *) entries2, (char *) entries, icount * sizeof(struct dx_entry)); dx_set_limit(entries2, dx_node_limit(dir)); - + /* Set up root */ + dx_lock_bh(frames[0].bh); dx_set_count(entries, 1); dx_set_block(entries + 0, newblock); ((struct dx_root *) frames[0].bh->b_data)->info.indirect_levels = 1; + dx_unlock_bh(frames[0].bh); /* Add new access path frame */ frame = frames + 1; frame->at = at = at - entries + entries2; frame->entries = entries = entries2; frame->bh = bh2; + frame->curidx = newblock; + frame->leaf = leaf; err = ext3_journal_get_write_access(handle, frame->bh); if (err) goto journal_error; + + /* first level index was root. 
it's already initialized */ + /* we my unlock it now */ + ext3_unlock_htree(dir, idx_lock); + + /* current index is just created 2nd level index */ + curidx = newblock; + idx_lock = nb_lock; } + ext3_journal_dirty_metadata(handle, frames[0].bh); } - de = do_split(handle, dir, &bh, frame, &hinfo, &err); + de = do_split(handle, dir, &bh, frame, &hinfo, &newleaf_lock, &err); if (!de) goto cleanup; + + /* index splitted */ + ext3_unlock_htree(dir, idx_lock); + err = add_dirent_to_buf(handle, dentry, inode, de, bh); + + if (newleaf_lock) + ext3_unlock_htree(dir, newleaf_lock); + bh = 0; goto cleanup; journal_error: ext3_std_error(dir->i_sb, err); cleanup: + ext3_unlock_htree(dir, leaf_lock); if (bh) brelse(bh); dx_release(frames); @@ -1967,13 +2294,14 @@ static int ext3_rmdir (struct inode * di struct buffer_head * bh; struct ext3_dir_entry_2 * de; handle_t *handle; + void *lock; handle = ext3_journal_start(dir, EXT3_DELETE_TRANS_BLOCKS); if (IS_ERR(handle)) return PTR_ERR(handle); retval = -ENOENT; - bh = ext3_find_entry (dentry, &de); + bh = ext3_find_entry (dentry, &de, 1, &lock); if (!bh) goto end_rmdir; @@ -1984,14 +2312,19 @@ static int ext3_rmdir (struct inode * di DQUOT_INIT(inode); retval = -EIO; - if (le32_to_cpu(de->inode) != inode->i_ino) + if (le32_to_cpu(de->inode) != inode->i_ino) { + ext3_unlock_htree(dir, lock); goto end_rmdir; + } retval = -ENOTEMPTY; - if (!empty_dir (inode)) + if (!empty_dir (inode)) { + ext3_unlock_htree(dir, lock); goto end_rmdir; + } retval = ext3_delete_entry(handle, dir, de, bh); + ext3_unlock_htree(dir, lock); if (retval) goto end_rmdir; if (inode->i_nlink != 2) @@ -2024,6 +2357,7 @@ static int ext3_unlink(struct inode * di struct buffer_head * bh; struct ext3_dir_entry_2 * de; handle_t *handle; + void *lock; handle = ext3_journal_start(dir, EXT3_DELETE_TRANS_BLOCKS); if (IS_ERR(handle)) @@ -2033,7 +2367,7 @@ static int ext3_unlink(struct inode * di handle->h_sync = 1; retval = -ENOENT; - bh = ext3_find_entry (dentry, &de); + 
bh = ext3_find_entry (dentry, &de, 1, &lock); if (!bh) goto end_unlink; @@ -2041,8 +2375,10 @@ static int ext3_unlink(struct inode * di DQUOT_INIT(inode); retval = -EIO; - if (le32_to_cpu(de->inode) != inode->i_ino) + if (le32_to_cpu(de->inode) != inode->i_ino) { + ext3_unlock_htree(dir, lock); goto end_unlink; + } if (!inode->i_nlink) { ext3_warning (inode->i_sb, "ext3_unlink", @@ -2051,6 +2387,7 @@ static int ext3_unlink(struct inode * di inode->i_nlink = 1; } retval = ext3_delete_entry(handle, dir, de, bh); + ext3_unlock_htree(dir, lock); if (retval) goto end_unlink; dir->i_ctime = dir->i_mtime = CURRENT_TIME; @@ -2163,6 +2500,7 @@ static int ext3_rename (struct inode * o struct buffer_head * old_bh, * new_bh, * dir_bh; struct ext3_dir_entry_2 * old_de, * new_de; int retval; + void *lock1 = NULL, *lock2 = NULL, *lock3 = NULL; old_bh = new_bh = dir_bh = NULL; @@ -2174,7 +2512,10 @@ static int ext3_rename (struct inode * o if (IS_DIRSYNC(old_dir) || IS_DIRSYNC(new_dir)) handle->h_sync = 1; - old_bh = ext3_find_entry (old_dentry, &old_de); + if (old_dentry->d_parent == new_dentry->d_parent) + down(&EXT3_I(old_dentry->d_parent->d_inode)->i_rename_sem); + + old_bh = ext3_find_entry (old_dentry, &old_de, 1, &lock1 /* FIXME */); /* * Check for inode number is _not_ due to possible IO errors. 
* We might rmdir the source, keep it as pwd of some process @@ -2187,7 +2528,7 @@ static int ext3_rename (struct inode * o goto end_rename; new_inode = new_dentry->d_inode; - new_bh = ext3_find_entry (new_dentry, &new_de); + new_bh = ext3_find_entry (new_dentry, &new_de, 1, &lock2 /* FIXME */); if (new_bh) { if (!new_inode) { brelse (new_bh); @@ -2250,7 +2591,7 @@ static int ext3_rename (struct inode * o struct buffer_head *old_bh2; struct ext3_dir_entry_2 *old_de2; - old_bh2 = ext3_find_entry(old_dentry, &old_de2); + old_bh2 = ext3_find_entry(old_dentry, &old_de2, 1, &lock3 /* FIXME */); if (old_bh2) { retval = ext3_delete_entry(handle, old_dir, old_de2, old_bh2); @@ -2293,11 +2634,43 @@ static int ext3_rename (struct inode * o retval = 0; end_rename: + if (lock1) + ext3_unlock_htree(old_dentry->d_parent->d_inode, lock1); + if (lock2) + ext3_unlock_htree(new_dentry->d_parent->d_inode, lock2); + if (lock3) + ext3_unlock_htree(old_dentry->d_parent->d_inode, lock3); + if (old_dentry->d_parent == new_dentry->d_parent) + up(&EXT3_I(old_dentry->d_parent->d_inode)->i_rename_sem); brelse (dir_bh); brelse (old_bh); brelse (new_bh); ext3_journal_stop(handle); return retval; +} + +/* + * this locking primitives are used to protect parts + * of dir's htree. 
protection unit is block: leaf or index + */ +static inline void *ext3_lock_htree(struct inode *dir, + unsigned long value, int rwlock) +{ + void *lock; + + if (!test_opt(dir->i_sb, PDIROPS)) + return NULL; + lock = dynlock_lock(&EXT3_I(dir)->i_htree_lock, + value, rwlock, GFP_KERNEL); + return lock; +} + +static inline void ext3_unlock_htree(struct inode *dir, + void *lock) +{ + if (!test_opt(dir->i_sb, PDIROPS)) + return; + dynlock_unlock(&EXT3_I(dir)->i_htree_lock, lock); } /* diff -puN fs/ext3/super.c~ext3-pdirops fs/ext3/super.c --- linux-2.5.73/fs/ext3/super.c~ext3-pdirops Mon Jul 7 00:00:24 2003 +++ linux-2.5.73-alexey/fs/ext3/super.c Mon Jul 7 00:00:24 2003 @@ -504,6 +504,9 @@ static struct inode *ext3_alloc_inode(st ei->i_default_acl = EXT3_ACL_NOT_CACHED; #endif ei->vfs_inode.i_version = 1; + dynlock_init(&ei->i_htree_lock); + sema_init(&ei->i_append_sem, 1); + sema_init(&ei->i_rename_sem, 1); return &ei->vfs_inode; } @@ -712,6 +715,8 @@ static int parse_options (char * options return 0; } } + else if (!strcmp (this_char, "pdirops")) + set_opt (sbi->s_mount_opt, PDIROPS); else if (!strcmp (this_char, "grpid") || !strcmp (this_char, "bsdgroups")) set_opt (sbi->s_mount_opt, GRPID); @@ -890,6 +895,10 @@ static int ext3_setup_super(struct super ext3_check_blocks_bitmap (sb); ext3_check_inodes_bitmap (sb); } +#endif +#ifdef S_PDIROPS + if (test_opt (sb, PDIROPS)) + sb->s_flags |= S_PDIROPS; #endif setup_ro_after(sb); return res; diff -puN include/linux/ext3_fs.h~ext3-pdirops include/linux/ext3_fs.h --- linux-2.5.73/include/linux/ext3_fs.h~ext3-pdirops Mon Jul 7 00:00:24 2003 +++ linux-2.5.73-alexey/include/linux/ext3_fs.h Mon Jul 7 00:00:24 2003 @@ -312,6 +312,7 @@ struct ext3_inode { /* * Mount flags */ +#define EXT3_MOUNT_PDIROPS 0x800000/* Parallel dir operations */ #define EXT3_MOUNT_CHECK 0x0001 /* Do mount-time checks */ #define EXT3_MOUNT_OLDALLOC 0x0002 /* Don't use the new Orlov allocator */ #define EXT3_MOUNT_GRPID 0x0004 /* Create files with 
directory's group */ diff -puN include/linux/ext3_fs_i.h~ext3-pdirops include/linux/ext3_fs_i.h --- linux-2.5.73/include/linux/ext3_fs_i.h~ext3-pdirops Mon Jul 7 00:00:24 2003 +++ linux-2.5.73-alexey/include/linux/ext3_fs_i.h Mon Jul 7 00:00:24 2003 @@ -17,6 +17,7 @@ #define _LINUX_EXT3_FS_I #include <linux/rwsem.h> +#include <linux/dynlocks.h> /* * second extended file system inode data in memory @@ -98,6 +99,11 @@ struct ext3_inode_info { */ struct rw_semaphore truncate_sem; struct inode vfs_inode; + + /* following fields for parallel directory operations -bzzz */ + struct dynlock i_htree_lock; + struct semaphore i_append_sem; + struct semaphore i_rename_sem; }; #endif /* _LINUX_EXT3_FS_I */ _