linux-erofs.lists.ozlabs.org archive mirror
 help / color / mirror / Atom feed
* [PATCH v2 1/8] erofs-utils: use erofs_atomic_t for inode->i_count
@ 2024-04-22  0:34 Gao Xiang
  2024-04-22  0:34 ` [PATCH v2 2/8] erofs-utils: lib: prepare for later deferred work Gao Xiang
                   ` (6 more replies)
  0 siblings, 7 replies; 8+ messages in thread
From: Gao Xiang @ 2024-04-22  0:34 UTC (permalink / raw)
  To: linux-erofs; +Cc: Gao Xiang

From: Gao Xiang <hsiangkao@linux.alibaba.com>

`inode->i_count` can be touched by more than one thread if
multi-threading is enabled, so turn it into an erofs_atomic_t.

Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com>
---
patchset v1->v2:
 - Fix `--all-fragments` functionality;
 - Fix issues pointed out by Yifan. 

 include/erofs/atomic.h   | 10 ++++++++++
 include/erofs/inode.h    |  2 +-
 include/erofs/internal.h |  3 ++-
 lib/inode.c              |  5 +++--
 4 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/include/erofs/atomic.h b/include/erofs/atomic.h
index 214cdb1..f28687e 100644
--- a/include/erofs/atomic.h
+++ b/include/erofs/atomic.h
@@ -25,4 +25,14 @@ __n;})
 #define erofs_atomic_test_and_set(ptr) \
 	__atomic_test_and_set(ptr, __ATOMIC_RELAXED)
 
+#define erofs_atomic_add_return(ptr, i) \
+	__atomic_add_fetch(ptr, i, __ATOMIC_RELAXED)
+
+#define erofs_atomic_sub_return(ptr, i) \
+	__atomic_sub_fetch(ptr, i, __ATOMIC_RELAXED)
+
+#define erofs_atomic_inc_return(ptr) erofs_atomic_add_return(ptr, 1)
+
+#define erofs_atomic_dec_return(ptr) erofs_atomic_sub_return(ptr, 1)
+
 #endif
diff --git a/include/erofs/inode.h b/include/erofs/inode.h
index d5a732a..5d6bc98 100644
--- a/include/erofs/inode.h
+++ b/include/erofs/inode.h
@@ -17,7 +17,7 @@ extern "C"
 
 static inline struct erofs_inode *erofs_igrab(struct erofs_inode *inode)
 {
-	++inode->i_count;
+	(void)erofs_atomic_inc_return(&inode->i_count);
 	return inode;
 }
 
diff --git a/include/erofs/internal.h b/include/erofs/internal.h
index 4cd2059..f31e548 100644
--- a/include/erofs/internal.h
+++ b/include/erofs/internal.h
@@ -25,6 +25,7 @@ typedef unsigned short umode_t;
 #ifdef HAVE_PTHREAD_H
 #include <pthread.h>
 #endif
+#include "atomic.h"
 
 #ifndef PATH_MAX
 #define PATH_MAX        4096    /* # chars in a path name including nul */
@@ -169,7 +170,7 @@ struct erofs_inode {
 		/* (mkfs.erofs) next pointer for directory dumping */
 		struct erofs_inode *next_dirwrite;
 	};
-	unsigned int i_count;
+	erofs_atomic_t i_count;
 	struct erofs_sb_info *sbi;
 	struct erofs_inode *i_parent;
 
diff --git a/lib/inode.c b/lib/inode.c
index 7508c74..55969d9 100644
--- a/lib/inode.c
+++ b/lib/inode.c
@@ -129,9 +129,10 @@ struct erofs_inode *erofs_iget_by_nid(erofs_nid_t nid)
 unsigned int erofs_iput(struct erofs_inode *inode)
 {
 	struct erofs_dentry *d, *t;
+	unsigned long got = erofs_atomic_dec_return(&inode->i_count);
 
-	if (inode->i_count > 1)
-		return --inode->i_count;
+	if (got >= 1)
+		return got;
 
 	list_for_each_entry_safe(d, t, &inode->i_subdirs, d_child)
 		free(d);
-- 
2.30.2


^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH v2 2/8] erofs-utils: lib: prepare for later deferred work
  2024-04-22  0:34 [PATCH v2 1/8] erofs-utils: use erofs_atomic_t for inode->i_count Gao Xiang
@ 2024-04-22  0:34 ` Gao Xiang
  2024-04-22  0:34 ` [PATCH v2 3/8] erofs-utils: lib: split out erofs_commit_compressed_file() Gao Xiang
                   ` (5 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: Gao Xiang @ 2024-04-22  0:34 UTC (permalink / raw)
  To: linux-erofs; +Cc: Gao Xiang

From: Gao Xiang <hsiangkao@linux.alibaba.com>

Split out ordered metadata operations and add the following helpers:

 - erofs_mkfs_jobfn()

 - erofs_mkfs_go()

to handle these mkfs job items for multi-threading support.

Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com>
---
 lib/inode.c | 69 ++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 58 insertions(+), 11 deletions(-)

diff --git a/lib/inode.c b/lib/inode.c
index 55969d9..1ff05e1 100644
--- a/lib/inode.c
+++ b/lib/inode.c
@@ -1133,6 +1133,57 @@ static int erofs_mkfs_handle_nondirectory(struct erofs_inode *inode)
 	return 0;
 }
 
+enum erofs_mkfs_jobtype {	/* ordered job types */
+	EROFS_MKFS_JOB_NDIR,
+	EROFS_MKFS_JOB_DIR,
+	EROFS_MKFS_JOB_DIR_BH,
+};
+
+struct erofs_mkfs_jobitem {
+	enum erofs_mkfs_jobtype type;
+	union {
+		struct erofs_inode *inode;
+	} u;
+};
+
+static int erofs_mkfs_jobfn(struct erofs_mkfs_jobitem *item)
+{
+	struct erofs_inode *inode = item->u.inode;
+	int ret;
+
+	if (item->type == EROFS_MKFS_JOB_NDIR)
+		return erofs_mkfs_handle_nondirectory(inode);
+
+	if (item->type == EROFS_MKFS_JOB_DIR) {
+		ret = erofs_prepare_inode_buffer(inode);
+		if (ret)
+			return ret;
+		inode->bh->op = &erofs_skip_write_bhops;
+		if (IS_ROOT(inode))	/* assign root NID */
+			erofs_fixup_meta_blkaddr(inode);
+		return 0;
+	}
+
+	if (item->type == EROFS_MKFS_JOB_DIR_BH) {
+		erofs_write_dir_file(inode);
+		erofs_write_tail_end(inode);
+		inode->bh->op = &erofs_write_inode_bhops;
+		erofs_iput(inode);
+		return 0;
+	}
+	return -EINVAL;
+}
+
+int erofs_mkfs_go(struct erofs_sb_info *sbi,
+		  enum erofs_mkfs_jobtype type, void *elem, int size)
+{
+	struct erofs_mkfs_jobitem item;
+
+	item.type = type;
+	memcpy(&item.u, elem, size);
+	return erofs_mkfs_jobfn(&item);
+}
+
 static int erofs_mkfs_handle_directory(struct erofs_inode *dir)
 {
 	DIR *_dir;
@@ -1213,11 +1264,7 @@ static int erofs_mkfs_handle_directory(struct erofs_inode *dir)
 	else
 		dir->i_nlink = i_nlink;
 
-	ret = erofs_prepare_inode_buffer(dir);
-	if (ret)
-		return ret;
-	dir->bh->op = &erofs_skip_write_bhops;
-	return 0;
+	return erofs_mkfs_go(dir->sbi, EROFS_MKFS_JOB_DIR, &dir, sizeof(dir));
 
 err_closedir:
 	closedir(_dir);
@@ -1243,7 +1290,8 @@ static int erofs_mkfs_handle_inode(struct erofs_inode *inode)
 		return ret;
 
 	if (!S_ISDIR(inode->i_mode))
-		ret = erofs_mkfs_handle_nondirectory(inode);
+		ret = erofs_mkfs_go(inode->sbi, EROFS_MKFS_JOB_NDIR,
+				    &inode, sizeof(inode));
 	else
 		ret = erofs_mkfs_handle_directory(inode);
 	erofs_info("file %s dumped (mode %05o)", erofs_fspath(inode->i_srcpath),
@@ -1268,7 +1316,6 @@ struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path)
 	err = erofs_mkfs_handle_inode(root);
 	if (err)
 		return ERR_PTR(err);
-	erofs_fixup_meta_blkaddr(root);
 
 	do {
 		int err;
@@ -1302,10 +1349,10 @@ struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path)
 		}
 		*last = dumpdir;	/* fixup the last (or the only) one */
 		dumpdir = head;
-		erofs_write_dir_file(dir);
-		erofs_write_tail_end(dir);
-		dir->bh->op = &erofs_write_inode_bhops;
-		erofs_iput(dir);
+		err = erofs_mkfs_go(dir->sbi, EROFS_MKFS_JOB_DIR_BH,
+				    &dir, sizeof(dir));
+		if (err)
+			return ERR_PTR(err);
 	} while (dumpdir);
 
 	return root;
-- 
2.30.2


^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH v2 3/8] erofs-utils: lib: split out erofs_commit_compressed_file()
  2024-04-22  0:34 [PATCH v2 1/8] erofs-utils: use erofs_atomic_t for inode->i_count Gao Xiang
  2024-04-22  0:34 ` [PATCH v2 2/8] erofs-utils: lib: prepare for later deferred work Gao Xiang
@ 2024-04-22  0:34 ` Gao Xiang
  2024-04-22  0:34 ` [PATCH v2 4/8] erofs-utils: rearrange several fields for multi-threaded mkfs Gao Xiang
                   ` (4 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: Gao Xiang @ 2024-04-22  0:34 UTC (permalink / raw)
  To: linux-erofs; +Cc: Gao Xiang

From: Gao Xiang <hsiangkao@linux.alibaba.com>

Just split out on-disk compressed metadata commit logic.

Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com>
---
 lib/compress.c | 191 +++++++++++++++++++++++++++----------------------
 1 file changed, 105 insertions(+), 86 deletions(-)

diff --git a/lib/compress.c b/lib/compress.c
index b084446..8ca4033 100644
--- a/lib/compress.c
+++ b/lib/compress.c
@@ -1031,6 +1031,102 @@ int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx,
 	return 0;
 }
 
+int erofs_commit_compressed_file(struct z_erofs_compress_ictx *ictx,
+				 struct erofs_buffer_head *bh,
+				 erofs_blk_t blkaddr,
+				 erofs_blk_t compressed_blocks)
+{
+	struct erofs_inode *inode = ictx->inode;
+	struct erofs_sb_info *sbi = inode->sbi;
+	unsigned int legacymetasize;
+	u8 *compressmeta;
+	int ret;
+
+	/* fall back to no compression mode */
+	DBG_BUGON(compressed_blocks < !!inode->idata_size);
+	compressed_blocks -= !!inode->idata_size;
+
+	compressmeta = malloc(BLK_ROUND_UP(sbi, inode->i_size) *
+			      sizeof(struct z_erofs_lcluster_index) +
+			      Z_EROFS_LEGACY_MAP_HEADER_SIZE);
+	if (!compressmeta) {
+		ret = -ENOMEM;
+		goto err_free_idata;
+	}
+	ictx->metacur = compressmeta + Z_EROFS_LEGACY_MAP_HEADER_SIZE;
+	z_erofs_write_indexes(ictx);
+
+	legacymetasize = ictx->metacur - compressmeta;
+	/* estimate if data compression saves space or not */
+	if (!inode->fragment_size &&
+	    compressed_blocks * erofs_blksiz(sbi) + inode->idata_size +
+	    legacymetasize >= inode->i_size) {
+		z_erofs_dedupe_commit(true);
+		ret = -ENOSPC;
+		goto err_free_meta;
+	}
+	z_erofs_dedupe_commit(false);
+	z_erofs_write_mapheader(inode, compressmeta);
+
+	if (!ictx->fragemitted)
+		sbi->saved_by_deduplication += inode->fragment_size;
+
+	/* if the entire file is a fragment, a simplified form is used. */
+	if (inode->i_size <= inode->fragment_size) {
+		DBG_BUGON(inode->i_size < inode->fragment_size);
+		DBG_BUGON(inode->fragmentoff >> 63);
+		*(__le64 *)compressmeta =
+			cpu_to_le64(inode->fragmentoff | 1ULL << 63);
+		inode->datalayout = EROFS_INODE_COMPRESSED_FULL;
+		legacymetasize = Z_EROFS_LEGACY_MAP_HEADER_SIZE;
+	}
+
+	if (compressed_blocks) {
+		ret = erofs_bh_balloon(bh, erofs_pos(sbi, compressed_blocks));
+		DBG_BUGON(ret != erofs_blksiz(sbi));
+	} else {
+		if (!cfg.c_fragments && !cfg.c_dedupe)
+			DBG_BUGON(!inode->idata_size);
+	}
+
+	erofs_info("compressed %s (%llu bytes) into %u blocks",
+		   inode->i_srcpath, (unsigned long long)inode->i_size,
+		   compressed_blocks);
+
+	if (inode->idata_size) {
+		bh->op = &erofs_skip_write_bhops;
+		inode->bh_data = bh;
+	} else {
+		erofs_bdrop(bh, false);
+	}
+
+	inode->u.i_blocks = compressed_blocks;
+
+	if (inode->datalayout == EROFS_INODE_COMPRESSED_FULL) {
+		inode->extent_isize = legacymetasize;
+	} else {
+		ret = z_erofs_convert_to_compacted_format(inode, blkaddr,
+							  legacymetasize,
+							  compressmeta);
+		DBG_BUGON(ret);
+	}
+	inode->compressmeta = compressmeta;
+	if (!erofs_is_packed_inode(inode))
+		erofs_droid_blocklist_write(inode, blkaddr, compressed_blocks);
+	return 0;
+
+err_free_meta:
+	free(compressmeta);
+	inode->compressmeta = NULL;
+err_free_idata:
+	if (inode->idata) {
+		free(inode->idata);
+		inode->idata = NULL;
+	}
+	erofs_bdrop(bh, true);	/* revoke buffer */
+	return ret;
+}
+
 #ifdef EROFS_MT_ENABLED
 void *z_erofs_mt_wq_tls_alloc(struct erofs_workqueue *wq, void *ptr)
 {
@@ -1260,23 +1356,9 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 	static struct z_erofs_compress_sctx sctx;
 	struct erofs_compress_cfg *ccfg;
 	erofs_blk_t blkaddr, compressed_blocks = 0;
-	unsigned int legacymetasize;
 	int ret;
 	bool ismt = false;
 	struct erofs_sb_info *sbi = inode->sbi;
-	u8 *compressmeta = malloc(BLK_ROUND_UP(sbi, inode->i_size) *
-				  sizeof(struct z_erofs_lcluster_index) +
-				  Z_EROFS_LEGACY_MAP_HEADER_SIZE);
-
-	if (!compressmeta)
-		return -ENOMEM;
-
-	/* allocate main data buffer */
-	bh = erofs_balloc(DATA, 0, 0, 0);
-	if (IS_ERR(bh)) {
-		ret = PTR_ERR(bh);
-		goto err_free_meta;
-	}
 
 	/* initialize per-file compression setting */
 	inode->z_advise = 0;
@@ -1321,20 +1403,24 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 	if (cfg.c_fragments && !erofs_is_packed_inode(inode)) {
 		ret = z_erofs_fragments_dedupe(inode, fd, &ctx.tof_chksum);
 		if (ret < 0)
-			goto err_bdrop;
+			return ret;
 	}
 
-	blkaddr = erofs_mapbh(bh->block);	/* start_blkaddr */
 	ctx.inode = inode;
 	ctx.fd = fd;
 	ctx.fpos = fpos;
-	ctx.metacur = compressmeta + Z_EROFS_LEGACY_MAP_HEADER_SIZE;
 	init_list_head(&ctx.extents);
 	ctx.fix_dedupedfrag = false;
 	ctx.fragemitted = false;
 	sctx = (struct z_erofs_compress_sctx) { .ictx = &ctx, };
 	init_list_head(&sctx.extents);
 
+	/* allocate main data buffer */
+	bh = erofs_balloc(DATA, 0, 0, 0);
+	if (IS_ERR(bh))
+		return PTR_ERR(bh);
+	blkaddr = erofs_mapbh(bh->block);	/* start_blkaddr */
+
 	if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) &&
 	    !inode->fragment_size) {
 		ret = z_erofs_pack_file_from_fd(inode, fd, ctx.tof_chksum);
@@ -1363,10 +1449,6 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 		compressed_blocks = sctx.blkaddr - blkaddr;
 	}
 
-	/* fall back to no compression mode */
-	DBG_BUGON(compressed_blocks < !!inode->idata_size);
-	compressed_blocks -= !!inode->idata_size;
-
 	/* generate an extent for the deduplicated fragment */
 	if (inode->fragment_size && !ctx.fragemitted) {
 		struct z_erofs_extent_item *ei;
@@ -1388,80 +1470,17 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 		z_erofs_commit_extent(&sctx, ei);
 	}
 	z_erofs_fragments_commit(inode);
-
 	if (!ismt)
 		list_splice_tail(&sctx.extents, &ctx.extents);
 
-	z_erofs_write_indexes(&ctx);
-	legacymetasize = ctx.metacur - compressmeta;
-	/* estimate if data compression saves space or not */
-	if (!inode->fragment_size &&
-	    compressed_blocks * erofs_blksiz(sbi) + inode->idata_size +
-	    legacymetasize >= inode->i_size) {
-		z_erofs_dedupe_commit(true);
-		ret = -ENOSPC;
-		goto err_free_idata;
-	}
-	z_erofs_dedupe_commit(false);
-	z_erofs_write_mapheader(inode, compressmeta);
-
-	if (!ctx.fragemitted)
-		sbi->saved_by_deduplication += inode->fragment_size;
-
-	/* if the entire file is a fragment, a simplified form is used. */
-	if (inode->i_size <= inode->fragment_size) {
-		DBG_BUGON(inode->i_size < inode->fragment_size);
-		DBG_BUGON(inode->fragmentoff >> 63);
-		*(__le64 *)compressmeta =
-			cpu_to_le64(inode->fragmentoff | 1ULL << 63);
-		inode->datalayout = EROFS_INODE_COMPRESSED_FULL;
-		legacymetasize = Z_EROFS_LEGACY_MAP_HEADER_SIZE;
-	}
-
-	if (compressed_blocks) {
-		ret = erofs_bh_balloon(bh, erofs_pos(sbi, compressed_blocks));
-		DBG_BUGON(ret != erofs_blksiz(sbi));
-	} else {
-		if (!cfg.c_fragments && !cfg.c_dedupe)
-			DBG_BUGON(!inode->idata_size);
-	}
-
-	erofs_info("compressed %s (%llu bytes) into %u blocks",
-		   inode->i_srcpath, (unsigned long long)inode->i_size,
-		   compressed_blocks);
-
-	if (inode->idata_size) {
-		bh->op = &erofs_skip_write_bhops;
-		inode->bh_data = bh;
-	} else {
-		erofs_bdrop(bh, false);
-	}
-
-	inode->u.i_blocks = compressed_blocks;
-
-	if (inode->datalayout == EROFS_INODE_COMPRESSED_FULL) {
-		inode->extent_isize = legacymetasize;
-	} else {
-		ret = z_erofs_convert_to_compacted_format(inode, blkaddr,
-							  legacymetasize,
-							  compressmeta);
-		DBG_BUGON(ret);
-	}
-	inode->compressmeta = compressmeta;
-	if (!erofs_is_packed_inode(inode))
-		erofs_droid_blocklist_write(inode, blkaddr, compressed_blocks);
-	return 0;
-
+	return erofs_commit_compressed_file(&ctx, bh, blkaddr,
+					    compressed_blocks);
 err_free_idata:
 	if (inode->idata) {
 		free(inode->idata);
 		inode->idata = NULL;
 	}
-err_bdrop:
 	erofs_bdrop(bh, true);	/* revoke buffer */
-err_free_meta:
-	free(compressmeta);
-	inode->compressmeta = NULL;
 	return ret;
 }
 
-- 
2.30.2


^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH v2 4/8] erofs-utils: rearrange several fields for multi-threaded mkfs
  2024-04-22  0:34 [PATCH v2 1/8] erofs-utils: use erofs_atomic_t for inode->i_count Gao Xiang
  2024-04-22  0:34 ` [PATCH v2 2/8] erofs-utils: lib: prepare for later deferred work Gao Xiang
  2024-04-22  0:34 ` [PATCH v2 3/8] erofs-utils: lib: split out erofs_commit_compressed_file() Gao Xiang
@ 2024-04-22  0:34 ` Gao Xiang
  2024-04-22  0:34 ` [PATCH v2 5/8] erofs-utils: lib: split up z_erofs_mt_compress() Gao Xiang
                   ` (3 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: Gao Xiang @ 2024-04-22  0:34 UTC (permalink / raw)
  To: linux-erofs; +Cc: Gao Xiang

From: Gao Xiang <hsiangkao@linux.alibaba.com>

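Per-file state (`ccfg`, `seg_num`, and the mutex/cond/nfini synchronization fields) should be located in `struct z_erofs_compress_ictx` rather than in the segment context or the global `z_erofs_mt_ctrl`.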

Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com>
---
 lib/compress.c | 57 +++++++++++++++++++++++++++-----------------------
 1 file changed, 31 insertions(+), 26 deletions(-)

diff --git a/lib/compress.c b/lib/compress.c
index 8ca4033..0bc5426 100644
--- a/lib/compress.c
+++ b/lib/compress.c
@@ -38,6 +38,7 @@ struct z_erofs_extent_item {
 
 struct z_erofs_compress_ictx {		/* inode context */
 	struct erofs_inode *inode;
+	struct erofs_compress_cfg *ccfg;
 	int fd;
 	u64 fpos;
 
@@ -49,6 +50,14 @@ struct z_erofs_compress_ictx {		/* inode context */
 	u8 *metacur;
 	struct list_head extents;
 	u16 clusterofs;
+
+	int seg_num;
+
+#if EROFS_MT_ENABLED
+	pthread_mutex_t mutex;
+	pthread_cond_t cond;
+	int nfini;
+#endif
 };
 
 struct z_erofs_compress_sctx {		/* segment context */
@@ -68,7 +77,7 @@ struct z_erofs_compress_sctx {		/* segment context */
 	erofs_blk_t blkaddr;		/* pointing to the next blkaddr */
 	u16 clusterofs;
 
-	int seg_num, seg_idx;
+	int seg_idx;
 
 	void *membuf;
 	erofs_off_t memoff;
@@ -98,9 +107,6 @@ struct erofs_compress_work {
 static struct {
 	struct erofs_workqueue wq;
 	struct erofs_compress_work *idle;
-	pthread_mutex_t mutex;
-	pthread_cond_t cond;
-	int nfini;
 } z_erofs_mt_ctrl;
 #endif
 
@@ -513,7 +519,7 @@ static int __z_erofs_compress_one(struct z_erofs_compress_sctx *ctx,
 	struct erofs_compress *const h = ctx->chandle;
 	unsigned int len = ctx->tail - ctx->head;
 	bool is_packed_inode = erofs_is_packed_inode(inode);
-	bool tsg = (ctx->seg_idx + 1 >= ctx->seg_num), final = !ctx->remaining;
+	bool tsg = (ctx->seg_idx + 1 >= ictx->seg_num), final = !ctx->remaining;
 	bool may_packing = (cfg.c_fragments && tsg && final &&
 			    !is_packed_inode && !z_erofs_mt_enabled);
 	bool may_inline = (cfg.c_ztailpacking && tsg && final && !may_packing);
@@ -1201,7 +1207,8 @@ void z_erofs_mt_workfn(struct erofs_work *work, void *tlsp)
 	struct erofs_compress_work *cwork = (struct erofs_compress_work *)work;
 	struct erofs_compress_wq_tls *tls = tlsp;
 	struct z_erofs_compress_sctx *sctx = &cwork->ctx;
-	struct erofs_inode *inode = sctx->ictx->inode;
+	struct z_erofs_compress_ictx *ictx = sctx->ictx;
+	struct erofs_inode *inode = ictx->inode;
 	struct erofs_sb_info *sbi = inode->sbi;
 	int ret = 0;
 
@@ -1228,10 +1235,10 @@ void z_erofs_mt_workfn(struct erofs_work *work, void *tlsp)
 
 out:
 	cwork->errcode = ret;
-	pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
-	++z_erofs_mt_ctrl.nfini;
-	pthread_cond_signal(&z_erofs_mt_ctrl.cond);
-	pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
+	pthread_mutex_lock(&ictx->mutex);
+	++ictx->nfini;
+	pthread_cond_signal(&ictx->cond);
+	pthread_mutex_unlock(&ictx->mutex);
 }
 
 int z_erofs_merge_segment(struct z_erofs_compress_ictx *ictx,
@@ -1268,16 +1275,19 @@ int z_erofs_merge_segment(struct z_erofs_compress_ictx *ictx,
 }
 
 int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx,
-			struct erofs_compress_cfg *ccfg,
 			erofs_blk_t blkaddr,
 			erofs_blk_t *compressed_blocks)
 {
 	struct erofs_compress_work *cur, *head = NULL, **last = &head;
+	struct erofs_compress_cfg *ccfg = ictx->ccfg;
 	struct erofs_inode *inode = ictx->inode;
 	int nsegs = DIV_ROUND_UP(inode->i_size, cfg.c_segment_size);
 	int ret, i;
 
-	z_erofs_mt_ctrl.nfini = 0;
+	ictx->seg_num = nsegs;
+	ictx->nfini = 0;
+	pthread_mutex_init(&ictx->mutex, NULL);
+	pthread_cond_init(&ictx->cond, NULL);
 
 	for (i = 0; i < nsegs; i++) {
 		if (z_erofs_mt_ctrl.idle) {
@@ -1294,7 +1304,6 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx,
 
 		cur->ctx = (struct z_erofs_compress_sctx) {
 			.ictx = ictx,
-			.seg_num = nsegs,
 			.seg_idx = i,
 			.pivot = &dummy_pivot,
 		};
@@ -1316,11 +1325,10 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx,
 		erofs_queue_work(&z_erofs_mt_ctrl.wq, &cur->work);
 	}
 
-	pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
-	while (z_erofs_mt_ctrl.nfini != nsegs)
-		pthread_cond_wait(&z_erofs_mt_ctrl.cond,
-				  &z_erofs_mt_ctrl.mutex);
-	pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
+	pthread_mutex_lock(&ictx->mutex);
+	while (ictx->nfini < ictx->seg_num)
+		pthread_cond_wait(&ictx->cond, &ictx->mutex);
+	pthread_mutex_unlock(&ictx->mutex);
 
 	ret = 0;
 	while (head) {
@@ -1354,7 +1362,6 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 	struct erofs_buffer_head *bh;
 	static struct z_erofs_compress_ictx ctx;
 	static struct z_erofs_compress_sctx sctx;
-	struct erofs_compress_cfg *ccfg;
 	erofs_blk_t blkaddr, compressed_blocks = 0;
 	int ret;
 	bool ismt = false;
@@ -1389,8 +1396,8 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 		}
 	}
 #endif
-	ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
-	inode->z_algorithmtype[0] = ccfg[0].algorithmtype;
+	ctx.ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
+	inode->z_algorithmtype[0] = ctx.ccfg->algorithmtype;
 	inode->z_algorithmtype[1] = 0;
 
 	inode->idata_size = 0;
@@ -1429,16 +1436,16 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 #ifdef EROFS_MT_ENABLED
 	} else if (z_erofs_mt_enabled && inode->i_size > cfg.c_segment_size) {
 		ismt = true;
-		ret = z_erofs_mt_compress(&ctx, ccfg, blkaddr, &compressed_blocks);
+		ret = z_erofs_mt_compress(&ctx, blkaddr, &compressed_blocks);
 		if (ret)
 			goto err_free_idata;
 #endif
 	} else {
+		ctx.seg_num = 1;
 		sctx.queue = g_queue;
 		sctx.destbuf = NULL;
-		sctx.chandle = &ccfg->handle;
+		sctx.chandle = &ctx.ccfg->handle;
 		sctx.remaining = inode->i_size - inode->fragment_size;
-		sctx.seg_num = 1;
 		sctx.seg_idx = 0;
 		sctx.pivot = &dummy_pivot;
 		sctx.pclustersize = z_erofs_get_max_pclustersize(inode);
@@ -1628,8 +1635,6 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
 	z_erofs_mt_enabled = false;
 #ifdef EROFS_MT_ENABLED
 	if (cfg.c_mt_workers > 1) {
-		pthread_mutex_init(&z_erofs_mt_ctrl.mutex, NULL);
-		pthread_cond_init(&z_erofs_mt_ctrl.cond, NULL);
 		ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq,
 					    cfg.c_mt_workers,
 					    cfg.c_mt_workers << 2,
-- 
2.30.2


^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH v2 5/8] erofs-utils: lib: split up z_erofs_mt_compress()
  2024-04-22  0:34 [PATCH v2 1/8] erofs-utils: use erofs_atomic_t for inode->i_count Gao Xiang
                   ` (2 preceding siblings ...)
  2024-04-22  0:34 ` [PATCH v2 4/8] erofs-utils: rearrange several fields for multi-threaded mkfs Gao Xiang
@ 2024-04-22  0:34 ` Gao Xiang
  2024-04-22  0:34 ` [PATCH v2 6/8] erofs-utils: mkfs: prepare inter-file multi-threaded compression Gao Xiang
                   ` (2 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: Gao Xiang @ 2024-04-22  0:34 UTC (permalink / raw)
  To: linux-erofs; +Cc: Gao Xiang

From: Gao Xiang <hsiangkao@linux.alibaba.com>

The on-disk compressed data write will be moved into a new function
erofs_mt_write_compressed_file().

Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com>
---
 lib/compress.c | 162 ++++++++++++++++++++++++++++---------------------
 1 file changed, 93 insertions(+), 69 deletions(-)

diff --git a/lib/compress.c b/lib/compress.c
index 0bc5426..4ac4760 100644
--- a/lib/compress.c
+++ b/lib/compress.c
@@ -57,6 +57,8 @@ struct z_erofs_compress_ictx {		/* inode context */
 	pthread_mutex_t mutex;
 	pthread_cond_t cond;
 	int nfini;
+
+	struct erofs_compress_work *mtworks;
 #endif
 };
 
@@ -530,11 +532,11 @@ static int __z_erofs_compress_one(struct z_erofs_compress_sctx *ctx,
 	if (len <= ctx->pclustersize) {
 		if (!final || !len)
 			return 1;
+		if (inode->fragment_size && !ictx->fix_dedupedfrag) {
+			ctx->pclustersize = roundup(len, blksz);
+			goto fix_dedupedfrag;
+		}
 		if (may_packing) {
-			if (inode->fragment_size && !ictx->fix_dedupedfrag) {
-				ctx->pclustersize = roundup(len, blksz);
-				goto fix_dedupedfrag;
-			}
 			e->length = len;
 			goto frag_packing;
 		}
@@ -1034,6 +1036,26 @@ int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx,
 		z_erofs_commit_extent(ctx, ctx->pivot);
 		ctx->pivot = NULL;
 	}
+
+	/* generate an extra extent for the deduplicated fragment */
+	if (ctx->seg_idx >= ictx->seg_num - 1 &&
+	    ictx->inode->fragment_size && !ictx->fragemitted) {
+		struct z_erofs_extent_item *ei;
+
+		ei = malloc(sizeof(*ei));
+		if (!ei)
+			return -ENOMEM;
+
+		ei->e = (struct z_erofs_inmem_extent) {
+			.length = ictx->inode->fragment_size,
+			.compressedblks = 0,
+			.raw = false,
+			.partial = false,
+			.blkaddr = ctx->blkaddr,
+		};
+		init_list_head(&ei->list);
+		z_erofs_commit_extent(ctx, ei);
+	}
 	return 0;
 }
 
@@ -1048,6 +1070,8 @@ int erofs_commit_compressed_file(struct z_erofs_compress_ictx *ictx,
 	u8 *compressmeta;
 	int ret;
 
+	z_erofs_fragments_commit(inode);
+
 	/* fall back to no compression mode */
 	DBG_BUGON(compressed_blocks < !!inode->idata_size);
 	compressed_blocks -= !!inode->idata_size;
@@ -1125,11 +1149,11 @@ err_free_meta:
 	free(compressmeta);
 	inode->compressmeta = NULL;
 err_free_idata:
+	erofs_bdrop(bh, true);	/* revoke buffer */
 	if (inode->idata) {
 		free(inode->idata);
 		inode->idata = NULL;
 	}
-	erofs_bdrop(bh, true);	/* revoke buffer */
 	return ret;
 }
 
@@ -1260,7 +1284,7 @@ int z_erofs_merge_segment(struct z_erofs_compress_ictx *ictx,
 		sctx->blkaddr += ei->e.compressedblks;
 
 		/* skip write data but leave blkaddr for inline fallback */
-		if (ei->e.inlined)
+		if (ei->e.inlined || !ei->e.compressedblks)
 			continue;
 		ret2 = blk_write(sbi, sctx->membuf + blkoff * erofs_blksiz(sbi),
 				 ei->e.blkaddr, ei->e.compressedblks);
@@ -1274,15 +1298,13 @@ int z_erofs_merge_segment(struct z_erofs_compress_ictx *ictx,
 	return ret;
 }
 
-int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx,
-			erofs_blk_t blkaddr,
-			erofs_blk_t *compressed_blocks)
+int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx)
 {
 	struct erofs_compress_work *cur, *head = NULL, **last = &head;
 	struct erofs_compress_cfg *ccfg = ictx->ccfg;
 	struct erofs_inode *inode = ictx->inode;
 	int nsegs = DIV_ROUND_UP(inode->i_size, cfg.c_segment_size);
-	int ret, i;
+	int i;
 
 	ictx->seg_num = nsegs;
 	ictx->nfini = 0;
@@ -1290,11 +1312,12 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx,
 	pthread_cond_init(&ictx->cond, NULL);
 
 	for (i = 0; i < nsegs; i++) {
-		if (z_erofs_mt_ctrl.idle) {
-			cur = z_erofs_mt_ctrl.idle;
+		cur = z_erofs_mt_ctrl.idle;
+		if (cur) {
 			z_erofs_mt_ctrl.idle = cur->next;
 			cur->next = NULL;
-		} else {
+		}
+		if (!cur) {
 			cur = calloc(1, sizeof(*cur));
 			if (!cur)
 				return -ENOMEM;
@@ -1324,14 +1347,31 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx,
 		cur->work.fn = z_erofs_mt_workfn;
 		erofs_queue_work(&z_erofs_mt_ctrl.wq, &cur->work);
 	}
+	ictx->mtworks = head;
+	return 0;
+}
+
+int erofs_mt_write_compressed_file(struct z_erofs_compress_ictx *ictx)
+{
+	struct erofs_buffer_head *bh = NULL;
+	struct erofs_compress_work *head = ictx->mtworks, *cur;
+	erofs_blk_t blkaddr, compressed_blocks = 0;
+	int ret;
 
 	pthread_mutex_lock(&ictx->mutex);
 	while (ictx->nfini < ictx->seg_num)
 		pthread_cond_wait(&ictx->cond, &ictx->mutex);
 	pthread_mutex_unlock(&ictx->mutex);
 
+	bh = erofs_balloc(DATA, 0, 0, 0);
+	if (IS_ERR(bh))
+		return PTR_ERR(bh);
+
+	DBG_BUGON(!head);
+	blkaddr = erofs_mapbh(bh->block);
+
 	ret = 0;
-	while (head) {
+	do {
 		cur = head;
 		head = cur->next;
 
@@ -1345,14 +1385,19 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx,
 			if (ret2)
 				ret = ret2;
 
-			*compressed_blocks += cur->ctx.blkaddr - blkaddr;
+			compressed_blocks += cur->ctx.blkaddr - blkaddr;
 			blkaddr = cur->ctx.blkaddr;
 		}
 
 		cur->next = z_erofs_mt_ctrl.idle;
 		z_erofs_mt_ctrl.idle = cur;
-	}
-	return ret;
+	} while(head);
+
+	if (ret)
+		return ret;
+
+	return erofs_commit_compressed_file(ictx, bh,
+			blkaddr - compressed_blocks, compressed_blocks);
 }
 #endif
 
@@ -1362,9 +1407,8 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 	struct erofs_buffer_head *bh;
 	static struct z_erofs_compress_ictx ctx;
 	static struct z_erofs_compress_sctx sctx;
-	erofs_blk_t blkaddr, compressed_blocks = 0;
+	erofs_blk_t blkaddr;
 	int ret;
-	bool ismt = false;
 	struct erofs_sb_info *sbi = inode->sbi;
 
 	/* initialize per-file compression setting */
@@ -1419,14 +1463,6 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 	init_list_head(&ctx.extents);
 	ctx.fix_dedupedfrag = false;
 	ctx.fragemitted = false;
-	sctx = (struct z_erofs_compress_sctx) { .ictx = &ctx, };
-	init_list_head(&sctx.extents);
-
-	/* allocate main data buffer */
-	bh = erofs_balloc(DATA, 0, 0, 0);
-	if (IS_ERR(bh))
-		return PTR_ERR(bh);
-	blkaddr = erofs_mapbh(bh->block);	/* start_blkaddr */
 
 	if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) &&
 	    !inode->fragment_size) {
@@ -1434,60 +1470,48 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 		if (ret)
 			goto err_free_idata;
 #ifdef EROFS_MT_ENABLED
-	} else if (z_erofs_mt_enabled && inode->i_size > cfg.c_segment_size) {
-		ismt = true;
-		ret = z_erofs_mt_compress(&ctx, blkaddr, &compressed_blocks);
+	} else if (z_erofs_mt_enabled) {
+		ret = z_erofs_mt_compress(&ctx);
 		if (ret)
 			goto err_free_idata;
+		return erofs_mt_write_compressed_file(&ctx);
 #endif
-	} else {
-		ctx.seg_num = 1;
-		sctx.queue = g_queue;
-		sctx.destbuf = NULL;
-		sctx.chandle = &ctx.ccfg->handle;
-		sctx.remaining = inode->i_size - inode->fragment_size;
-		sctx.seg_idx = 0;
-		sctx.pivot = &dummy_pivot;
-		sctx.pclustersize = z_erofs_get_max_pclustersize(inode);
-
-		ret = z_erofs_compress_segment(&sctx, -1, blkaddr);
-		if (ret)
-			goto err_free_idata;
-		compressed_blocks = sctx.blkaddr - blkaddr;
 	}
-
-	/* generate an extent for the deduplicated fragment */
-	if (inode->fragment_size && !ctx.fragemitted) {
-		struct z_erofs_extent_item *ei;
-
-		ei = malloc(sizeof(*ei));
-		if (!ei) {
-			ret = -ENOMEM;
-			goto err_free_idata;
-		}
-
-		ei->e = (struct z_erofs_inmem_extent) {
-			.length = inode->fragment_size,
-			.compressedblks = 0,
-			.raw = false,
-			.partial = false,
-			.blkaddr = sctx.blkaddr,
-		};
-		init_list_head(&ei->list);
-		z_erofs_commit_extent(&sctx, ei);
+	/* allocate main data buffer */
+	bh = erofs_balloc(DATA, 0, 0, 0);
+	if (IS_ERR(bh)) {
+		ret = PTR_ERR(bh);
+		goto err_free_idata;
 	}
-	z_erofs_fragments_commit(inode);
-	if (!ismt)
-		list_splice_tail(&sctx.extents, &ctx.extents);
+	blkaddr = erofs_mapbh(bh->block); /* start_blkaddr */
+
+	ctx.seg_num = 1;
+	sctx = (struct z_erofs_compress_sctx) {
+		.ictx = &ctx,
+		.queue = g_queue,
+		.chandle = &ctx.ccfg->handle,
+		.remaining = inode->i_size - inode->fragment_size,
+		.seg_idx = 0,
+		.pivot = &dummy_pivot,
+		.pclustersize = z_erofs_get_max_pclustersize(inode),
+	};
+	init_list_head(&sctx.extents);
+
+	ret = z_erofs_compress_segment(&sctx, -1, blkaddr);
+	if (ret)
+		goto err_bdrop;
+	list_splice_tail(&sctx.extents, &ctx.extents);
 
 	return erofs_commit_compressed_file(&ctx, bh, blkaddr,
-					    compressed_blocks);
+					    sctx.blkaddr - blkaddr);
+
+err_bdrop:
+	erofs_bdrop(bh, true);	/* revoke buffer */
 err_free_idata:
 	if (inode->idata) {
 		free(inode->idata);
 		inode->idata = NULL;
 	}
-	erofs_bdrop(bh, true);	/* revoke buffer */
 	return ret;
 }
 
-- 
2.30.2


^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH v2 6/8] erofs-utils: mkfs: prepare inter-file multi-threaded compression
  2024-04-22  0:34 [PATCH v2 1/8] erofs-utils: use erofs_atomic_t for inode->i_count Gao Xiang
                   ` (3 preceding siblings ...)
  2024-04-22  0:34 ` [PATCH v2 5/8] erofs-utils: lib: split up z_erofs_mt_compress() Gao Xiang
@ 2024-04-22  0:34 ` Gao Xiang
  2024-04-22  0:34 ` [PATCH v2 7/8] erofs-utils: lib: introduce non-directory jobitem context Gao Xiang
  2024-04-22  0:34 ` [PATCH v2 8/8] erofs-utils: mkfs: enable inter-file multi-threaded compression Gao Xiang
  6 siblings, 0 replies; 8+ messages in thread
From: Gao Xiang @ 2024-04-22  0:34 UTC (permalink / raw)
  To: linux-erofs; +Cc: Gao Xiang, Yifan Zhao, Tong Xin

From: Yifan Zhao <zhaoyifan@sjtu.edu.cn>

This patch separates the compression process into two parts.

Specifically, erofs_begin_compressed_file() triggers compression, and
erofs_write_compressed_file() waits for the compression to finish and
then writes the compressed (meta)data.

Note that erofs_begin_compressed_file() and erofs_write_compressed_file()
may run on different threads even when the global inode context is
used, so add another synchronization point.

Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
Co-authored-by: Tong Xin <xin_tong@sjtu.edu.cn>
Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com>
---
 include/erofs/compress.h |   5 +-
 lib/compress.c           | 138 ++++++++++++++++++++++++++++-----------
 lib/inode.c              |  17 ++++-
 3 files changed, 118 insertions(+), 42 deletions(-)

diff --git a/include/erofs/compress.h b/include/erofs/compress.h
index 871db54..c9831a7 100644
--- a/include/erofs/compress.h
+++ b/include/erofs/compress.h
@@ -17,8 +17,11 @@ extern "C"
 #define EROFS_CONFIG_COMPR_MAX_SZ	(4000 * 1024)
 #define Z_EROFS_COMPR_QUEUE_SZ		(EROFS_CONFIG_COMPR_MAX_SZ * 2)
 
+struct z_erofs_compress_ictx;
+
 void z_erofs_drop_inline_pcluster(struct erofs_inode *inode);
-int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos);
+void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos);
+int erofs_write_compressed_file(struct z_erofs_compress_ictx *ictx);
 
 int z_erofs_compress_init(struct erofs_sb_info *sbi,
 			  struct erofs_buffer_head *bh);
diff --git a/lib/compress.c b/lib/compress.c
index 4ac4760..7fef698 100644
--- a/lib/compress.c
+++ b/lib/compress.c
@@ -109,6 +109,7 @@ struct erofs_compress_work {
 static struct {
 	struct erofs_workqueue wq;
 	struct erofs_compress_work *idle;
+	pthread_mutex_t mutex;
 } z_erofs_mt_ctrl;
 #endif
 
@@ -1312,11 +1313,13 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx)
 	pthread_cond_init(&ictx->cond, NULL);
 
 	for (i = 0; i < nsegs; i++) {
+		pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
 		cur = z_erofs_mt_ctrl.idle;
 		if (cur) {
 			z_erofs_mt_ctrl.idle = cur->next;
 			cur->next = NULL;
 		}
+		pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
 		if (!cur) {
 			cur = calloc(1, sizeof(*cur));
 			if (!cur)
@@ -1364,8 +1367,10 @@ int erofs_mt_write_compressed_file(struct z_erofs_compress_ictx *ictx)
 	pthread_mutex_unlock(&ictx->mutex);
 
 	bh = erofs_balloc(DATA, 0, 0, 0);
-	if (IS_ERR(bh))
-		return PTR_ERR(bh);
+	if (IS_ERR(bh)) {
+		ret = PTR_ERR(bh);
+		goto out;
+	}
 
 	DBG_BUGON(!head);
 	blkaddr = erofs_mapbh(bh->block);
@@ -1389,27 +1394,31 @@ int erofs_mt_write_compressed_file(struct z_erofs_compress_ictx *ictx)
 			blkaddr = cur->ctx.blkaddr;
 		}
 
+		pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
 		cur->next = z_erofs_mt_ctrl.idle;
 		z_erofs_mt_ctrl.idle = cur;
-	} while(head);
+		pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
+	} while (head);
 
 	if (ret)
-		return ret;
-
-	return erofs_commit_compressed_file(ictx, bh,
+		goto out;
+	ret = erofs_commit_compressed_file(ictx, bh,
 			blkaddr - compressed_blocks, compressed_blocks);
+
+out:
+	close(ictx->fd);
+	free(ictx);
+	return ret;
 }
 #endif
 
-int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
+static struct z_erofs_compress_ictx g_ictx;
+
+void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 {
-	static u8 g_queue[Z_EROFS_COMPR_QUEUE_SZ];
-	struct erofs_buffer_head *bh;
-	static struct z_erofs_compress_ictx ctx;
-	static struct z_erofs_compress_sctx sctx;
-	erofs_blk_t blkaddr;
-	int ret;
 	struct erofs_sb_info *sbi = inode->sbi;
+	struct z_erofs_compress_ictx *ictx;
+	int ret;
 
 	/* initialize per-file compression setting */
 	inode->z_advise = 0;
@@ -1440,43 +1449,87 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 		}
 	}
 #endif
-	ctx.ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
-	inode->z_algorithmtype[0] = ctx.ccfg->algorithmtype;
-	inode->z_algorithmtype[1] = 0;
-
 	inode->idata_size = 0;
 	inode->fragment_size = 0;
 
+	if (z_erofs_mt_enabled) {
+		ictx = malloc(sizeof(*ictx));
+		if (!ictx)
+			return ERR_PTR(-ENOMEM);
+		ictx->fd = dup(fd);
+	} else {
+#ifdef EROFS_MT_ENABLED
+		pthread_mutex_lock(&g_ictx.mutex);
+		if (g_ictx.seg_num)
+			pthread_cond_wait(&g_ictx.cond, &g_ictx.mutex);
+		g_ictx.seg_num = 1;
+		pthread_mutex_unlock(&g_ictx.mutex);
+#endif
+		ictx = &g_ictx;
+		ictx->fd = fd;
+	}
+
+	ictx->ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
+	inode->z_algorithmtype[0] = ictx->ccfg->algorithmtype;
+	inode->z_algorithmtype[1] = 0;
+
 	/*
 	 * Handle tails in advance to avoid writing duplicated
 	 * parts into the packed inode.
 	 */
 	if (cfg.c_fragments && !erofs_is_packed_inode(inode)) {
-		ret = z_erofs_fragments_dedupe(inode, fd, &ctx.tof_chksum);
+		ret = z_erofs_fragments_dedupe(inode, fd, &ictx->tof_chksum);
 		if (ret < 0)
-			return ret;
+			goto err_free_ictx;
 	}
 
-	ctx.inode = inode;
-	ctx.fd = fd;
-	ctx.fpos = fpos;
-	init_list_head(&ctx.extents);
-	ctx.fix_dedupedfrag = false;
-	ctx.fragemitted = false;
+	ictx->inode = inode;
+	ictx->fpos = fpos;
+	init_list_head(&ictx->extents);
+	ictx->fix_dedupedfrag = false;
+	ictx->fragemitted = false;
 
 	if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) &&
 	    !inode->fragment_size) {
-		ret = z_erofs_pack_file_from_fd(inode, fd, ctx.tof_chksum);
+		ret = z_erofs_pack_file_from_fd(inode, fd, ictx->tof_chksum);
 		if (ret)
 			goto err_free_idata;
+	}
 #ifdef EROFS_MT_ENABLED
-	} else if (z_erofs_mt_enabled) {
-		ret = z_erofs_mt_compress(&ctx);
+	if (ictx != &g_ictx) {
+		ret = z_erofs_mt_compress(ictx);
 		if (ret)
 			goto err_free_idata;
-		return erofs_mt_write_compressed_file(&ctx);
+	}
 #endif
+	return ictx;
+
+err_free_idata:
+	if (inode->idata) {
+		free(inode->idata);
+		inode->idata = NULL;
 	}
+err_free_ictx:
+	if (ictx != &g_ictx)
+		free(ictx);
+	return ERR_PTR(ret);
+}
+
+int erofs_write_compressed_file(struct z_erofs_compress_ictx *ictx)
+{
+	static u8 g_queue[Z_EROFS_COMPR_QUEUE_SZ];
+	struct erofs_buffer_head *bh;
+	static struct z_erofs_compress_sctx sctx;
+	struct erofs_compress_cfg *ccfg = ictx->ccfg;
+	struct erofs_inode *inode = ictx->inode;
+	erofs_blk_t blkaddr;
+	int ret;
+
+#ifdef EROFS_MT_ENABLED
+	if (ictx != &g_ictx)
+		return erofs_mt_write_compressed_file(ictx);
+#endif
+
 	/* allocate main data buffer */
 	bh = erofs_balloc(DATA, 0, 0, 0);
 	if (IS_ERR(bh)) {
@@ -1485,11 +1538,11 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 	}
 	blkaddr = erofs_mapbh(bh->block); /* start_blkaddr */
 
-	ctx.seg_num = 1;
+	ictx->seg_num = 1;
 	sctx = (struct z_erofs_compress_sctx) {
-		.ictx = &ctx,
+		.ictx = ictx,
 		.queue = g_queue,
-		.chandle = &ctx.ccfg->handle,
+		.chandle = &ccfg->handle,
 		.remaining = inode->i_size - inode->fragment_size,
 		.seg_idx = 0,
 		.pivot = &dummy_pivot,
@@ -1499,19 +1552,26 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 
 	ret = z_erofs_compress_segment(&sctx, -1, blkaddr);
 	if (ret)
-		goto err_bdrop;
-	list_splice_tail(&sctx.extents, &ctx.extents);
+		goto err_free_idata;
 
-	return erofs_commit_compressed_file(&ctx, bh, blkaddr,
-					    sctx.blkaddr - blkaddr);
+	list_splice_tail(&sctx.extents, &ictx->extents);
+	ret = erofs_commit_compressed_file(ictx, bh, blkaddr,
+					   sctx.blkaddr - blkaddr);
+	goto out;
 
-err_bdrop:
-	erofs_bdrop(bh, true);	/* revoke buffer */
 err_free_idata:
+	erofs_bdrop(bh, true);	/* revoke buffer */
 	if (inode->idata) {
 		free(inode->idata);
 		inode->idata = NULL;
 	}
+out:
+#ifdef EROFS_MT_ENABLED
+	pthread_mutex_lock(&ictx->mutex);
+	ictx->seg_num = 0;
+	pthread_cond_signal(&ictx->cond);
+	pthread_mutex_unlock(&ictx->mutex);
+#endif
 	return ret;
 }
 
@@ -1666,6 +1726,8 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
 					    z_erofs_mt_wq_tls_free);
 		z_erofs_mt_enabled = !ret;
 	}
+	pthread_mutex_init(&g_ictx.mutex, NULL);
+	pthread_cond_init(&g_ictx.cond, NULL);
 #endif
 	return 0;
 }
diff --git a/lib/inode.c b/lib/inode.c
index 1ff05e1..0d044f4 100644
--- a/lib/inode.c
+++ b/lib/inode.c
@@ -499,10 +499,15 @@ int erofs_write_file(struct erofs_inode *inode, int fd, u64 fpos)
 	DBG_BUGON(!inode->i_size);
 
 	if (cfg.c_compr_opts[0].alg && erofs_file_is_compressible(inode)) {
+		void *ictx;
 		int ret;
 
-		ret = erofs_write_compressed_file(inode, fd, fpos);
-		if (!ret || ret != -ENOSPC)
+		ictx = erofs_begin_compressed_file(inode, fd, fpos);
+		if (IS_ERR(ictx))
+			return PTR_ERR(ictx);
+
+		ret = erofs_write_compressed_file(ictx);
+		if (ret != -ENOSPC)
 			return ret;
 
 		if (lseek(fd, fpos, SEEK_SET) < 0)
@@ -1362,6 +1367,7 @@ struct erofs_inode *erofs_mkfs_build_special_from_fd(int fd, const char *name)
 {
 	struct stat st;
 	struct erofs_inode *inode;
+	void *ictx;
 	int ret;
 
 	ret = lseek(fd, 0, SEEK_SET);
@@ -1392,7 +1398,12 @@ struct erofs_inode *erofs_mkfs_build_special_from_fd(int fd, const char *name)
 		inode->nid = inode->sbi->packed_nid;
 	}
 
-	ret = erofs_write_compressed_file(inode, fd, 0);
+	ictx = erofs_begin_compressed_file(inode, fd, 0);
+	if (IS_ERR(ictx))
+		return ERR_CAST(ictx);
+
+	DBG_BUGON(!ictx);
+	ret = erofs_write_compressed_file(ictx);
 	if (ret == -ENOSPC) {
 		ret = lseek(fd, 0, SEEK_SET);
 		if (ret < 0)
-- 
2.30.2


^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH v2 7/8] erofs-utils: lib: introduce non-directory jobitem context
  2024-04-22  0:34 [PATCH v2 1/8] erofs-utils: use erofs_atomic_t for inode->i_count Gao Xiang
                   ` (4 preceding siblings ...)
  2024-04-22  0:34 ` [PATCH v2 6/8] erofs-utils: mkfs: prepare inter-file multi-threaded compression Gao Xiang
@ 2024-04-22  0:34 ` Gao Xiang
  2024-04-22  0:34 ` [PATCH v2 8/8] erofs-utils: mkfs: enable inter-file multi-threaded compression Gao Xiang
  6 siblings, 0 replies; 8+ messages in thread
From: Gao Xiang @ 2024-04-22  0:34 UTC (permalink / raw)
  To: linux-erofs; +Cc: Gao Xiang

From: Gao Xiang <hsiangkao@linux.alibaba.com>

It describes the deferred work of EROFS_MKFS_JOB_NDIR.  Also, start
compression before queueing EROFS_MKFS_JOB_NDIR.

Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com>
---
 lib/inode.c | 62 +++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 51 insertions(+), 11 deletions(-)

diff --git a/lib/inode.c b/lib/inode.c
index 0d044f4..6ad66bf 100644
--- a/lib/inode.c
+++ b/lib/inode.c
@@ -1107,8 +1107,36 @@ static void erofs_fixup_meta_blkaddr(struct erofs_inode *rootdir)
 	rootdir->nid = (off - meta_offset) >> EROFS_ISLOTBITS;
 }
 
-static int erofs_mkfs_handle_nondirectory(struct erofs_inode *inode)
+struct erofs_mkfs_job_ndir_ctx {
+	struct erofs_inode *inode;
+	void *ictx;
+	int fd;
+};
+
+static int erofs_mkfs_job_write_file(struct erofs_mkfs_job_ndir_ctx *ctx)
 {
+	struct erofs_inode *inode = ctx->inode;
+	int ret;
+
+	if (ctx->ictx) {
+		ret = erofs_write_compressed_file(ctx->ictx);
+		if (ret != -ENOSPC)
+			goto out;
+		if (lseek(ctx->fd, 0, SEEK_SET) < 0) {
+			ret = -errno;
+			goto out;
+		}
+	}
+	/* fallback to all data uncompressed */
+	ret = erofs_write_unencoded_file(inode, ctx->fd, 0);
+out:
+	close(ctx->fd);
+	return ret;
+}
+
+static int erofs_mkfs_handle_nondirectory(struct erofs_mkfs_job_ndir_ctx *ctx)
+{
+	struct erofs_inode *inode = ctx->inode;
 	int ret = 0;
 
 	if (S_ISLNK(inode->i_mode)) {
@@ -1124,12 +1152,7 @@ static int erofs_mkfs_handle_nondirectory(struct erofs_inode *inode)
 		ret = erofs_write_file_from_buffer(inode, symlink);
 		free(symlink);
 	} else if (inode->i_size) {
-		int fd = open(inode->i_srcpath, O_RDONLY | O_BINARY);
-
-		if (fd < 0)
-			return -errno;
-		ret = erofs_write_file(inode, fd, 0);
-		close(fd);
+		ret = erofs_mkfs_job_write_file(ctx);
 	}
 	if (ret)
 		return ret;
@@ -1148,6 +1171,7 @@ struct erofs_mkfs_jobitem {
 	enum erofs_mkfs_jobtype type;
 	union {
 		struct erofs_inode *inode;
+		struct erofs_mkfs_job_ndir_ctx ndir;
 	} u;
 };
 
@@ -1157,7 +1181,7 @@ static int erofs_mkfs_jobfn(struct erofs_mkfs_jobitem *item)
 	int ret;
 
 	if (item->type == EROFS_MKFS_JOB_NDIR)
-		return erofs_mkfs_handle_nondirectory(inode);
+		return erofs_mkfs_handle_nondirectory(&item->u.ndir);
 
 	if (item->type == EROFS_MKFS_JOB_DIR) {
 		ret = erofs_prepare_inode_buffer(inode);
@@ -1294,11 +1318,27 @@ static int erofs_mkfs_handle_inode(struct erofs_inode *inode)
 	if (ret < 0)
 		return ret;
 
-	if (!S_ISDIR(inode->i_mode))
+	if (!S_ISDIR(inode->i_mode)) {
+		struct erofs_mkfs_job_ndir_ctx ctx = { .inode = inode };
+
+		if (!S_ISLNK(inode->i_mode) && inode->i_size) {
+			ctx.fd = open(inode->i_srcpath, O_RDONLY | O_BINARY);
+			if (ctx.fd < 0)
+				return -errno;
+
+			if (cfg.c_compr_opts[0].alg &&
+			    erofs_file_is_compressible(inode)) {
+				ctx.ictx = erofs_begin_compressed_file(inode,
+								ctx.fd, 0);
+				if (IS_ERR(ctx.ictx))
+					return PTR_ERR(ctx.ictx);
+			}
+		}
 		ret = erofs_mkfs_go(inode->sbi, EROFS_MKFS_JOB_NDIR,
-				    &inode, sizeof(inode));
-	else
+				    &ctx, sizeof(ctx));
+	} else {
 		ret = erofs_mkfs_handle_directory(inode);
+	}
 	erofs_info("file %s dumped (mode %05o)", erofs_fspath(inode->i_srcpath),
 		   inode->i_mode);
 	return ret;
-- 
2.30.2


^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH v2 8/8] erofs-utils: mkfs: enable inter-file multi-threaded compression
  2024-04-22  0:34 [PATCH v2 1/8] erofs-utils: use erofs_atomic_t for inode->i_count Gao Xiang
                   ` (5 preceding siblings ...)
  2024-04-22  0:34 ` [PATCH v2 7/8] erofs-utils: lib: introduce non-directory jobitem context Gao Xiang
@ 2024-04-22  0:34 ` Gao Xiang
  6 siblings, 0 replies; 8+ messages in thread
From: Gao Xiang @ 2024-04-22  0:34 UTC (permalink / raw)
  To: linux-erofs; +Cc: Gao Xiang

From: Gao Xiang <hsiangkao@linux.alibaba.com>

Dispatch deferred ops to a separate per-sb worker thread.  Note that
deferred ops are processed in strict FIFO order.

Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com>
---
 include/erofs/internal.h |   6 ++
 lib/inode.c              | 119 ++++++++++++++++++++++++++++++++++++++-
 2 files changed, 123 insertions(+), 2 deletions(-)

diff --git a/include/erofs/internal.h b/include/erofs/internal.h
index f31e548..ecbbdf6 100644
--- a/include/erofs/internal.h
+++ b/include/erofs/internal.h
@@ -71,6 +71,7 @@ struct erofs_xattr_prefix_item {
 
 #define EROFS_PACKED_NID_UNALLOCATED	-1
 
+struct erofs_mkfs_dfops;
 struct erofs_sb_info {
 	struct erofs_device_info *devs;
 	char *devname;
@@ -124,6 +125,11 @@ struct erofs_sb_info {
 	struct list_head list;
 
 	u64 saved_by_deduplication;
+
+#ifdef EROFS_MT_ENABLED
+	pthread_t dfops_worker;
+	struct erofs_mkfs_dfops *mkfs_dfops;
+#endif
 };
 
 /* make sure that any user of the erofs headers has atleast 64bit off_t type */
diff --git a/lib/inode.c b/lib/inode.c
index 6ad66bf..cf22bbe 100644
--- a/lib/inode.c
+++ b/lib/inode.c
@@ -1165,6 +1165,7 @@ enum erofs_mkfs_jobtype {	/* ordered job types */
 	EROFS_MKFS_JOB_NDIR,
 	EROFS_MKFS_JOB_DIR,
 	EROFS_MKFS_JOB_DIR_BH,
+	EROFS_MKFS_JOB_MAX
 };
 
 struct erofs_mkfs_jobitem {
@@ -1203,6 +1204,73 @@ static int erofs_mkfs_jobfn(struct erofs_mkfs_jobitem *item)
 	return -EINVAL;
 }
 
+#ifdef EROFS_MT_ENABLED
+
+struct erofs_mkfs_dfops {
+	pthread_t worker;
+	pthread_mutex_t lock;
+	pthread_cond_t full, empty;
+	struct erofs_mkfs_jobitem *queue;
+	unsigned int entries, head, tail;
+};
+
+#define EROFS_MT_QUEUE_SIZE 128
+
+void *erofs_mkfs_pop_jobitem(struct erofs_mkfs_dfops *q)
+{
+	struct erofs_mkfs_jobitem *item;
+
+	pthread_mutex_lock(&q->lock);
+	while (q->head == q->tail)
+		pthread_cond_wait(&q->empty, &q->lock);
+
+	item = q->queue + q->head;
+	q->head = (q->head + 1) & (q->entries - 1);
+
+	pthread_cond_signal(&q->full);
+	pthread_mutex_unlock(&q->lock);
+	return item;
+}
+
+void *z_erofs_mt_dfops_worker(void *arg)
+{
+	struct erofs_sb_info *sbi = arg;
+	int ret = 0;
+
+	while (1) {
+		struct erofs_mkfs_jobitem *item;
+
+		item = erofs_mkfs_pop_jobitem(sbi->mkfs_dfops);
+		if (item->type >= EROFS_MKFS_JOB_MAX)
+			break;
+		ret = erofs_mkfs_jobfn(item);
+		if (ret)
+			break;
+	}
+	pthread_exit((void *)(uintptr_t)ret);
+}
+
+int erofs_mkfs_go(struct erofs_sb_info *sbi,
+		  enum erofs_mkfs_jobtype type, void *elem, int size)
+{
+	struct erofs_mkfs_jobitem *item;
+	struct erofs_mkfs_dfops *q = sbi->mkfs_dfops;
+
+	pthread_mutex_lock(&q->lock);
+
+	while (((q->tail + 1) & (q->entries - 1)) == q->head)
+		pthread_cond_wait(&q->full, &q->lock);
+
+	item = q->queue + q->tail;
+	item->type = type;
+	memcpy(&item->u, elem, size);
+	q->tail = (q->tail + 1) & (q->entries - 1);
+
+	pthread_cond_signal(&q->empty);
+	pthread_mutex_unlock(&q->lock);
+	return 0;
+}
+#else
 int erofs_mkfs_go(struct erofs_sb_info *sbi,
 		  enum erofs_mkfs_jobtype type, void *elem, int size)
 {
@@ -1212,6 +1280,7 @@ int erofs_mkfs_go(struct erofs_sb_info *sbi,
 	memcpy(&item.u, elem, size);
 	return erofs_mkfs_jobfn(&item);
 }
+#endif
 
 static int erofs_mkfs_handle_directory(struct erofs_inode *dir)
 {
@@ -1344,7 +1413,11 @@ static int erofs_mkfs_handle_inode(struct erofs_inode *inode)
 	return ret;
 }
 
-struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path)
+#ifndef EROFS_MT_ENABLED
+#define __erofs_mkfs_build_tree_from_path erofs_mkfs_build_tree_from_path
+#endif
+
+struct erofs_inode *__erofs_mkfs_build_tree_from_path(const char *path)
 {
 	struct erofs_inode *root, *dumpdir;
 	int err;
@@ -1399,10 +1472,52 @@ struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path)
 		if (err)
 			return ERR_PTR(err);
 	} while (dumpdir);
-
 	return root;
 }
 
+#ifdef EROFS_MT_ENABLED
+struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path)
+{
+	struct erofs_mkfs_dfops *q;
+	struct erofs_inode *root;
+	int err;
+
+	q = malloc(sizeof(*q));
+	if (!q)
+		return ERR_PTR(-ENOMEM);
+
+	q->entries = EROFS_MT_QUEUE_SIZE;
+	q->queue = malloc(q->entries * sizeof(*q->queue));
+	if (!q->queue) {
+		free(q);
+		return ERR_PTR(-ENOMEM);
+	}
+	pthread_mutex_init(&q->lock, NULL);
+	pthread_cond_init(&q->empty, NULL);
+	pthread_cond_init(&q->full, NULL);
+
+	q->head = 0;
+	q->tail = 0;
+	sbi.mkfs_dfops = q;
+	err = pthread_create(&sbi.dfops_worker, NULL,
+			     z_erofs_mt_dfops_worker, &sbi);
+	if (err)
+		goto fail;
+	root = __erofs_mkfs_build_tree_from_path(path);
+
+	erofs_mkfs_go(&sbi, ~0, NULL, 0);
+	err = pthread_join(sbi.dfops_worker, NULL);
+
+fail:
+	pthread_cond_destroy(&q->empty);
+	pthread_cond_destroy(&q->full);
+	pthread_mutex_destroy(&q->lock);
+	free(q->queue);
+	free(q);
+	return err ? ERR_PTR(err) : root;
+}
+#endif
+
 struct erofs_inode *erofs_mkfs_build_special_from_fd(int fd, const char *name)
 {
 	struct stat st;
-- 
2.30.2


^ permalink raw reply related	[flat|nested] 8+ messages in thread

end of thread, other threads:[~2024-04-22  0:36 UTC | newest]

Thread overview: 8+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2024-04-22  0:34 [PATCH v2 1/8] erofs-utils: use erofs_atomic_t for inode->i_count Gao Xiang
2024-04-22  0:34 ` [PATCH v2 2/8] erofs-utils: lib: prepare for later deferred work Gao Xiang
2024-04-22  0:34 ` [PATCH v2 3/8] erofs-utils: lib: split out erofs_commit_compressed_file() Gao Xiang
2024-04-22  0:34 ` [PATCH v2 4/8] erofs-utils: rearrange several fields for multi-threaded mkfs Gao Xiang
2024-04-22  0:34 ` [PATCH v2 5/8] erofs-utils: lib: split up z_erofs_mt_compress() Gao Xiang
2024-04-22  0:34 ` [PATCH v2 6/8] erofs-utils: mkfs: prepare inter-file multi-threaded compression Gao Xiang
2024-04-22  0:34 ` [PATCH v2 7/8] erofs-utils: lib: introduce non-directory jobitem context Gao Xiang
2024-04-22  0:34 ` [PATCH v2 8/8] erofs-utils: mkfs: enable inter-file multi-threaded compression Gao Xiang

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).