This patch allows parallelizing the compression process of different files in mkfs. Specifically, a traverser thread traverses the files and issues compression tasks, which are handled by the workers. Then, the main thread consumes the completed tasks and writes the compressed data to the device. To this end, the logic of erofs_write_compressed_file() has been modified to split the creation and completion logic of the compression task. Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn> Co-authored-by: Tong Xin <xin_tong@sjtu.edu.cn> --- include/erofs/compress.h | 16 ++ include/erofs/internal.h | 3 + include/erofs/list.h | 8 + include/erofs/queue.h | 22 +++ lib/Makefile.am | 2 +- lib/compress.c | 323 +++++++++++++++++++++++++-------------- lib/inode.c | 242 +++++++++++++++++++++++++++-- lib/queue.c | 64 ++++++++ 8 files changed, 553 insertions(+), 127 deletions(-) create mode 100644 include/erofs/queue.h create mode 100644 lib/queue.c diff --git a/include/erofs/compress.h b/include/erofs/compress.h index 3253611..9bcd888 100644 --- a/include/erofs/compress.h +++ b/include/erofs/compress.h @@ -17,6 +17,22 @@ extern "C" #define EROFS_CONFIG_COMPR_MAX_SZ (4000 * 1024) #define Z_EROFS_COMPR_QUEUE_SZ (EROFS_CONFIG_COMPR_MAX_SZ * 2) +#ifdef EROFS_MT_ENABLED +struct z_erofs_mt_file { + pthread_mutex_t mutex; + pthread_cond_t cond; + int total; + int nfini; + + int fd; + struct erofs_compress_work *head; + + struct z_erofs_mt_file *next; +}; + +int z_erofs_mt_reap(struct z_erofs_mt_file *desc); +#endif + void z_erofs_drop_inline_pcluster(struct erofs_inode *inode); int erofs_write_compressed_file(struct erofs_inode *inode, int fd); diff --git a/include/erofs/internal.h b/include/erofs/internal.h index 4cd2059..2580588 100644 --- a/include/erofs/internal.h +++ b/include/erofs/internal.h @@ -250,6 +250,9 @@ struct erofs_inode { #ifdef WITH_ANDROID uint64_t capabilities; #endif +#ifdef EROFS_MT_ENABLED + struct z_erofs_mt_file *mt_desc; +#endif }; static inline erofs_off_t erofs_iloc(struct 
erofs_inode *inode) diff --git a/include/erofs/list.h b/include/erofs/list.h index d7a9fee..55383ac 100644 --- a/include/erofs/list.h +++ b/include/erofs/list.h @@ -90,6 +90,14 @@ static inline void list_splice_tail(struct list_head *list, __list_splice(list, head->prev, head); } +static inline void list_replace(struct list_head *old, struct list_head *new) +{ + new->next = old->next; + new->next->prev = new; + new->prev = old->prev; + new->prev->next = new; +} + #define list_entry(ptr, type, member) container_of(ptr, type, member) #define list_first_entry(ptr, type, member) \ diff --git a/include/erofs/queue.h b/include/erofs/queue.h new file mode 100644 index 0000000..35d29b0 --- /dev/null +++ b/include/erofs/queue.h @@ -0,0 +1,22 @@ +/* SPDX-License-Identifier: GPL-2.0+ */ +#ifndef __EROFS_QUEUE_H +#define __EROFS_QUEUE_H + +#include "internal.h" + +struct erofs_queue { + pthread_mutex_t lock; + pthread_cond_t full, empty; + + void *buf; + + size_t size, elem_size; + size_t head, tail; +}; + +struct erofs_queue* erofs_alloc_queue(size_t size, size_t elem_size); +void erofs_push_queue(struct erofs_queue *q, void *elem); +void *erofs_pop_queue(struct erofs_queue *q); +void erofs_destroy_queue(struct erofs_queue *q); + +#endif \ No newline at end of file diff --git a/lib/Makefile.am b/lib/Makefile.am index b3bea74..e4b7096 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -55,5 +55,5 @@ liberofs_la_SOURCES += compressor_libdeflate.c endif if ENABLE_EROFS_MT liberofs_la_LDFLAGS = -lpthread -liberofs_la_SOURCES += workqueue.c +liberofs_la_SOURCES += workqueue.c queue.c endif diff --git a/lib/compress.c b/lib/compress.c index 8d88dd1..9eb40b5 100644 --- a/lib/compress.c +++ b/lib/compress.c @@ -7,6 +7,7 @@ */ #ifndef _LARGEFILE64_SOURCE #define _LARGEFILE64_SOURCE +#include "erofs/internal.h" #endif #include <string.h> #include <stdlib.h> @@ -84,6 +85,7 @@ struct erofs_compress_work { struct erofs_work work; struct z_erofs_compress_sctx ctx; struct 
erofs_compress_work *next; + struct z_erofs_mt_file *mtfile_desc; unsigned int alg_id; char *alg_name; @@ -95,14 +97,14 @@ struct erofs_compress_work { static struct { struct erofs_workqueue wq; - struct erofs_compress_work *idle; - pthread_mutex_t mutex; - pthread_cond_t cond; - int nfini; + struct erofs_compress_work *work_idle; + pthread_mutex_t work_mutex; + struct z_erofs_mt_file *file_idle; + pthread_mutex_t file_mutex; } z_erofs_mt_ctrl; #endif -static bool z_erofs_mt_enabled; +bool z_erofs_mt_enabled; #define Z_EROFS_LEGACY_MAP_HEADER_SIZE Z_EROFS_FULL_INDEX_ALIGN(0) @@ -1022,6 +1024,90 @@ int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx, return 0; } +int z_erofs_finalize_compression(struct z_erofs_compress_ictx *ictx, + struct erofs_buffer_head *bh, + erofs_blk_t blkaddr, + erofs_blk_t compressed_blocks) +{ + struct erofs_inode *inode = ictx->inode; + struct erofs_sb_info *sbi = inode->sbi; + u8 *compressmeta = ictx->metacur - Z_EROFS_LEGACY_MAP_HEADER_SIZE; + unsigned int legacymetasize; + int ret = 0; + + /* fall back to no compression mode */ + DBG_BUGON(compressed_blocks < !!inode->idata_size); + compressed_blocks -= !!inode->idata_size; + + z_erofs_write_indexes(ictx); + legacymetasize = ictx->metacur - compressmeta; + /* estimate if data compression saves space or not */ + if (!inode->fragment_size && + compressed_blocks * erofs_blksiz(sbi) + inode->idata_size + + legacymetasize >= inode->i_size) { + z_erofs_dedupe_commit(true); + + if (inode->idata) { + free(inode->idata); + inode->idata = NULL; + } + erofs_bdrop(bh, true); /* revoke buffer */ + free(compressmeta); + inode->compressmeta = NULL; + + return -ENOSPC; + } + z_erofs_dedupe_commit(false); + z_erofs_write_mapheader(inode, compressmeta); + + if (!ictx->fragemitted) + sbi->saved_by_deduplication += inode->fragment_size; + + /* if the entire file is a fragment, a simplified form is used. 
*/ + if (inode->i_size <= inode->fragment_size) { + DBG_BUGON(inode->i_size < inode->fragment_size); + DBG_BUGON(inode->fragmentoff >> 63); + *(__le64 *)compressmeta = + cpu_to_le64(inode->fragmentoff | 1ULL << 63); + inode->datalayout = EROFS_INODE_COMPRESSED_FULL; + legacymetasize = Z_EROFS_LEGACY_MAP_HEADER_SIZE; + } + + if (compressed_blocks) { + ret = erofs_bh_balloon(bh, erofs_pos(sbi, compressed_blocks)); + DBG_BUGON(ret != erofs_blksiz(sbi)); + } else { + if (!cfg.c_fragments && !cfg.c_dedupe) + DBG_BUGON(!inode->idata_size); + } + + erofs_info("compressed %s (%llu bytes) into %u blocks", + inode->i_srcpath, (unsigned long long)inode->i_size, + compressed_blocks); + + if (inode->idata_size) { + bh->op = &erofs_skip_write_bhops; + inode->bh_data = bh; + } else { + erofs_bdrop(bh, false); + } + + inode->u.i_blocks = compressed_blocks; + + if (inode->datalayout == EROFS_INODE_COMPRESSED_FULL) { + inode->extent_isize = legacymetasize; + } else { + ret = z_erofs_convert_to_compacted_format(inode, blkaddr, + legacymetasize, + compressmeta); + DBG_BUGON(ret); + } + inode->compressmeta = compressmeta; + if (!erofs_is_packed_inode(inode)) + erofs_droid_blocklist_write(inode, blkaddr, compressed_blocks); + return 0; +} + #ifdef EROFS_MT_ENABLED void *z_erofs_mt_wq_tls_alloc(struct erofs_workqueue *wq, void *ptr) { @@ -1096,6 +1182,7 @@ void z_erofs_mt_workfn(struct erofs_work *work, void *tlsp) struct erofs_compress_work *cwork = (struct erofs_compress_work *)work; struct erofs_compress_wq_tls *tls = tlsp; struct z_erofs_compress_sctx *sctx = &cwork->ctx; + struct z_erofs_mt_file *mtfile_desc = cwork->mtfile_desc; struct erofs_sb_info *sbi = sctx->ictx->inode->sbi; int ret = 0; @@ -1121,10 +1208,10 @@ void z_erofs_mt_workfn(struct erofs_work *work, void *tlsp) out: cwork->errcode = ret; - pthread_mutex_lock(&z_erofs_mt_ctrl.mutex); - ++z_erofs_mt_ctrl.nfini; - pthread_cond_signal(&z_erofs_mt_ctrl.cond); - pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex); + 
pthread_mutex_lock(&mtfile_desc->mutex); + ++mtfile_desc->nfini; + pthread_cond_signal(&mtfile_desc->cond); + pthread_mutex_unlock(&mtfile_desc->mutex); } int z_erofs_merge_segment(struct z_erofs_compress_ictx *ictx, @@ -1158,27 +1245,49 @@ int z_erofs_merge_segment(struct z_erofs_compress_ictx *ictx, } int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx, - struct erofs_compress_cfg *ccfg, - erofs_blk_t blkaddr, - erofs_blk_t *compressed_blocks) + struct erofs_compress_cfg *ccfg) { struct erofs_compress_work *cur, *head = NULL, **last = &head; struct erofs_inode *inode = ictx->inode; + struct z_erofs_mt_file *mtfile_desc; int nsegs = DIV_ROUND_UP(inode->i_size, cfg.c_segment_size); - int ret, i; + int i; + + pthread_mutex_lock(&z_erofs_mt_ctrl.file_mutex); + if (z_erofs_mt_ctrl.file_idle) { + mtfile_desc = z_erofs_mt_ctrl.file_idle; + z_erofs_mt_ctrl.file_idle = mtfile_desc->next; + mtfile_desc->next = NULL; + pthread_mutex_unlock(&z_erofs_mt_ctrl.file_mutex); + } else { + pthread_mutex_unlock(&z_erofs_mt_ctrl.file_mutex); + mtfile_desc = calloc(1, sizeof(*mtfile_desc)); + if (!mtfile_desc) + goto err_free_ictx; + } + inode->mt_desc = mtfile_desc; - z_erofs_mt_ctrl.nfini = 0; + mtfile_desc->fd = ictx->fd; + mtfile_desc->total = nsegs; + mtfile_desc->nfini = 0; + pthread_mutex_init(&mtfile_desc->mutex, NULL); + pthread_cond_init(&mtfile_desc->cond, NULL); for (i = 0; i < nsegs; i++) { - if (z_erofs_mt_ctrl.idle) { - cur = z_erofs_mt_ctrl.idle; - z_erofs_mt_ctrl.idle = cur->next; + pthread_mutex_lock(&z_erofs_mt_ctrl.work_mutex); + if (z_erofs_mt_ctrl.work_idle) { + cur = z_erofs_mt_ctrl.work_idle; + z_erofs_mt_ctrl.work_idle = cur->next; cur->next = NULL; + pthread_mutex_unlock(&z_erofs_mt_ctrl.work_mutex); } else { + pthread_mutex_unlock(&z_erofs_mt_ctrl.work_mutex); cur = calloc(1, sizeof(*cur)); if (!cur) - return -ENOMEM; + goto err_free_cwork; } + if (i == 0) + mtfile_desc->head = cur; *last = cur; last = &cur->next; @@ -1202,21 +1311,40 @@ int 
z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx, cur->comp_level = ccfg->handle.compression_level; cur->dict_size = ccfg->handle.dict_size; + cur->mtfile_desc = mtfile_desc; cur->work.fn = z_erofs_mt_workfn; erofs_queue_work(&z_erofs_mt_ctrl.wq, &cur->work); } - pthread_mutex_lock(&z_erofs_mt_ctrl.mutex); - while (z_erofs_mt_ctrl.nfini != nsegs) - pthread_cond_wait(&z_erofs_mt_ctrl.cond, - &z_erofs_mt_ctrl.mutex); - pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex); + return 0; - ret = 0; +err_free_cwork: while (head) { cur = head; head = cur->next; + free(cur); + } + free(mtfile_desc); +err_free_ictx: + free(ictx); + return -ENOMEM; +} + +int z_erofs_mt_reap(struct z_erofs_mt_file *desc) { + struct erofs_buffer_head *bh = NULL; + struct erofs_compress_work *cur = desc->head, *tmp; + struct z_erofs_compress_ictx *ictx = cur->ctx.ictx; + erofs_blk_t blkaddr, compressed_blocks = 0; + int ret = 0; + + bh = erofs_balloc(DATA, 0, 0, 0); + if (IS_ERR(bh)) { + ret = PTR_ERR(bh); + goto out; + } + blkaddr = erofs_mapbh(bh->block); + while (cur) { if (cur->errcode) { ret = cur->errcode; } else { @@ -1227,13 +1355,30 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx, if (ret2) ret = ret2; - *compressed_blocks += cur->ctx.blkaddr - blkaddr; + compressed_blocks += cur->ctx.blkaddr - blkaddr; blkaddr = cur->ctx.blkaddr; } - cur->next = z_erofs_mt_ctrl.idle; - z_erofs_mt_ctrl.idle = cur; + tmp = cur->next; + pthread_mutex_lock(&z_erofs_mt_ctrl.work_mutex); + cur->next = z_erofs_mt_ctrl.work_idle; + z_erofs_mt_ctrl.work_idle = cur; + pthread_mutex_unlock(&z_erofs_mt_ctrl.work_mutex); + cur = tmp; } + if (ret) + goto out; + + ret = z_erofs_finalize_compression( + ictx, bh, blkaddr - compressed_blocks, compressed_blocks); + +out: + free(ictx); + pthread_mutex_lock(&z_erofs_mt_ctrl.file_mutex); + desc->next = z_erofs_mt_ctrl.file_idle; + z_erofs_mt_ctrl.file_idle = desc; + pthread_mutex_unlock(&z_erofs_mt_ctrl.file_mutex); + return ret; } #endif @@ -1246,9 +1391,7 @@ 
int erofs_write_compressed_file(struct erofs_inode *inode, int fd) static struct z_erofs_compress_sctx sctx; struct erofs_compress_cfg *ccfg; erofs_blk_t blkaddr, compressed_blocks = 0; - unsigned int legacymetasize; int ret; - bool ismt = false; struct erofs_sb_info *sbi = inode->sbi; u8 *compressmeta = malloc(BLK_ROUND_UP(sbi, inode->i_size) * sizeof(struct z_erofs_lcluster_index) + @@ -1257,11 +1400,17 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd) if (!compressmeta) return -ENOMEM; - /* allocate main data buffer */ - bh = erofs_balloc(DATA, 0, 0, 0); - if (IS_ERR(bh)) { - ret = PTR_ERR(bh); - goto err_free_meta; + if (!z_erofs_mt_enabled) { + /* allocate main data buffer */ + bh = erofs_balloc(DATA, 0, 0, 0); + if (IS_ERR(bh)) { + ret = PTR_ERR(bh); + goto err_free_meta; + } + blkaddr = erofs_mapbh(bh->block); /* start_blkaddr */ + } else { + bh = NULL; + blkaddr = EROFS_NULL_ADDR; } /* initialize per-file compression setting */ @@ -1310,7 +1459,6 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd) goto err_bdrop; } - blkaddr = erofs_mapbh(bh->block); /* start_blkaddr */ ctx.inode = inode; ctx.pclustersize = z_erofs_get_max_pclustersize(inode); ctx.metacur = compressmeta + Z_EROFS_LEGACY_MAP_HEADER_SIZE; @@ -1327,11 +1475,22 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd) if (ret) goto err_free_idata; #ifdef EROFS_MT_ENABLED - } else if (z_erofs_mt_enabled && inode->i_size > cfg.c_segment_size) { - ismt = true; - ret = z_erofs_mt_compress(&ctx, ccfg, blkaddr, &compressed_blocks); + } else if (z_erofs_mt_enabled) { + struct z_erofs_compress_ictx *l_ictx; + + l_ictx = malloc(sizeof(*l_ictx)); + if (!l_ictx) { + ret = -ENOMEM; + goto err_free_idata; + } + + memcpy(l_ictx, &ctx, sizeof(*l_ictx)); + init_list_head(&l_ictx->extents); + + ret = z_erofs_mt_compress(l_ictx, ccfg); if (ret) goto err_free_idata; + return 0; #endif } else { sctx.queue = g_queue; @@ -1348,10 +1507,6 @@ int 
erofs_write_compressed_file(struct erofs_inode *inode, int fd) compressed_blocks = sctx.blkaddr - blkaddr; } - /* fall back to no compression mode */ - DBG_BUGON(compressed_blocks < !!inode->idata_size); - compressed_blocks -= !!inode->idata_size; - /* generate an extent for the deduplicated fragment */ if (inode->fragment_size && !ctx.fragemitted) { struct z_erofs_extent_item *ei; @@ -1373,69 +1528,10 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd) z_erofs_commit_extent(&sctx, ei); } z_erofs_fragments_commit(inode); + list_splice_tail(&sctx.extents, &ctx.extents); - if (!ismt) - list_splice_tail(&sctx.extents, &ctx.extents); - - z_erofs_write_indexes(&ctx); - legacymetasize = ctx.metacur - compressmeta; - /* estimate if data compression saves space or not */ - if (!inode->fragment_size && - compressed_blocks * erofs_blksiz(sbi) + inode->idata_size + - legacymetasize >= inode->i_size) { - z_erofs_dedupe_commit(true); - ret = -ENOSPC; - goto err_free_idata; - } - z_erofs_dedupe_commit(false); - z_erofs_write_mapheader(inode, compressmeta); - - if (!ctx.fragemitted) - sbi->saved_by_deduplication += inode->fragment_size; - - /* if the entire file is a fragment, a simplified form is used. 
*/ - if (inode->i_size <= inode->fragment_size) { - DBG_BUGON(inode->i_size < inode->fragment_size); - DBG_BUGON(inode->fragmentoff >> 63); - *(__le64 *)compressmeta = - cpu_to_le64(inode->fragmentoff | 1ULL << 63); - inode->datalayout = EROFS_INODE_COMPRESSED_FULL; - legacymetasize = Z_EROFS_LEGACY_MAP_HEADER_SIZE; - } - - if (compressed_blocks) { - ret = erofs_bh_balloon(bh, erofs_pos(sbi, compressed_blocks)); - DBG_BUGON(ret != erofs_blksiz(sbi)); - } else { - if (!cfg.c_fragments && !cfg.c_dedupe) - DBG_BUGON(!inode->idata_size); - } - - erofs_info("compressed %s (%llu bytes) into %u blocks", - inode->i_srcpath, (unsigned long long)inode->i_size, - compressed_blocks); - - if (inode->idata_size) { - bh->op = &erofs_skip_write_bhops; - inode->bh_data = bh; - } else { - erofs_bdrop(bh, false); - } - - inode->u.i_blocks = compressed_blocks; - - if (inode->datalayout == EROFS_INODE_COMPRESSED_FULL) { - inode->extent_isize = legacymetasize; - } else { - ret = z_erofs_convert_to_compacted_format(inode, blkaddr, - legacymetasize, - compressmeta); - DBG_BUGON(ret); - } - inode->compressmeta = compressmeta; - if (!erofs_is_packed_inode(inode)) - erofs_droid_blocklist_write(inode, blkaddr, compressed_blocks); - return 0; + return z_erofs_finalize_compression(&ctx, bh, blkaddr, + compressed_blocks); err_free_idata: if (inode->idata) { @@ -1443,7 +1539,8 @@ err_free_idata: inode->idata = NULL; } err_bdrop: - erofs_bdrop(bh, true); /* revoke buffer */ + if (bh) + erofs_bdrop(bh, true); /* revoke buffer */ err_free_meta: free(compressmeta); inode->compressmeta = NULL; @@ -1594,8 +1691,6 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s z_erofs_mt_enabled = false; #ifdef EROFS_MT_ENABLED if (cfg.c_mt_workers > 1) { - pthread_mutex_init(&z_erofs_mt_ctrl.mutex, NULL); - pthread_cond_init(&z_erofs_mt_ctrl.cond, NULL); ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq, cfg.c_mt_workers, cfg.c_mt_workers << 2, @@ -1622,11 +1717,17 @@ int 
z_erofs_compress_exit(void) ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.wq); if (ret) return ret; - while (z_erofs_mt_ctrl.idle) { + while (z_erofs_mt_ctrl.work_idle) { struct erofs_compress_work *tmp = - z_erofs_mt_ctrl.idle->next; - free(z_erofs_mt_ctrl.idle); - z_erofs_mt_ctrl.idle = tmp; + z_erofs_mt_ctrl.work_idle->next; + free(z_erofs_mt_ctrl.work_idle); + z_erofs_mt_ctrl.work_idle = tmp; + } + while (z_erofs_mt_ctrl.file_idle) { + struct z_erofs_mt_file *tmp = + z_erofs_mt_ctrl.file_idle->next; + free(z_erofs_mt_ctrl.file_idle); + z_erofs_mt_ctrl.file_idle = tmp; } #endif } diff --git a/lib/inode.c b/lib/inode.c index 8460344..d7ef444 100644 --- a/lib/inode.c +++ b/lib/inode.c @@ -27,8 +27,13 @@ #include "erofs/compress_hints.h" #include "erofs/blobchunk.h" #include "erofs/fragments.h" +#ifdef EROFS_MT_ENABLED +#include "erofs/queue.h" +#endif #include "liberofs_private.h" +extern bool z_erofs_mt_enabled; + #define S_SHIFT 12 static unsigned char erofs_ftype_by_mode[S_IFMT >> S_SHIFT] = { [S_IFREG >> S_SHIFT] = EROFS_FT_REG_FILE, @@ -1036,6 +1041,9 @@ struct erofs_inode *erofs_new_inode(void) inode->i_ino[0] = sbi.inos++; /* inode serial number */ inode->i_count = 1; inode->datalayout = EROFS_INODE_FLAT_PLAIN; +#ifdef EROFS_MT_ENABLED + inode->mt_desc = NULL; +#endif init_list_head(&inode->i_hash); init_list_head(&inode->i_subdirs); @@ -1100,6 +1108,10 @@ static void erofs_fixup_meta_blkaddr(struct erofs_inode *rootdir) rootdir->nid = (off - meta_offset) >> EROFS_ISLOTBITS; } +#ifdef EROFS_MT_ENABLED +#define EROFS_MT_QUEUE_SIZE 256 +struct erofs_queue *z_erofs_mt_queue; +#endif static int erofs_mkfs_handle_symlink(struct erofs_inode *inode) { @@ -1143,14 +1155,69 @@ static int erofs_mkfs_handle_file(struct erofs_inode *inode) return 0; } +static int erofs_mkfs_issue_compress(struct erofs_inode *inode) +{ + if (!inode->i_size) + return 0; + + if (!S_ISLNK(inode->i_mode) && cfg.c_compr_opts[0].alg && + erofs_file_is_compressible(inode)) { + int fd = 
open(inode->i_srcpath, O_RDONLY | O_BINARY); + if (fd < 0) + return -errno; + return erofs_write_compressed_file(inode, fd); + } + + return 0; +} + static int erofs_mkfs_handle_dir(struct erofs_inode *dir, - struct list_head *dirs) + struct list_head *dirs, bool ismt) { int ret; DIR *_dir; struct dirent *dp; struct erofs_dentry *d; - unsigned int nr_subdirs = 0, i_nlink; + unsigned int nr_subdirs, i_nlink; + + ret = erofs_scan_file_xattrs(dir); + if (ret < 0) + return ret; + + ret = erofs_prepare_xattr_ibody(dir); + if (ret < 0) + return ret; + + if (!S_ISDIR(dir->i_mode)) { + if (S_ISLNK(dir->i_mode)) { + char *const symlink = malloc(dir->i_size); + + if (!symlink) + return -ENOMEM; + ret = readlink(dir->i_srcpath, symlink, dir->i_size); + if (ret < 0) { + free(symlink); + return -errno; + } + ret = erofs_write_file_from_buffer(dir, symlink); + free(symlink); + } else if (dir->i_size) { + int fd = open(dir->i_srcpath, O_RDONLY | O_BINARY); + if (fd < 0) + return -errno; + + ret = erofs_write_file(dir, fd, 0); + close(fd); + } else { + ret = 0; + } + if (ret) + return ret; + + erofs_prepare_inode_buffer(dir); + erofs_write_tail_end(dir); + return 0; + } _dir = opendir(dir->i_srcpath); if (!_dir) { @@ -1195,13 +1262,15 @@ static int erofs_mkfs_handle_dir(struct erofs_inode *dir, if (ret) return ret; - ret = erofs_prepare_inode_buffer(dir); - if (ret) - return ret; - dir->bh->op = &erofs_skip_write_bhops; + if (!ismt) { + ret = erofs_prepare_inode_buffer(dir); + if (ret) + return ret; + dir->bh->op = &erofs_skip_write_bhops; - if (IS_ROOT(dir)) - erofs_fixup_meta_blkaddr(dir); + if (IS_ROOT(dir)) + erofs_fixup_meta_blkaddr(dir); + } i_nlink = 0; list_for_each_entry(d, &dir->i_subdirs, d_child) { @@ -1286,7 +1355,7 @@ static void erofs_mkfs_dumpdir(struct erofs_inode *dumpdir) } static int erofs_mkfs_build_tree(struct erofs_inode *dir, - struct list_head *dirs) + struct list_head *dirs, bool ismt) { int ret; @@ -1299,12 +1368,15 @@ static int 
erofs_mkfs_build_tree(struct erofs_inode *dir, return ret; if (S_ISDIR(dir->i_mode)) - return erofs_mkfs_handle_dir(dir, dirs); + return erofs_mkfs_handle_dir(dir, dirs, ismt); + else if (ismt) + return erofs_mkfs_issue_compress(dir); else return erofs_mkfs_handle_file(dir); } -struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path) +struct erofs_inode *__erofs_mkfs_build_tree_from_path(const char *path, + bool ismt) { LIST_HEAD(dirs); struct erofs_inode *inode, *root, *dumpdir; @@ -1325,23 +1397,163 @@ struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path) list_del(&inode->i_subdirs); init_list_head(&inode->i_subdirs); - erofs_mkfs_print_progressinfo(inode); + if (!ismt) + erofs_mkfs_print_progressinfo(inode); - err = erofs_mkfs_build_tree(inode, &dirs); + err = erofs_mkfs_build_tree(inode, &dirs, ismt); if (err) { root = ERR_PTR(err); break; } + if (!ismt) { + if (S_ISDIR(inode->i_mode)) { + inode->next_dirwrite = dumpdir; + dumpdir = inode; + } else { + erofs_iput(inode); + } +#ifdef EROFS_MT_ENABLED + } else { + erofs_push_queue(z_erofs_mt_queue, &inode); +#endif + } + } while (!list_empty(&dirs)); + + if (!ismt) + erofs_mkfs_dumpdir(dumpdir); +#ifdef EROFS_MT_ENABLED + else + erofs_push_queue(z_erofs_mt_queue, &dumpdir); +#endif + return root; +} + +#ifdef EROFS_MT_ENABLED +pthread_t z_erofs_mt_traverser; + +void *z_erofs_mt_traverse_task(void *path) +{ + pthread_exit((void *)__erofs_mkfs_build_tree_from_path(path, true)); +} + +static int z_erofs_mt_reap_compressed(struct erofs_inode *inode) +{ + struct z_erofs_mt_file *desc = inode->mt_desc; + int fd = desc->fd; + int ret = 0; + + pthread_mutex_lock(&desc->mutex); + while (desc->nfini != desc->total) + pthread_cond_wait(&desc->cond, &desc->mutex); + pthread_mutex_unlock(&desc->mutex); + + ret = z_erofs_mt_reap(desc); + if (ret == -ENOSPC) { + ret = lseek(fd, 0, SEEK_SET); + if (ret < 0) + return -errno; + + ret = write_uncompressed_file_from_fd(inode, fd); + } + + close(fd); + 
return ret; +} + +static int z_erofs_mt_reap_inodes() +{ + struct erofs_inode *inode, *dumpdir; + int ret = 0; + + dumpdir = NULL; + while (true) { + inode = *(struct erofs_inode **)erofs_pop_queue( + z_erofs_mt_queue); + if (!inode) + break; + + erofs_mkfs_print_progressinfo(inode); + if (S_ISDIR(inode->i_mode)) { + ret = erofs_prepare_inode_buffer(inode); + if (ret) + goto out; + inode->bh->op = &erofs_skip_write_bhops; + + if (IS_ROOT(inode)) + erofs_fixup_meta_blkaddr(inode); + inode->next_dirwrite = dumpdir; dumpdir = inode; + continue; + } + + if (inode->mt_desc) { + ret = z_erofs_mt_reap_compressed(inode); + } else if (S_ISLNK(inode->i_mode)) { + ret = erofs_mkfs_handle_symlink(inode); + } else if (!inode->i_size) { + ret = 0; } else { - erofs_iput(inode); + int fd = open(inode->i_srcpath, O_RDONLY | O_BINARY); + if (fd < 0) + return -errno; + + if (cfg.c_chunkbits) + ret = erofs_write_chunked_file(inode, fd, 0); + else + ret = write_uncompressed_file_from_fd(inode, + fd); + close(fd); } - } while (!list_empty(&dirs)); + if (ret) + goto out; + + erofs_prepare_inode_buffer(inode); + erofs_write_tail_end(inode); + erofs_iput(inode); + } erofs_mkfs_dumpdir(dumpdir); + +out: + return ret; +} +#endif + +struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path) +{ +#ifdef EROFS_MT_ENABLED + int err; +#endif + struct erofs_inode *root = NULL; + + if (!z_erofs_mt_enabled) + return __erofs_mkfs_build_tree_from_path(path, false); + +#ifdef EROFS_MT_ENABLED + z_erofs_mt_queue = erofs_alloc_queue(EROFS_MT_QUEUE_SIZE, + sizeof(struct erofs_inode *)); + if (IS_ERR(z_erofs_mt_queue)) + return ERR_CAST(z_erofs_mt_queue); + + err = pthread_create(&z_erofs_mt_traverser, NULL, + z_erofs_mt_traverse_task, (void *)path); + if (err) + return ERR_PTR(err); + + err = z_erofs_mt_reap_inodes(); + if (err) + return ERR_PTR(err); + + err = pthread_join(z_erofs_mt_traverser, (void *)&root); + if (err) + return ERR_PTR(err); + + erofs_destroy_queue(z_erofs_mt_queue); +#endif 
+ return root; } diff --git a/lib/queue.c b/lib/queue.c new file mode 100644 index 0000000..f40ed1d --- /dev/null +++ b/lib/queue.c @@ -0,0 +1,64 @@ +// SPDX-License-Identifier: GPL-2.0+ +#include "erofs/err.h" +#include <stdlib.h> +#include "erofs/queue.h" + +struct erofs_queue *erofs_alloc_queue(size_t size, size_t elem_size) +{ + struct erofs_queue *q = malloc(sizeof(*q)); + + pthread_mutex_init(&q->lock, NULL); + pthread_cond_init(&q->empty, NULL); + pthread_cond_init(&q->full, NULL); + + q->size = size; + q->elem_size = elem_size; + q->head = 0; + q->tail = 0; + q->buf = calloc(size, elem_size); + if (!q->buf) + return ERR_PTR(-ENOMEM); + + return q; +} + +void erofs_push_queue(struct erofs_queue *q, void *elem) +{ + pthread_mutex_lock(&q->lock); + + while ((q->tail + 1) % q->size == q->head) + pthread_cond_wait(&q->full, &q->lock); + + memcpy(q->buf + q->tail * q->elem_size, elem, q->elem_size); + q->tail = (q->tail + 1) % q->size; + + pthread_cond_signal(&q->empty); + pthread_mutex_unlock(&q->lock); +} + +void *erofs_pop_queue(struct erofs_queue *q) +{ + void *elem; + + pthread_mutex_lock(&q->lock); + + while (q->head == q->tail) + pthread_cond_wait(&q->empty, &q->lock); + + elem = q->buf + q->head * q->elem_size; + q->head = (q->head + 1) % q->size; + + pthread_cond_signal(&q->full); + pthread_mutex_unlock(&q->lock); + + return elem; +} + +void erofs_destroy_queue(struct erofs_queue *q) +{ + pthread_mutex_destroy(&q->lock); + pthread_cond_destroy(&q->empty); + pthread_cond_destroy(&q->full); + free(q->buf); + free(q); +} \ No newline at end of file -- 2.44.0
This patch splits part of the logic in function erofs_mkfs_build_tree() and erofs_mkfs_build_tree_from_path() into several new functions. This is in preparation for the upcoming inter-file multi-threaded compression feature. Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn> --- lib/inode.c | 161 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 100 insertions(+), 61 deletions(-) diff --git a/lib/inode.c b/lib/inode.c index ac00228..8460344 100644 --- a/lib/inode.c +++ b/lib/inode.c @@ -477,20 +477,24 @@ static int write_uncompressed_file_from_fd(struct erofs_inode *inode, int fd) return 0; } +static int erofs_write_chunked_file(struct erofs_inode *inode, int fd, u64 fpos) +{ + inode->u.chunkbits = cfg.c_chunkbits; + /* chunk indexes when explicitly specified */ + inode->u.chunkformat = 0; + if (cfg.c_force_chunkformat == FORCE_INODE_CHUNK_INDEXES) + inode->u.chunkformat = EROFS_CHUNK_FORMAT_INDEXES; + return erofs_blob_write_chunked_file(inode, fd, fpos); +} + int erofs_write_file(struct erofs_inode *inode, int fd, u64 fpos) { int ret; DBG_BUGON(!inode->i_size); - if (cfg.c_chunkbits) { - inode->u.chunkbits = cfg.c_chunkbits; - /* chunk indexes when explicitly specified */ - inode->u.chunkformat = 0; - if (cfg.c_force_chunkformat == FORCE_INODE_CHUNK_INDEXES) - inode->u.chunkformat = EROFS_CHUNK_FORMAT_INDEXES; - return erofs_blob_write_chunked_file(inode, fd, fpos); - } + if (cfg.c_chunkbits) + return erofs_write_chunked_file(inode, fd, fpos); if (cfg.c_compr_opts[0].alg && erofs_file_is_compressible(inode)) { ret = erofs_write_compressed_file(inode, fd); @@ -1096,52 +1100,57 @@ static void erofs_fixup_meta_blkaddr(struct erofs_inode *rootdir) rootdir->nid = (off - meta_offset) >> EROFS_ISLOTBITS; } -static int erofs_mkfs_build_tree(struct erofs_inode *dir, struct list_head *dirs) -{ - int ret; - DIR *_dir; - struct dirent *dp; - struct erofs_dentry *d; - unsigned int nr_subdirs, i_nlink; - ret = erofs_scan_file_xattrs(dir); - if (ret < 0) - 
return ret; +static int erofs_mkfs_handle_symlink(struct erofs_inode *inode) +{ + int ret = 0; + char *const symlink = malloc(inode->i_size); - ret = erofs_prepare_xattr_ibody(dir); - if (ret < 0) - return ret; + if (!symlink) + return -ENOMEM; + ret = readlink(inode->i_srcpath, symlink, inode->i_size); + if (ret < 0) { + free(symlink); + return -errno; + } + ret = erofs_write_file_from_buffer(inode, symlink); + free(symlink); - if (!S_ISDIR(dir->i_mode)) { - if (S_ISLNK(dir->i_mode)) { - char *const symlink = malloc(dir->i_size); + return ret; +} - if (!symlink) - return -ENOMEM; - ret = readlink(dir->i_srcpath, symlink, dir->i_size); - if (ret < 0) { - free(symlink); - return -errno; - } - ret = erofs_write_file_from_buffer(dir, symlink); - free(symlink); - } else if (dir->i_size) { - int fd = open(dir->i_srcpath, O_RDONLY | O_BINARY); - if (fd < 0) - return -errno; +static int erofs_mkfs_handle_file(struct erofs_inode *inode) +{ + int ret = 0; - ret = erofs_write_file(dir, fd, 0); - close(fd); - } else { - ret = 0; - } - if (ret) - return ret; + if (S_ISLNK(inode->i_mode)) { + ret = erofs_mkfs_handle_symlink(inode); + } else if (inode->i_size) { + int fd = open(inode->i_srcpath, O_RDONLY | O_BINARY); + if (fd < 0) + return -errno; - erofs_prepare_inode_buffer(dir); - erofs_write_tail_end(dir); - return 0; + ret = erofs_write_file(inode, fd, 0); + close(fd); + } else { + ret = 0; } + if (ret) + return ret; + + erofs_prepare_inode_buffer(inode); + erofs_write_tail_end(inode); + return 0; +} + +static int erofs_mkfs_handle_dir(struct erofs_inode *dir, + struct list_head *dirs) +{ + int ret; + DIR *_dir; + struct dirent *dp; + struct erofs_dentry *d; + unsigned int nr_subdirs = 0, i_nlink; _dir = opendir(dir->i_srcpath); if (!_dir) { @@ -1253,6 +1262,48 @@ err_closedir: return ret; } +static void erofs_mkfs_print_progressinfo(struct erofs_inode *inode) +{ + char *trimmed; + + trimmed = erofs_trim_for_progressinfo(erofs_fspath(inode->i_srcpath), + sizeof("Processing 
...") - 1); + erofs_update_progressinfo("Processing %s ...", trimmed); + free(trimmed); +} + +static void erofs_mkfs_dumpdir(struct erofs_inode *dumpdir) +{ + struct erofs_inode *inode; + while (dumpdir) { + inode = dumpdir; + erofs_write_dir_file(inode); + erofs_write_tail_end(inode); + inode->bh->op = &erofs_write_inode_bhops; + dumpdir = inode->next_dirwrite; + erofs_iput(inode); + } +} + +static int erofs_mkfs_build_tree(struct erofs_inode *dir, + struct list_head *dirs) +{ + int ret; + + ret = erofs_scan_file_xattrs(dir); + if (ret < 0) + return ret; + + ret = erofs_prepare_xattr_ibody(dir); + if (ret < 0) + return ret; + + if (S_ISDIR(dir->i_mode)) + return erofs_mkfs_handle_dir(dir, dirs); + else + return erofs_mkfs_handle_file(dir); +} + struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path) { LIST_HEAD(dirs); @@ -1269,17 +1320,12 @@ struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path) dumpdir = NULL; do { int err; - char *trimmed; inode = list_first_entry(&dirs, struct erofs_inode, i_subdirs); list_del(&inode->i_subdirs); init_list_head(&inode->i_subdirs); - trimmed = erofs_trim_for_progressinfo( - erofs_fspath(inode->i_srcpath), - sizeof("Processing ...") - 1); - erofs_update_progressinfo("Processing %s ...", trimmed); - free(trimmed); + erofs_mkfs_print_progressinfo(inode); err = erofs_mkfs_build_tree(inode, &dirs); if (err) { @@ -1295,14 +1341,7 @@ struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path) } } while (!list_empty(&dirs)); - while (dumpdir) { - inode = dumpdir; - erofs_write_dir_file(inode); - erofs_write_tail_end(inode); - inode->bh->op = &erofs_write_inode_bhops; - dumpdir = inode->next_dirwrite; - erofs_iput(inode); - } + erofs_mkfs_dumpdir(dumpdir); return root; } -- 2.44.0
This patchset introduces inter-file multi-threaded compression. Yifan Zhao (2): erofs-utils: lib: split function logic in inode.c erofs-utils: mkfs: introduce inter-file multi-threaded compression include/erofs/compress.h | 16 ++ include/erofs/internal.h | 3 + include/erofs/list.h | 8 + include/erofs/queue.h | 22 +++ lib/Makefile.am | 2 +- lib/compress.c | 323 +++++++++++++++++++++++++-------------- lib/inode.c | 311 +++++++++++++++++++++++++++++++++---- lib/queue.c | 64 ++++++++ 8 files changed, 607 insertions(+), 142 deletions(-) create mode 100644 include/erofs/queue.h create mode 100644 lib/queue.c -- 2.44.0
On Sun, Mar 17, 2024 at 02:45:09PM +0800, Yifan Zhao wrote: > If a segment is smaller than the block size, sizeof(sctx->membuf) should > be at least as large as the block size, as memory write into the buffer > is done in block size. > > Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn> > --- Folded the following diff into the original patch: diff --git a/lib/compress.c b/lib/compress.c index aeb7013..8d88dd1 100644 --- a/lib/compress.c +++ b/lib/compress.c @@ -1096,11 +1096,11 @@ void z_erofs_mt_workfn(struct erofs_work *work, void *tlsp) struct erofs_compress_work *cwork = (struct erofs_compress_work *)work; struct erofs_compress_wq_tls *tls = tlsp; struct z_erofs_compress_sctx *sctx = &cwork->ctx; + struct erofs_sb_info *sbi = sctx->ictx->inode->sbi; int ret = 0; - ret = z_erofs_mt_wq_tls_init_compr(sctx->ictx->inode->sbi, tls, - cwork->alg_id, cwork->alg_name, - cwork->comp_level, + ret = z_erofs_mt_wq_tls_init_compr(sbi, tls, cwork->alg_id, + cwork->alg_name, cwork->comp_level, cwork->dict_size); if (ret) goto out; @@ -1109,7 +1109,7 @@ void z_erofs_mt_workfn(struct erofs_work *work, void *tlsp) sctx->destbuf = tls->destbuf; sctx->chandle = &tls->ccfg[cwork->alg_id].handle; - sctx->membuf = malloc(sctx->remaining); + sctx->membuf = malloc(round_up(sctx->remaining, erofs_blksiz(sbi))); if (!sctx->membuf) { ret = -ENOMEM; goto out; -- 2.30.2
If a segment is smaller than the block size, sizeof(sctx->membuf) should be at least as large as the block size, as memory write into the buffer is done in block size. Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn> --- lib/compress.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/compress.c b/lib/compress.c index aeb7013..67a86db 100644 --- a/lib/compress.c +++ b/lib/compress.c @@ -1096,11 +1096,12 @@ void z_erofs_mt_workfn(struct erofs_work *work, void *tlsp) struct erofs_compress_work *cwork = (struct erofs_compress_work *)work; struct erofs_compress_wq_tls *tls = tlsp; struct z_erofs_compress_sctx *sctx = &cwork->ctx; + struct erofs_sb_info *sbi = sctx->ictx->inode->sbi; + erofs_off_t blksz = erofs_blksiz(sbi); int ret = 0; - ret = z_erofs_mt_wq_tls_init_compr(sctx->ictx->inode->sbi, tls, - cwork->alg_id, cwork->alg_name, - cwork->comp_level, + ret = z_erofs_mt_wq_tls_init_compr(sbi, tls, cwork->alg_id, + cwork->alg_name, cwork->comp_level, cwork->dict_size); if (ret) goto out; @@ -1109,7 +1110,7 @@ void z_erofs_mt_workfn(struct erofs_work *work, void *tlsp) sctx->destbuf = tls->destbuf; sctx->chandle = &tls->ccfg[cwork->alg_id].handle; - sctx->membuf = malloc(sctx->remaining); + sctx->membuf = malloc(max(blksz, sctx->remaining)); if (!sctx->membuf) { ret = -ENOMEM; goto out; -- 2.44.0
On 2024/3/15 15:18, Noboru Asai wrote:
> I think it is easier to understand the source code if the names of
> variables and pointers in the same structure are unified.
>
> struct z_erofs_compress_ictx inode_ctx; // i stands for inode?
> struct z_erofs_compress_ictx *ictx;
>
> struct z_erofs_compress_sctx seg_ctx;
> struct z_erofs_compress_sctx *sctx;
"ictx" means "inode context", "sctx" means "segment context".
If there is no confusion, "ctx" naming can be used in a function.
Also I tend to avoid making a huge diff just to rename "ctx" to "sctx"
or "ictx", otherwise "git blame" will be in a mess.
Thanks,
Gao Xiang
I think it is easier to understand the source code if the names of
variables and pointers in the same structure are unified.
struct z_erofs_compress_ictx inode_ctx; // i stands for inode?
struct z_erofs_compress_ictx *ictx;
struct z_erofs_compress_sctx seg_ctx;
struct z_erofs_compress_sctx *sctx;
2024年3月15日(金) 10:11 Gao Xiang <hsiangkao@linux.alibaba.com>:
>
> From: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
>
> Currently, the creation of EROFS compressed image creation is
> single-threaded, which suffers from performance issues. This patch
> attempts to address it by compressing the large file in parallel.
>
> Specifically, each input file larger than 16MB is split into segments,
> and each worker thread compresses a segment as if it were a separate
> file. Finally, the main thread merges all the compressed segments.
>
> Multi-threaded compression is not compatible with -Ededupe,
> -E(all-)fragments and -Eztailpacking for now.
>
> Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
> Co-authored-by: Tong Xin <xin_tong@sjtu.edu.cn>
> Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com>
> ---
> v7:
> - support -Eztailpacking;
> - wq_private -> wq_tls;
> - minor updates.
>
> include/erofs/compress.h | 3 +-
> lib/compress.c | 548 ++++++++++++++++++++++++++++++++-------
> lib/compressor.c | 2 +
> mkfs/main.c | 8 +-
> 4 files changed, 464 insertions(+), 97 deletions(-)
>
> diff --git a/include/erofs/compress.h b/include/erofs/compress.h
> index b3272f7..3253611 100644
> --- a/include/erofs/compress.h
> +++ b/include/erofs/compress.h
> @@ -14,7 +14,8 @@ extern "C"
>
> #include "internal.h"
>
> -#define EROFS_CONFIG_COMPR_MAX_SZ (4000 * 1024)
> +#define EROFS_CONFIG_COMPR_MAX_SZ (4000 * 1024)
> +#define Z_EROFS_COMPR_QUEUE_SZ (EROFS_CONFIG_COMPR_MAX_SZ * 2)
>
> void z_erofs_drop_inline_pcluster(struct erofs_inode *inode);
> int erofs_write_compressed_file(struct erofs_inode *inode, int fd);
> diff --git a/lib/compress.c b/lib/compress.c
> index 4101009..0d796c8 100644
> --- a/lib/compress.c
> +++ b/lib/compress.c
> @@ -20,6 +20,9 @@
> #include "erofs/block_list.h"
> #include "erofs/compress_hints.h"
> #include "erofs/fragments.h"
> +#ifdef EROFS_MT_ENABLED
> +#include "erofs/workqueue.h"
> +#endif
>
> /* compressing configuration specified by users */
> struct erofs_compress_cfg {
> @@ -33,29 +36,77 @@ struct z_erofs_extent_item {
> struct z_erofs_inmem_extent e;
> };
>
> -struct z_erofs_vle_compress_ctx {
> - u8 queue[EROFS_CONFIG_COMPR_MAX_SZ * 2];
> +struct z_erofs_compress_ictx {
> + struct erofs_inode *inode;
> + int fd;
> + unsigned int pclustersize;
> +
> + u32 tof_chksum;
> + bool fix_dedupedfrag;
> + bool fragemitted;
> +
> + /* fields for write indexes */
> + u8 *metacur;
> + struct list_head extents;
> + u16 clusterofs;
> +};
> +
> +struct z_erofs_compress_sctx { /* segment context */
> + struct z_erofs_compress_ictx *ictx;
> +
> + u8 *queue;
> struct list_head extents;
> struct z_erofs_extent_item *pivot;
>
> - struct erofs_inode *inode;
> - struct erofs_compress_cfg *ccfg;
> + struct erofs_compress *chandle;
> + char *destbuf;
>
> - u8 *metacur;
> unsigned int head, tail;
> erofs_off_t remaining;
> - unsigned int pclustersize;
> erofs_blk_t blkaddr; /* pointing to the next blkaddr */
> u16 clusterofs;
>
> - u32 tof_chksum;
> - bool fix_dedupedfrag;
> - bool fragemitted;
> + int seg_num, seg_idx;
> +
> + void *membuf;
> + erofs_off_t memoff;
> +};
> +
> +#ifdef EROFS_MT_ENABLED
> +struct erofs_compress_wq_tls {
> + u8 *queue;
> + char *destbuf;
> + struct erofs_compress_cfg *ccfg;
> };
>
> +struct erofs_compress_work {
> + /* Note: struct erofs_work must be the first member */
> + struct erofs_work work;
> + struct z_erofs_compress_sctx ctx;
> + struct erofs_compress_work *next;
> +
> + unsigned int alg_id;
> + char *alg_name;
> + unsigned int comp_level;
> + unsigned int dict_size;
> +
> + int errcode;
> +};
> +
> +static struct {
> + struct erofs_workqueue wq;
> + struct erofs_compress_work *idle;
> + pthread_mutex_t mutex;
> + pthread_cond_t cond;
> + int nfini;
> +} z_erofs_mt_ctrl;
> +#endif
> +
> +static bool z_erofs_mt_enabled;
> +
> #define Z_EROFS_LEGACY_MAP_HEADER_SIZE Z_EROFS_FULL_INDEX_ALIGN(0)
>
> -static void z_erofs_write_indexes_final(struct z_erofs_vle_compress_ctx *ctx)
> +static void z_erofs_write_indexes_final(struct z_erofs_compress_ictx *ctx)
> {
> const unsigned int type = Z_EROFS_LCLUSTER_TYPE_PLAIN;
> struct z_erofs_lcluster_index di;
> @@ -71,7 +122,7 @@ static void z_erofs_write_indexes_final(struct z_erofs_vle_compress_ctx *ctx)
> ctx->metacur += sizeof(di);
> }
>
> -static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx,
> +static void z_erofs_write_extent(struct z_erofs_compress_ictx *ctx,
> struct z_erofs_inmem_extent *e)
> {
> struct erofs_inode *inode = ctx->inode;
> @@ -170,7 +221,7 @@ static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx,
> ctx->clusterofs = clusterofs + count;
> }
>
> -static void z_erofs_write_indexes(struct z_erofs_vle_compress_ctx *ctx)
> +static void z_erofs_write_indexes(struct z_erofs_compress_ictx *ctx)
> {
> struct z_erofs_extent_item *ei, *n;
>
> @@ -184,15 +235,16 @@ static void z_erofs_write_indexes(struct z_erofs_vle_compress_ctx *ctx)
> z_erofs_write_indexes_final(ctx);
> }
>
> -static bool z_erofs_need_refill(struct z_erofs_vle_compress_ctx *ctx)
> +static bool z_erofs_need_refill(struct z_erofs_compress_sctx *ctx)
> {
> const bool final = !ctx->remaining;
> unsigned int qh_aligned, qh_after;
> + struct erofs_inode *inode = ctx->ictx->inode;
>
> if (final || ctx->head < EROFS_CONFIG_COMPR_MAX_SZ)
> return false;
>
> - qh_aligned = round_down(ctx->head, erofs_blksiz(ctx->inode->sbi));
> + qh_aligned = round_down(ctx->head, erofs_blksiz(inode->sbi));
> qh_after = ctx->head - qh_aligned;
> memmove(ctx->queue, ctx->queue + qh_aligned, ctx->tail - qh_aligned);
> ctx->tail -= qh_aligned;
> @@ -204,7 +256,7 @@ static struct z_erofs_extent_item dummy_pivot = {
> .e.length = 0
> };
>
> -static void z_erofs_commit_extent(struct z_erofs_vle_compress_ctx *ctx,
> +static void z_erofs_commit_extent(struct z_erofs_compress_sctx *ctx,
> struct z_erofs_extent_item *ei)
> {
> if (ei == &dummy_pivot)
> @@ -212,14 +264,13 @@ static void z_erofs_commit_extent(struct z_erofs_vle_compress_ctx *ctx,
>
> list_add_tail(&ei->list, &ctx->extents);
> ctx->clusterofs = (ctx->clusterofs + ei->e.length) &
> - (erofs_blksiz(ctx->inode->sbi) - 1);
> -
> + (erofs_blksiz(ctx->ictx->inode->sbi) - 1);
> }
>
> -static int z_erofs_compress_dedupe(struct z_erofs_vle_compress_ctx *ctx,
> +static int z_erofs_compress_dedupe(struct z_erofs_compress_sctx *ctx,
> unsigned int *len)
> {
> - struct erofs_inode *inode = ctx->inode;
> + struct erofs_inode *inode = ctx->ictx->inode;
> const unsigned int lclustermask = (1 << inode->z_logical_clusterbits) - 1;
> struct erofs_sb_info *sbi = inode->sbi;
> struct z_erofs_extent_item *ei = ctx->pivot;
> @@ -315,16 +366,17 @@ out:
> return 0;
> }
>
> -static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx,
> +static int write_uncompressed_extent(struct z_erofs_compress_sctx *ctx,
> unsigned int len, char *dst)
> {
> - struct erofs_sb_info *sbi = ctx->inode->sbi;
> + struct erofs_inode *inode = ctx->ictx->inode;
> + struct erofs_sb_info *sbi = inode->sbi;
> unsigned int count = min(erofs_blksiz(sbi), len);
> unsigned int interlaced_offset, rightpart;
> int ret;
>
> /* write interlaced uncompressed data if needed */
> - if (ctx->inode->z_advise & Z_EROFS_ADVISE_INTERLACED_PCLUSTER)
> + if (inode->z_advise & Z_EROFS_ADVISE_INTERLACED_PCLUSTER)
> interlaced_offset = ctx->clusterofs;
> else
> interlaced_offset = 0;
> @@ -335,11 +387,17 @@ static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx,
> memcpy(dst + interlaced_offset, ctx->queue + ctx->head, rightpart);
> memcpy(dst, ctx->queue + ctx->head + rightpart, count - rightpart);
>
> - erofs_dbg("Writing %u uncompressed data to block %u",
> - count, ctx->blkaddr);
> - ret = blk_write(sbi, dst, ctx->blkaddr, 1);
> - if (ret)
> - return ret;
> + if (ctx->membuf) {
> + erofs_dbg("Writing %u uncompressed data to membuf", count);
> + memcpy(ctx->membuf + ctx->memoff, dst, erofs_blksiz(sbi));
> + ctx->memoff += erofs_blksiz(sbi);
> + } else {
> + erofs_dbg("Writing %u uncompressed data to block %u", count,
> + ctx->blkaddr);
> + ret = blk_write(sbi, dst, ctx->blkaddr, 1);
> + if (ret)
> + return ret;
> + }
> return count;
> }
>
> @@ -379,12 +437,12 @@ static int z_erofs_fill_inline_data(struct erofs_inode *inode, void *data,
> return len;
> }
>
> -static void tryrecompress_trailing(struct z_erofs_vle_compress_ctx *ctx,
> +static void tryrecompress_trailing(struct z_erofs_compress_sctx *ctx,
> struct erofs_compress *ec,
> void *in, unsigned int *insize,
> void *out, unsigned int *compressedsize)
> {
> - struct erofs_sb_info *sbi = ctx->inode->sbi;
> + struct erofs_sb_info *sbi = ctx->ictx->inode->sbi;
> static char tmp[Z_EROFS_PCLUSTER_MAX_SIZE];
> unsigned int count;
> int ret = *compressedsize;
> @@ -406,10 +464,11 @@ static void tryrecompress_trailing(struct z_erofs_vle_compress_ctx *ctx,
> *compressedsize = ret;
> }
>
> -static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx,
> +static bool z_erofs_fixup_deduped_fragment(struct z_erofs_compress_sctx *ctx,
> unsigned int len)
> {
> - struct erofs_inode *inode = ctx->inode;
> + struct z_erofs_compress_ictx *ictx = ctx->ictx;
> + struct erofs_inode *inode = ictx->inode;
> struct erofs_sb_info *sbi = inode->sbi;
> const unsigned int newsize = ctx->remaining + len;
>
> @@ -417,9 +476,10 @@ static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx,
>
> /* try to fix again if it gets larger (should be rare) */
> if (inode->fragment_size < newsize) {
> - ctx->pclustersize = min_t(erofs_off_t, z_erofs_get_max_pclustersize(inode),
> - roundup(newsize - inode->fragment_size,
> - erofs_blksiz(sbi)));
> + ictx->pclustersize = min_t(erofs_off_t,
> + z_erofs_get_max_pclustersize(inode),
> + roundup(newsize - inode->fragment_size,
> + erofs_blksiz(sbi)));
> return false;
> }
>
> @@ -436,29 +496,32 @@ static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx,
> return true;
> }
>
> -static int __z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx,
> +static int __z_erofs_compress_one(struct z_erofs_compress_sctx *ctx,
> struct z_erofs_inmem_extent *e)
> {
> - static char dstbuf[EROFS_CONFIG_COMPR_MAX_SZ + EROFS_MAX_BLOCK_SIZE];
> - struct erofs_inode *inode = ctx->inode;
> + static char g_dstbuf[EROFS_CONFIG_COMPR_MAX_SZ + EROFS_MAX_BLOCK_SIZE];
> + char *dstbuf = ctx->destbuf ?: g_dstbuf;
> + struct z_erofs_compress_ictx *ictx = ctx->ictx;
> + struct erofs_inode *inode = ictx->inode;
> struct erofs_sb_info *sbi = inode->sbi;
> unsigned int blksz = erofs_blksiz(sbi);
> char *const dst = dstbuf + blksz;
> - struct erofs_compress *const h = &ctx->ccfg->handle;
> + struct erofs_compress *const h = ctx->chandle;
> unsigned int len = ctx->tail - ctx->head;
> bool is_packed_inode = erofs_is_packed_inode(inode);
> bool final = !ctx->remaining;
> - bool may_packing = (cfg.c_fragments && final && !is_packed_inode);
> + bool may_packing = (cfg.c_fragments && final && !is_packed_inode &&
> + !z_erofs_mt_enabled);
> bool may_inline = (cfg.c_ztailpacking && final && !may_packing);
> unsigned int compressedsize;
> int ret;
>
> - if (len <= ctx->pclustersize) {
> + if (len <= ictx->pclustersize) {
> if (!final || !len)
> return 1;
> if (may_packing) {
> - if (inode->fragment_size && !ctx->fix_dedupedfrag) {
> - ctx->pclustersize = roundup(len, blksz);
> + if (inode->fragment_size && !ictx->fix_dedupedfrag) {
> + ictx->pclustersize = roundup(len, blksz);
> goto fix_dedupedfrag;
> }
> e->length = len;
> @@ -470,7 +533,7 @@ static int __z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx,
>
> e->length = min(len, cfg.c_max_decompressed_extent_bytes);
> ret = erofs_compress_destsize(h, ctx->queue + ctx->head,
> - &e->length, dst, ctx->pclustersize);
> + &e->length, dst, ictx->pclustersize);
> if (ret <= 0) {
> erofs_err("failed to compress %s: %s", inode->i_srcpath,
> erofs_strerror(ret));
> @@ -507,16 +570,16 @@ nocompression:
> e->compressedblks = 1;
> e->raw = true;
> } else if (may_packing && len == e->length &&
> - compressedsize < ctx->pclustersize &&
> - (!inode->fragment_size || ctx->fix_dedupedfrag)) {
> + compressedsize < ictx->pclustersize &&
> + (!inode->fragment_size || ictx->fix_dedupedfrag)) {
> frag_packing:
> ret = z_erofs_pack_fragments(inode, ctx->queue + ctx->head,
> - len, ctx->tof_chksum);
> + len, ictx->tof_chksum);
> if (ret < 0)
> return ret;
> e->compressedblks = 0; /* indicate a fragment */
> e->raw = false;
> - ctx->fragemitted = true;
> + ictx->fragemitted = true;
> /* tailpcluster should be less than 1 block */
> } else if (may_inline && len == e->length && compressedsize < blksz) {
> if (ctx->clusterofs + len <= blksz) {
> @@ -545,8 +608,8 @@ frag_packing:
> */
> if (may_packing && len == e->length &&
> (compressedsize & (blksz - 1)) &&
> - ctx->tail < sizeof(ctx->queue)) {
> - ctx->pclustersize = roundup(compressedsize, blksz);
> + ctx->tail < Z_EROFS_COMPR_QUEUE_SZ) {
> + ictx->pclustersize = roundup(compressedsize, blksz);
> goto fix_dedupedfrag;
> }
>
> @@ -569,34 +632,45 @@ frag_packing:
> }
>
> /* write compressed data */
> - erofs_dbg("Writing %u compressed data to %u of %u blocks",
> - e->length, ctx->blkaddr, e->compressedblks);
> + if (ctx->membuf) {
> + erofs_off_t sz = e->compressedblks * blksz;
> + erofs_dbg("Writing %u compressed data to membuf of %u blocks",
> + e->length, e->compressedblks);
>
> - ret = blk_write(sbi, dst - padding, ctx->blkaddr,
> - e->compressedblks);
> - if (ret)
> - return ret;
> + memcpy(ctx->membuf + ctx->memoff, dst - padding, sz);
> + ctx->memoff += sz;
> + } else {
> + erofs_dbg("Writing %u compressed data to %u of %u blocks",
> + e->length, ctx->blkaddr, e->compressedblks);
> +
> + ret = blk_write(sbi, dst - padding, ctx->blkaddr,
> + e->compressedblks);
> + if (ret)
> + return ret;
> + }
> e->raw = false;
> may_inline = false;
> may_packing = false;
> }
> e->partial = false;
> e->blkaddr = ctx->blkaddr;
> + if (ctx->blkaddr != EROFS_NULL_ADDR)
> + ctx->blkaddr += e->compressedblks;
> if (!may_inline && !may_packing && !is_packed_inode)
> (void)z_erofs_dedupe_insert(e, ctx->queue + ctx->head);
> - ctx->blkaddr += e->compressedblks;
> ctx->head += e->length;
> return 0;
>
> fix_dedupedfrag:
> DBG_BUGON(!inode->fragment_size);
> ctx->remaining += inode->fragment_size;
> - ctx->fix_dedupedfrag = true;
> + ictx->fix_dedupedfrag = true;
> return 1;
> }
>
> -static int z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx)
> +static int z_erofs_compress_one(struct z_erofs_compress_sctx *ctx)
> {
> + struct z_erofs_compress_ictx *ictx = ctx->ictx;
> unsigned int len = ctx->tail - ctx->head;
> struct z_erofs_extent_item *ei;
>
> @@ -624,7 +698,7 @@ static int z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx)
>
> len -= ei->e.length;
> ctx->pivot = ei;
> - if (ctx->fix_dedupedfrag && !ctx->fragemitted &&
> + if (ictx->fix_dedupedfrag && !ictx->fragemitted &&
> z_erofs_fixup_deduped_fragment(ctx, len))
> break;
>
> @@ -912,13 +986,268 @@ void z_erofs_drop_inline_pcluster(struct erofs_inode *inode)
> inode->eof_tailraw = NULL;
> }
>
> +int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx,
> + u64 offset, erofs_blk_t blkaddr)
> +{
> + int fd = ctx->ictx->fd;
> +
> + ctx->blkaddr = blkaddr;
> + while (ctx->remaining) {
> + const u64 rx = min_t(u64, ctx->remaining,
> + Z_EROFS_COMPR_QUEUE_SZ - ctx->tail);
> + int ret;
> +
> + ret = (offset == -1 ?
> + read(fd, ctx->queue + ctx->tail, rx) :
> + pread(fd, ctx->queue + ctx->tail, rx, offset));
> + if (ret != rx)
> + return -errno;
> +
> + ctx->remaining -= rx;
> + ctx->tail += rx;
> + if (offset != -1)
> + offset += rx;
> +
> + ret = z_erofs_compress_one(ctx);
> + if (ret)
> + return ret;
> + }
> + DBG_BUGON(ctx->head != ctx->tail);
> +
> + if (ctx->pivot) {
> + z_erofs_commit_extent(ctx, ctx->pivot);
> + ctx->pivot = NULL;
> + }
> + return 0;
> +}
> +
> +#ifdef EROFS_MT_ENABLED
> +void *z_erofs_mt_wq_tls_alloc(struct erofs_workqueue *wq, void *ptr)
> +{
> + struct erofs_compress_wq_tls *tls;
> +
> + tls = calloc(1, sizeof(*tls));
> + if (!tls)
> + return NULL;
> +
> + tls->queue = malloc(Z_EROFS_COMPR_QUEUE_SZ);
> + if (!tls->queue)
> + goto err_free_priv;
> +
> + tls->destbuf = calloc(1, EROFS_CONFIG_COMPR_MAX_SZ +
> + EROFS_MAX_BLOCK_SIZE);
> + if (!tls->destbuf)
> + goto err_free_queue;
> +
> + tls->ccfg = calloc(EROFS_MAX_COMPR_CFGS, sizeof(*tls->ccfg));
> + if (!tls->ccfg)
> + goto err_free_destbuf;
> + return tls;
> +
> +err_free_destbuf:
> + free(tls->destbuf);
> +err_free_queue:
> + free(tls->queue);
> +err_free_priv:
> + free(tls);
> + return NULL;
> +}
> +
> +int z_erofs_mt_wq_tls_init_compr(struct erofs_sb_info *sbi,
> + struct erofs_compress_wq_tls *tls,
> + unsigned int alg_id, char *alg_name,
> + unsigned int comp_level,
> + unsigned int dict_size)
> +{
> + struct erofs_compress_cfg *lc = &tls->ccfg[alg_id];
> + int ret;
> +
> + if (likely(lc->enable))
> + return 0;
> +
> + ret = erofs_compressor_init(sbi, &lc->handle, alg_name,
> + comp_level, dict_size);
> + if (ret)
> + return ret;
> + lc->algorithmtype = alg_id;
> + lc->enable = true;
> + return 0;
> +}
> +
> +void *z_erofs_mt_wq_tls_free(struct erofs_workqueue *wq, void *priv)
> +{
> + struct erofs_compress_wq_tls *tls = priv;
> + int i;
> +
> + for (i = 0; i < EROFS_MAX_COMPR_CFGS; i++)
> + if (tls->ccfg[i].enable)
> + erofs_compressor_exit(&tls->ccfg[i].handle);
> +
> + free(tls->ccfg);
> + free(tls->destbuf);
> + free(tls->queue);
> + free(tls);
> + return NULL;
> +}
> +
> +void z_erofs_mt_workfn(struct erofs_work *work, void *tlsp)
> +{
> + struct erofs_compress_work *cwork = (struct erofs_compress_work *)work;
> + struct erofs_compress_wq_tls *tls = tlsp;
> + struct z_erofs_compress_sctx *ctx = &cwork->ctx;
> + u64 offset = ctx->seg_idx * cfg.c_segment_size;
> + int ret = 0;
> +
> + ret = z_erofs_mt_wq_tls_init_compr(ctx->ictx->inode->sbi, tls,
> + cwork->alg_id, cwork->alg_name,
> + cwork->comp_level,
> + cwork->dict_size);
> + if (ret)
> + goto out;
> +
> + ctx->queue = tls->queue;
> + ctx->destbuf = tls->destbuf;
> + ctx->chandle = &tls->ccfg[cwork->alg_id].handle;
> +
> + ctx->membuf = malloc(ctx->remaining);
> + if (!ctx->membuf) {
> + ret = -ENOMEM;
> + goto out;
> + }
> + ctx->memoff = 0;
> +
> + ret = z_erofs_compress_segment(ctx, offset, EROFS_NULL_ADDR);
> +
> +out:
> + cwork->errcode = ret;
> + pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
> + ++z_erofs_mt_ctrl.nfini;
> + pthread_cond_signal(&z_erofs_mt_ctrl.cond);
> + pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
> +}
> +
> +int z_erofs_merge_segment(struct z_erofs_compress_ictx *ictx,
> + struct z_erofs_compress_sctx *ctx)
> +{
> + struct z_erofs_extent_item *ei, *n;
> + struct erofs_sb_info *sbi = ictx->inode->sbi;
> + erofs_blk_t blkoff = 0;
> + int ret = 0, ret2;
> +
> + list_for_each_entry_safe(ei, n, &ctx->extents, list) {
> + list_del(&ei->list);
> + list_add_tail(&ei->list, &ictx->extents);
> +
> + if (ei->e.blkaddr != EROFS_NULL_ADDR) /* deduped extents */
> + continue;
> +
> + ei->e.blkaddr = ctx->blkaddr;
> + ctx->blkaddr += ei->e.compressedblks;
> +
> + ret2 = blk_write(sbi, ctx->membuf + blkoff * erofs_blksiz(sbi),
> + ei->e.blkaddr, ei->e.compressedblks);
> + blkoff += ei->e.compressedblks;
> + if (ret2) {
> + ret = ret2;
> + continue;
> + }
> + }
> + free(ctx->membuf);
> + return ret;
> +}
> +
> +int z_erofs_mt_compress(struct z_erofs_compress_ictx *ctx,
> + struct erofs_compress_cfg *ccfg,
> + erofs_blk_t blkaddr,
> + erofs_blk_t *compressed_blocks)
> +{
> + struct erofs_compress_work *cur, *head = NULL, **last = &head;
> + struct erofs_inode *inode = ctx->inode;
> + int nsegs = DIV_ROUND_UP(inode->i_size, cfg.c_segment_size);
> + int ret, i;
> +
> + z_erofs_mt_ctrl.nfini = 0;
> +
> + for (i = 0; i < nsegs; i++) {
> + if (z_erofs_mt_ctrl.idle) {
> + cur = z_erofs_mt_ctrl.idle;
> + z_erofs_mt_ctrl.idle = cur->next;
> + cur->next = NULL;
> + } else {
> + cur = calloc(1, sizeof(*cur));
> + if (!cur)
> + return -ENOMEM;
> + }
> + *last = cur;
> + last = &cur->next;
> +
> + cur->ctx = (struct z_erofs_compress_sctx) {
> + .ictx = ctx,
> + .seg_num = nsegs,
> + .seg_idx = i,
> + .pivot = &dummy_pivot,
> + };
> + init_list_head(&cur->ctx.extents);
> +
> + if (i == nsegs - 1)
> + cur->ctx.remaining = inode->i_size -
> + inode->fragment_size -
> + i * cfg.c_segment_size;
> + else
> + cur->ctx.remaining = cfg.c_segment_size;
> +
> + cur->alg_id = ccfg->handle.alg->id;
> + cur->alg_name = ccfg->handle.alg->name;
> + cur->comp_level = ccfg->handle.compression_level;
> + cur->dict_size = ccfg->handle.dict_size;
> +
> + cur->work.fn = z_erofs_mt_workfn;
> + erofs_queue_work(&z_erofs_mt_ctrl.wq, &cur->work);
> + }
> +
> + pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
> + while (z_erofs_mt_ctrl.nfini != nsegs)
> + pthread_cond_wait(&z_erofs_mt_ctrl.cond,
> + &z_erofs_mt_ctrl.mutex);
> + pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
> +
> + ret = 0;
> + while (head) {
> + cur = head;
> + head = cur->next;
> +
> + if (cur->errcode) {
> + ret = cur->errcode;
> + } else {
> + int ret2;
> +
> + cur->ctx.blkaddr = blkaddr;
> + ret2 = z_erofs_merge_segment(ctx, &cur->ctx);
> + if (ret2)
> + ret = ret2;
> +
> + *compressed_blocks += cur->ctx.blkaddr - blkaddr;
> + blkaddr = cur->ctx.blkaddr;
> + }
> +
> + cur->next = z_erofs_mt_ctrl.idle;
> + z_erofs_mt_ctrl.idle = cur;
> + }
> + return ret;
> +}
> +#endif
> +
> int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
> {
> + static u8 g_queue[Z_EROFS_COMPR_QUEUE_SZ];
> struct erofs_buffer_head *bh;
> - static struct z_erofs_vle_compress_ctx ctx;
> - erofs_blk_t blkaddr, compressed_blocks;
> + static struct z_erofs_compress_ictx ctx;
> + static struct z_erofs_compress_sctx sctx;
> + struct erofs_compress_cfg *ccfg;
> + erofs_blk_t blkaddr, compressed_blocks = 0;
> unsigned int legacymetasize;
> int ret;
> + bool ismt = false;
> struct erofs_sb_info *sbi = inode->sbi;
> u8 *compressmeta = malloc(BLK_ROUND_UP(sbi, inode->i_size) *
> sizeof(struct z_erofs_lcluster_index) +
> @@ -963,8 +1292,8 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
> }
> }
> #endif
> - ctx.ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
> - inode->z_algorithmtype[0] = ctx.ccfg[0].algorithmtype;
> + ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
> + inode->z_algorithmtype[0] = ccfg[0].algorithmtype;
> inode->z_algorithmtype[1] = 0;
>
> inode->idata_size = 0;
> @@ -983,50 +1312,45 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
> blkaddr = erofs_mapbh(bh->block); /* start_blkaddr */
> ctx.inode = inode;
> ctx.pclustersize = z_erofs_get_max_pclustersize(inode);
> - ctx.blkaddr = blkaddr;
> ctx.metacur = compressmeta + Z_EROFS_LEGACY_MAP_HEADER_SIZE;
> - ctx.head = ctx.tail = 0;
> - ctx.clusterofs = 0;
> - ctx.pivot = &dummy_pivot;
> init_list_head(&ctx.extents);
> - ctx.remaining = inode->i_size - inode->fragment_size;
> + ctx.fd = fd;
> ctx.fix_dedupedfrag = false;
> ctx.fragemitted = false;
> + sctx = (struct z_erofs_compress_sctx) { .ictx = &ctx, };
> + init_list_head(&sctx.extents);
> +
> if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) &&
> !inode->fragment_size) {
> ret = z_erofs_pack_file_from_fd(inode, fd, ctx.tof_chksum);
> if (ret)
> goto err_free_idata;
> +#ifdef EROFS_MT_ENABLED
> + } else if (z_erofs_mt_enabled && inode->i_size > cfg.c_segment_size) {
> + ismt = true;
> + ret = z_erofs_mt_compress(&ctx, ccfg, blkaddr, &compressed_blocks);
> + if (ret)
> + goto err_free_idata;
> +#endif
> } else {
> - while (ctx.remaining) {
> - const u64 rx = min_t(u64, ctx.remaining,
> - sizeof(ctx.queue) - ctx.tail);
> -
> - ret = read(fd, ctx.queue + ctx.tail, rx);
> - if (ret != rx) {
> - ret = -errno;
> - goto err_bdrop;
> - }
> - ctx.remaining -= rx;
> - ctx.tail += rx;
> -
> - ret = z_erofs_compress_one(&ctx);
> - if (ret)
> - goto err_free_idata;
> - }
> + sctx.queue = g_queue;
> + sctx.destbuf = NULL;
> + sctx.chandle = &ccfg->handle;
> + sctx.remaining = inode->i_size - inode->fragment_size;
> + sctx.seg_num = 1;
> + sctx.seg_idx = 0;
> + sctx.pivot = &dummy_pivot;
> +
> + ret = z_erofs_compress_segment(&sctx, -1, blkaddr);
> + if (ret)
> + goto err_free_idata;
> + compressed_blocks = sctx.blkaddr - blkaddr;
> }
> - DBG_BUGON(ctx.head != ctx.tail);
>
> /* fall back to no compression mode */
> - compressed_blocks = ctx.blkaddr - blkaddr;
> DBG_BUGON(compressed_blocks < !!inode->idata_size);
> compressed_blocks -= !!inode->idata_size;
>
> - if (ctx.pivot) {
> - z_erofs_commit_extent(&ctx, ctx.pivot);
> - ctx.pivot = NULL;
> - }
> -
> /* generate an extent for the deduplicated fragment */
> if (inode->fragment_size && !ctx.fragemitted) {
> struct z_erofs_extent_item *ei;
> @@ -1042,13 +1366,16 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
> .compressedblks = 0,
> .raw = false,
> .partial = false,
> - .blkaddr = ctx.blkaddr,
> + .blkaddr = sctx.blkaddr,
> };
> init_list_head(&ei->list);
> - z_erofs_commit_extent(&ctx, ei);
> + z_erofs_commit_extent(&sctx, ei);
> }
> z_erofs_fragments_commit(inode);
>
> + if (!ismt)
> + list_splice_tail(&sctx.extents, &ctx.extents);
> +
> z_erofs_write_indexes(&ctx);
> legacymetasize = ctx.metacur - compressmeta;
> /* estimate if data compression saves space or not */
> @@ -1257,8 +1584,25 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
> return -EINVAL;
> }
>
> - if (erofs_sb_has_compr_cfgs(sbi))
> - return z_erofs_build_compr_cfgs(sbi, sb_bh, max_dict_size);
> + if (erofs_sb_has_compr_cfgs(sbi)) {
> + ret = z_erofs_build_compr_cfgs(sbi, sb_bh, max_dict_size);
> + if (ret)
> + return ret;
> + }
> +
> + z_erofs_mt_enabled = false;
> +#ifdef EROFS_MT_ENABLED
> + if (cfg.c_mt_workers > 1) {
> + pthread_mutex_init(&z_erofs_mt_ctrl.mutex, NULL);
> + pthread_cond_init(&z_erofs_mt_ctrl.cond, NULL);
> + ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq,
> + cfg.c_mt_workers,
> + cfg.c_mt_workers << 2,
> + z_erofs_mt_wq_tls_alloc,
> + z_erofs_mt_wq_tls_free);
> + z_erofs_mt_enabled = !ret;
> + }
> +#endif
> return 0;
> }
>
> @@ -1271,5 +1615,19 @@ int z_erofs_compress_exit(void)
> if (ret)
> return ret;
> }
> +
> + if (z_erofs_mt_enabled) {
> +#ifdef EROFS_MT_ENABLED
> + ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.wq);
> + if (ret)
> + return ret;
> + while (z_erofs_mt_ctrl.idle) {
> + struct erofs_compress_work *tmp =
> + z_erofs_mt_ctrl.idle->next;
> + free(z_erofs_mt_ctrl.idle);
> + z_erofs_mt_ctrl.idle = tmp;
> + }
> +#endif
> + }
> return 0;
> }
> diff --git a/lib/compressor.c b/lib/compressor.c
> index 58eae2a..175259e 100644
> --- a/lib/compressor.c
> +++ b/lib/compressor.c
> @@ -86,6 +86,8 @@ int erofs_compressor_init(struct erofs_sb_info *sbi, struct erofs_compress *c,
>
> /* should be written in "minimum compression ratio * 100" */
> c->compress_threshold = 100;
> + c->compression_level = -1;
> + c->dict_size = 0;
>
> if (!alg_name) {
> c->alg = NULL;
> diff --git a/mkfs/main.c b/mkfs/main.c
> index 126a049..5dbaf9f 100644
> --- a/mkfs/main.c
> +++ b/mkfs/main.c
> @@ -678,7 +678,7 @@ static int mkfs_parse_options_cfg(int argc, char *argv[])
>
> processors = erofs_get_available_processors();
> if (cfg.c_mt_workers > processors)
> - erofs_warn("the number of workers %d is more than the number of processors %d, performance may be impacted.",
> + erofs_warn("%d workers exceed %d processors, potentially impacting performance.",
> cfg.c_mt_workers, processors);
> break;
> }
> @@ -838,6 +838,12 @@ static int mkfs_parse_options_cfg(int argc, char *argv[])
> }
> cfg.c_pclusterblks_packed = pclustersize_packed >> sbi.blkszbits;
> }
> +#ifdef EROFS_MT_ENABLED
> + if (cfg.c_mt_workers > 1 && (cfg.c_dedupe || cfg.c_fragments)) {
> + erofs_warn("Note that dedupe/fragments are NOT supported in multi-threaded mode for now, reseting --workers=1.");
> + cfg.c_mt_workers = 1;
> + }
> +#endif
> return 0;
> }
>
> --
> 2.39.3
>
On 2024/3/15 10:39, Gao Xiang wrote:
>
>
> On 2024/3/14 20:37, Yifan Zhao wrote:
>> Currently, the creation of an EROFS compressed image is
>> single-threaded, which suffers from performance issues. This patch
>> attempts to address it by compressing the large file in parallel.
>>
>> Specifically, each input file larger than 16MB is split into segments,
>> and each worker thread compresses a segment as if it were a separate
>> file. Finally, the main thread merges all the compressed segments.
>>
>> Multi-threaded compression is not compatible with -Ededupe,
>> -E(all-)fragments and -Eztailpacking for now.
>>
>> Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
>> Co-authored-by: Tong Xin <xin_tong@sjtu.edu.cn>
>
> I did some updates yesterday and I just posted v7.
>
> BTW, I also found an issue that the output cannot be stablized with
> "mkfs.erofs -zlz4hc,12 --worker=72" and enwik9 dataset. I'm still
> looking into that since it's an unexpected behavior.
I found that the issue is actually with "-Eztailpacking", and it is now fixed.
Thanks,
Gao Xiang
On 2024/3/15 09:10, Gao Xiang wrote: > From: Yifan Zhao <zhaoyifan@sjtu.edu.cn> > > Currently, the creation of EROFS compressed image creation is > single-threaded, which suffers from performance issues. This patch > attempts to address it by compressing the large file in parallel. > > Specifically, each input file larger than 16MB is splited into segments, > and each worker thread compresses a segment as if it were a separate > file. Finally, the main thread merges all the compressed segments. > > Multi-threaded compression is not compatible with -Ededupe, > -E(all-)fragments and -Eztailpacking for now. > > Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn> > Co-authored-by: Tong Xin <xin_tong@sjtu.edu.cn> > Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com> > --- > v7: > - support -Eztailpacking; Apply the following diff to fix this: diff --git a/lib/compress.c b/lib/compress.c index 0d796c8..7ad48b0 100644 --- a/lib/compress.c +++ b/lib/compress.c @@ -509,10 +509,10 @@ static int __z_erofs_compress_one(struct z_erofs_compress_sctx *ctx, struct erofs_compress *const h = ctx->chandle; unsigned int len = ctx->tail - ctx->head; bool is_packed_inode = erofs_is_packed_inode(inode); - bool final = !ctx->remaining; - bool may_packing = (cfg.c_fragments && final && !is_packed_inode && - !z_erofs_mt_enabled); - bool may_inline = (cfg.c_ztailpacking && final && !may_packing); + bool tsg = (ctx->seg_idx + 1 >= ctx->seg_num), final = !ctx->remaining; + bool may_packing = (cfg.c_fragments && tsg && final && + !is_packed_inode && !z_erofs_mt_enabled); + bool may_inline = (cfg.c_ztailpacking && tsg && final && !may_packing); unsigned int compressedsize; int ret; Thanks, Gao Xiang
On 2024/3/15 7:14, Sandeep Dhavale via Linux-erofs wrote: > I have been contributing to erofs for sometime and I would like to help > with code reviews as well. Thank you for the effort and looks good to me. :) > > Signed-off-by: Sandeep Dhavale <dhavale@google.com> Acked-by: Chao Yu <chao@kernel.org> Thanks,
On 2024/3/14 20:37, Yifan Zhao wrote:
> Currently, the creation of an EROFS compressed image is
> single-threaded, which suffers from performance issues. This patch
> attempts to address it by compressing the large file in parallel.
>
> Specifically, each input file larger than 16MB is split into segments,
> and each worker thread compresses a segment as if it were a separate
> file. Finally, the main thread merges all the compressed segments.
>
> Multi-threaded compression is not compatible with -Ededupe,
> -E(all-)fragments and -Eztailpacking for now.
>
> Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
> Co-authored-by: Tong Xin <xin_tong@sjtu.edu.cn>
I did some updates yesterday and I just posted v7.
BTW, I also found an issue that the output cannot be stablized with
"mkfs.erofs -zlz4hc,12 --worker=72" and enwik9 dataset. I'm still
looking into that since it's an unexpected behavior.
Thanks,
Gao Xiang
From: Yifan Zhao <zhaoyifan@sjtu.edu.cn> Currently, the creation of EROFS compressed image creation is single-threaded, which suffers from performance issues. This patch attempts to address it by compressing the large file in parallel. Specifically, each input file larger than 16MB is splited into segments, and each worker thread compresses a segment as if it were a separate file. Finally, the main thread merges all the compressed segments. Multi-threaded compression is not compatible with -Ededupe, -E(all-)fragments and -Eztailpacking for now. Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn> Co-authored-by: Tong Xin <xin_tong@sjtu.edu.cn> Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com> --- v7: - support -Eztailpacking; - wq_private -> wq_tls; - minor updates. include/erofs/compress.h | 3 +- lib/compress.c | 548 ++++++++++++++++++++++++++++++++------- lib/compressor.c | 2 + mkfs/main.c | 8 +- 4 files changed, 464 insertions(+), 97 deletions(-) diff --git a/include/erofs/compress.h b/include/erofs/compress.h index b3272f7..3253611 100644 --- a/include/erofs/compress.h +++ b/include/erofs/compress.h @@ -14,7 +14,8 @@ extern "C" #include "internal.h" -#define EROFS_CONFIG_COMPR_MAX_SZ (4000 * 1024) +#define EROFS_CONFIG_COMPR_MAX_SZ (4000 * 1024) +#define Z_EROFS_COMPR_QUEUE_SZ (EROFS_CONFIG_COMPR_MAX_SZ * 2) void z_erofs_drop_inline_pcluster(struct erofs_inode *inode); int erofs_write_compressed_file(struct erofs_inode *inode, int fd); diff --git a/lib/compress.c b/lib/compress.c index 4101009..0d796c8 100644 --- a/lib/compress.c +++ b/lib/compress.c @@ -20,6 +20,9 @@ #include "erofs/block_list.h" #include "erofs/compress_hints.h" #include "erofs/fragments.h" +#ifdef EROFS_MT_ENABLED +#include "erofs/workqueue.h" +#endif /* compressing configuration specified by users */ struct erofs_compress_cfg { @@ -33,29 +36,77 @@ struct z_erofs_extent_item { struct z_erofs_inmem_extent e; }; -struct z_erofs_vle_compress_ctx { - u8 queue[EROFS_CONFIG_COMPR_MAX_SZ * 2]; 
+struct z_erofs_compress_ictx { + struct erofs_inode *inode; + int fd; + unsigned int pclustersize; + + u32 tof_chksum; + bool fix_dedupedfrag; + bool fragemitted; + + /* fields for write indexes */ + u8 *metacur; + struct list_head extents; + u16 clusterofs; +}; + +struct z_erofs_compress_sctx { /* segment context */ + struct z_erofs_compress_ictx *ictx; + + u8 *queue; struct list_head extents; struct z_erofs_extent_item *pivot; - struct erofs_inode *inode; - struct erofs_compress_cfg *ccfg; + struct erofs_compress *chandle; + char *destbuf; - u8 *metacur; unsigned int head, tail; erofs_off_t remaining; - unsigned int pclustersize; erofs_blk_t blkaddr; /* pointing to the next blkaddr */ u16 clusterofs; - u32 tof_chksum; - bool fix_dedupedfrag; - bool fragemitted; + int seg_num, seg_idx; + + void *membuf; + erofs_off_t memoff; +}; + +#ifdef EROFS_MT_ENABLED +struct erofs_compress_wq_tls { + u8 *queue; + char *destbuf; + struct erofs_compress_cfg *ccfg; }; +struct erofs_compress_work { + /* Note: struct erofs_work must be the first member */ + struct erofs_work work; + struct z_erofs_compress_sctx ctx; + struct erofs_compress_work *next; + + unsigned int alg_id; + char *alg_name; + unsigned int comp_level; + unsigned int dict_size; + + int errcode; +}; + +static struct { + struct erofs_workqueue wq; + struct erofs_compress_work *idle; + pthread_mutex_t mutex; + pthread_cond_t cond; + int nfini; +} z_erofs_mt_ctrl; +#endif + +static bool z_erofs_mt_enabled; + #define Z_EROFS_LEGACY_MAP_HEADER_SIZE Z_EROFS_FULL_INDEX_ALIGN(0) -static void z_erofs_write_indexes_final(struct z_erofs_vle_compress_ctx *ctx) +static void z_erofs_write_indexes_final(struct z_erofs_compress_ictx *ctx) { const unsigned int type = Z_EROFS_LCLUSTER_TYPE_PLAIN; struct z_erofs_lcluster_index di; @@ -71,7 +122,7 @@ static void z_erofs_write_indexes_final(struct z_erofs_vle_compress_ctx *ctx) ctx->metacur += sizeof(di); } -static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx, 
+static void z_erofs_write_extent(struct z_erofs_compress_ictx *ctx, struct z_erofs_inmem_extent *e) { struct erofs_inode *inode = ctx->inode; @@ -170,7 +221,7 @@ static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx, ctx->clusterofs = clusterofs + count; } -static void z_erofs_write_indexes(struct z_erofs_vle_compress_ctx *ctx) +static void z_erofs_write_indexes(struct z_erofs_compress_ictx *ctx) { struct z_erofs_extent_item *ei, *n; @@ -184,15 +235,16 @@ static void z_erofs_write_indexes(struct z_erofs_vle_compress_ctx *ctx) z_erofs_write_indexes_final(ctx); } -static bool z_erofs_need_refill(struct z_erofs_vle_compress_ctx *ctx) +static bool z_erofs_need_refill(struct z_erofs_compress_sctx *ctx) { const bool final = !ctx->remaining; unsigned int qh_aligned, qh_after; + struct erofs_inode *inode = ctx->ictx->inode; if (final || ctx->head < EROFS_CONFIG_COMPR_MAX_SZ) return false; - qh_aligned = round_down(ctx->head, erofs_blksiz(ctx->inode->sbi)); + qh_aligned = round_down(ctx->head, erofs_blksiz(inode->sbi)); qh_after = ctx->head - qh_aligned; memmove(ctx->queue, ctx->queue + qh_aligned, ctx->tail - qh_aligned); ctx->tail -= qh_aligned; @@ -204,7 +256,7 @@ static struct z_erofs_extent_item dummy_pivot = { .e.length = 0 }; -static void z_erofs_commit_extent(struct z_erofs_vle_compress_ctx *ctx, +static void z_erofs_commit_extent(struct z_erofs_compress_sctx *ctx, struct z_erofs_extent_item *ei) { if (ei == &dummy_pivot) @@ -212,14 +264,13 @@ static void z_erofs_commit_extent(struct z_erofs_vle_compress_ctx *ctx, list_add_tail(&ei->list, &ctx->extents); ctx->clusterofs = (ctx->clusterofs + ei->e.length) & - (erofs_blksiz(ctx->inode->sbi) - 1); - + (erofs_blksiz(ctx->ictx->inode->sbi) - 1); } -static int z_erofs_compress_dedupe(struct z_erofs_vle_compress_ctx *ctx, +static int z_erofs_compress_dedupe(struct z_erofs_compress_sctx *ctx, unsigned int *len) { - struct erofs_inode *inode = ctx->inode; + struct erofs_inode *inode = ctx->ictx->inode; const 
unsigned int lclustermask = (1 << inode->z_logical_clusterbits) - 1; struct erofs_sb_info *sbi = inode->sbi; struct z_erofs_extent_item *ei = ctx->pivot; @@ -315,16 +366,17 @@ out: return 0; } -static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx, +static int write_uncompressed_extent(struct z_erofs_compress_sctx *ctx, unsigned int len, char *dst) { - struct erofs_sb_info *sbi = ctx->inode->sbi; + struct erofs_inode *inode = ctx->ictx->inode; + struct erofs_sb_info *sbi = inode->sbi; unsigned int count = min(erofs_blksiz(sbi), len); unsigned int interlaced_offset, rightpart; int ret; /* write interlaced uncompressed data if needed */ - if (ctx->inode->z_advise & Z_EROFS_ADVISE_INTERLACED_PCLUSTER) + if (inode->z_advise & Z_EROFS_ADVISE_INTERLACED_PCLUSTER) interlaced_offset = ctx->clusterofs; else interlaced_offset = 0; @@ -335,11 +387,17 @@ static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx, memcpy(dst + interlaced_offset, ctx->queue + ctx->head, rightpart); memcpy(dst, ctx->queue + ctx->head + rightpart, count - rightpart); - erofs_dbg("Writing %u uncompressed data to block %u", - count, ctx->blkaddr); - ret = blk_write(sbi, dst, ctx->blkaddr, 1); - if (ret) - return ret; + if (ctx->membuf) { + erofs_dbg("Writing %u uncompressed data to membuf", count); + memcpy(ctx->membuf + ctx->memoff, dst, erofs_blksiz(sbi)); + ctx->memoff += erofs_blksiz(sbi); + } else { + erofs_dbg("Writing %u uncompressed data to block %u", count, + ctx->blkaddr); + ret = blk_write(sbi, dst, ctx->blkaddr, 1); + if (ret) + return ret; + } return count; } @@ -379,12 +437,12 @@ static int z_erofs_fill_inline_data(struct erofs_inode *inode, void *data, return len; } -static void tryrecompress_trailing(struct z_erofs_vle_compress_ctx *ctx, +static void tryrecompress_trailing(struct z_erofs_compress_sctx *ctx, struct erofs_compress *ec, void *in, unsigned int *insize, void *out, unsigned int *compressedsize) { - struct erofs_sb_info *sbi = 
ctx->inode->sbi; + struct erofs_sb_info *sbi = ctx->ictx->inode->sbi; static char tmp[Z_EROFS_PCLUSTER_MAX_SIZE]; unsigned int count; int ret = *compressedsize; @@ -406,10 +464,11 @@ static void tryrecompress_trailing(struct z_erofs_vle_compress_ctx *ctx, *compressedsize = ret; } -static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx, +static bool z_erofs_fixup_deduped_fragment(struct z_erofs_compress_sctx *ctx, unsigned int len) { - struct erofs_inode *inode = ctx->inode; + struct z_erofs_compress_ictx *ictx = ctx->ictx; + struct erofs_inode *inode = ictx->inode; struct erofs_sb_info *sbi = inode->sbi; const unsigned int newsize = ctx->remaining + len; @@ -417,9 +476,10 @@ static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx, /* try to fix again if it gets larger (should be rare) */ if (inode->fragment_size < newsize) { - ctx->pclustersize = min_t(erofs_off_t, z_erofs_get_max_pclustersize(inode), - roundup(newsize - inode->fragment_size, - erofs_blksiz(sbi))); + ictx->pclustersize = min_t(erofs_off_t, + z_erofs_get_max_pclustersize(inode), + roundup(newsize - inode->fragment_size, + erofs_blksiz(sbi))); return false; } @@ -436,29 +496,32 @@ static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx, return true; } -static int __z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx, +static int __z_erofs_compress_one(struct z_erofs_compress_sctx *ctx, struct z_erofs_inmem_extent *e) { - static char dstbuf[EROFS_CONFIG_COMPR_MAX_SZ + EROFS_MAX_BLOCK_SIZE]; - struct erofs_inode *inode = ctx->inode; + static char g_dstbuf[EROFS_CONFIG_COMPR_MAX_SZ + EROFS_MAX_BLOCK_SIZE]; + char *dstbuf = ctx->destbuf ?: g_dstbuf; + struct z_erofs_compress_ictx *ictx = ctx->ictx; + struct erofs_inode *inode = ictx->inode; struct erofs_sb_info *sbi = inode->sbi; unsigned int blksz = erofs_blksiz(sbi); char *const dst = dstbuf + blksz; - struct erofs_compress *const h = &ctx->ccfg->handle; + struct 
erofs_compress *const h = ctx->chandle; unsigned int len = ctx->tail - ctx->head; bool is_packed_inode = erofs_is_packed_inode(inode); bool final = !ctx->remaining; - bool may_packing = (cfg.c_fragments && final && !is_packed_inode); + bool may_packing = (cfg.c_fragments && final && !is_packed_inode && + !z_erofs_mt_enabled); bool may_inline = (cfg.c_ztailpacking && final && !may_packing); unsigned int compressedsize; int ret; - if (len <= ctx->pclustersize) { + if (len <= ictx->pclustersize) { if (!final || !len) return 1; if (may_packing) { - if (inode->fragment_size && !ctx->fix_dedupedfrag) { - ctx->pclustersize = roundup(len, blksz); + if (inode->fragment_size && !ictx->fix_dedupedfrag) { + ictx->pclustersize = roundup(len, blksz); goto fix_dedupedfrag; } e->length = len; @@ -470,7 +533,7 @@ static int __z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx, e->length = min(len, cfg.c_max_decompressed_extent_bytes); ret = erofs_compress_destsize(h, ctx->queue + ctx->head, - &e->length, dst, ctx->pclustersize); + &e->length, dst, ictx->pclustersize); if (ret <= 0) { erofs_err("failed to compress %s: %s", inode->i_srcpath, erofs_strerror(ret)); @@ -507,16 +570,16 @@ nocompression: e->compressedblks = 1; e->raw = true; } else if (may_packing && len == e->length && - compressedsize < ctx->pclustersize && - (!inode->fragment_size || ctx->fix_dedupedfrag)) { + compressedsize < ictx->pclustersize && + (!inode->fragment_size || ictx->fix_dedupedfrag)) { frag_packing: ret = z_erofs_pack_fragments(inode, ctx->queue + ctx->head, - len, ctx->tof_chksum); + len, ictx->tof_chksum); if (ret < 0) return ret; e->compressedblks = 0; /* indicate a fragment */ e->raw = false; - ctx->fragemitted = true; + ictx->fragemitted = true; /* tailpcluster should be less than 1 block */ } else if (may_inline && len == e->length && compressedsize < blksz) { if (ctx->clusterofs + len <= blksz) { @@ -545,8 +608,8 @@ frag_packing: */ if (may_packing && len == e->length && (compressedsize & 
(blksz - 1)) && - ctx->tail < sizeof(ctx->queue)) { - ctx->pclustersize = roundup(compressedsize, blksz); + ctx->tail < Z_EROFS_COMPR_QUEUE_SZ) { + ictx->pclustersize = roundup(compressedsize, blksz); goto fix_dedupedfrag; } @@ -569,34 +632,45 @@ frag_packing: } /* write compressed data */ - erofs_dbg("Writing %u compressed data to %u of %u blocks", - e->length, ctx->blkaddr, e->compressedblks); + if (ctx->membuf) { + erofs_off_t sz = e->compressedblks * blksz; + erofs_dbg("Writing %u compressed data to membuf of %u blocks", + e->length, e->compressedblks); - ret = blk_write(sbi, dst - padding, ctx->blkaddr, - e->compressedblks); - if (ret) - return ret; + memcpy(ctx->membuf + ctx->memoff, dst - padding, sz); + ctx->memoff += sz; + } else { + erofs_dbg("Writing %u compressed data to %u of %u blocks", + e->length, ctx->blkaddr, e->compressedblks); + + ret = blk_write(sbi, dst - padding, ctx->blkaddr, + e->compressedblks); + if (ret) + return ret; + } e->raw = false; may_inline = false; may_packing = false; } e->partial = false; e->blkaddr = ctx->blkaddr; + if (ctx->blkaddr != EROFS_NULL_ADDR) + ctx->blkaddr += e->compressedblks; if (!may_inline && !may_packing && !is_packed_inode) (void)z_erofs_dedupe_insert(e, ctx->queue + ctx->head); - ctx->blkaddr += e->compressedblks; ctx->head += e->length; return 0; fix_dedupedfrag: DBG_BUGON(!inode->fragment_size); ctx->remaining += inode->fragment_size; - ctx->fix_dedupedfrag = true; + ictx->fix_dedupedfrag = true; return 1; } -static int z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx) +static int z_erofs_compress_one(struct z_erofs_compress_sctx *ctx) { + struct z_erofs_compress_ictx *ictx = ctx->ictx; unsigned int len = ctx->tail - ctx->head; struct z_erofs_extent_item *ei; @@ -624,7 +698,7 @@ static int z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx) len -= ei->e.length; ctx->pivot = ei; - if (ctx->fix_dedupedfrag && !ctx->fragemitted && + if (ictx->fix_dedupedfrag && !ictx->fragemitted && 
z_erofs_fixup_deduped_fragment(ctx, len)) break; @@ -912,13 +986,268 @@ void z_erofs_drop_inline_pcluster(struct erofs_inode *inode) inode->eof_tailraw = NULL; } +int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx, + u64 offset, erofs_blk_t blkaddr) +{ + int fd = ctx->ictx->fd; + + ctx->blkaddr = blkaddr; + while (ctx->remaining) { + const u64 rx = min_t(u64, ctx->remaining, + Z_EROFS_COMPR_QUEUE_SZ - ctx->tail); + int ret; + + ret = (offset == -1 ? + read(fd, ctx->queue + ctx->tail, rx) : + pread(fd, ctx->queue + ctx->tail, rx, offset)); + if (ret != rx) + return -errno; + + ctx->remaining -= rx; + ctx->tail += rx; + if (offset != -1) + offset += rx; + + ret = z_erofs_compress_one(ctx); + if (ret) + return ret; + } + DBG_BUGON(ctx->head != ctx->tail); + + if (ctx->pivot) { + z_erofs_commit_extent(ctx, ctx->pivot); + ctx->pivot = NULL; + } + return 0; +} + +#ifdef EROFS_MT_ENABLED +void *z_erofs_mt_wq_tls_alloc(struct erofs_workqueue *wq, void *ptr) +{ + struct erofs_compress_wq_tls *tls; + + tls = calloc(1, sizeof(*tls)); + if (!tls) + return NULL; + + tls->queue = malloc(Z_EROFS_COMPR_QUEUE_SZ); + if (!tls->queue) + goto err_free_priv; + + tls->destbuf = calloc(1, EROFS_CONFIG_COMPR_MAX_SZ + + EROFS_MAX_BLOCK_SIZE); + if (!tls->destbuf) + goto err_free_queue; + + tls->ccfg = calloc(EROFS_MAX_COMPR_CFGS, sizeof(*tls->ccfg)); + if (!tls->ccfg) + goto err_free_destbuf; + return tls; + +err_free_destbuf: + free(tls->destbuf); +err_free_queue: + free(tls->queue); +err_free_priv: + free(tls); + return NULL; +} + +int z_erofs_mt_wq_tls_init_compr(struct erofs_sb_info *sbi, + struct erofs_compress_wq_tls *tls, + unsigned int alg_id, char *alg_name, + unsigned int comp_level, + unsigned int dict_size) +{ + struct erofs_compress_cfg *lc = &tls->ccfg[alg_id]; + int ret; + + if (likely(lc->enable)) + return 0; + + ret = erofs_compressor_init(sbi, &lc->handle, alg_name, + comp_level, dict_size); + if (ret) + return ret; + lc->algorithmtype = alg_id; + lc->enable = 
true; + return 0; +} + +void *z_erofs_mt_wq_tls_free(struct erofs_workqueue *wq, void *priv) +{ + struct erofs_compress_wq_tls *tls = priv; + int i; + + for (i = 0; i < EROFS_MAX_COMPR_CFGS; i++) + if (tls->ccfg[i].enable) + erofs_compressor_exit(&tls->ccfg[i].handle); + + free(tls->ccfg); + free(tls->destbuf); + free(tls->queue); + free(tls); + return NULL; +} + +void z_erofs_mt_workfn(struct erofs_work *work, void *tlsp) +{ + struct erofs_compress_work *cwork = (struct erofs_compress_work *)work; + struct erofs_compress_wq_tls *tls = tlsp; + struct z_erofs_compress_sctx *ctx = &cwork->ctx; + u64 offset = ctx->seg_idx * cfg.c_segment_size; + int ret = 0; + + ret = z_erofs_mt_wq_tls_init_compr(ctx->ictx->inode->sbi, tls, + cwork->alg_id, cwork->alg_name, + cwork->comp_level, + cwork->dict_size); + if (ret) + goto out; + + ctx->queue = tls->queue; + ctx->destbuf = tls->destbuf; + ctx->chandle = &tls->ccfg[cwork->alg_id].handle; + + ctx->membuf = malloc(ctx->remaining); + if (!ctx->membuf) { + ret = -ENOMEM; + goto out; + } + ctx->memoff = 0; + + ret = z_erofs_compress_segment(ctx, offset, EROFS_NULL_ADDR); + +out: + cwork->errcode = ret; + pthread_mutex_lock(&z_erofs_mt_ctrl.mutex); + ++z_erofs_mt_ctrl.nfini; + pthread_cond_signal(&z_erofs_mt_ctrl.cond); + pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex); +} + +int z_erofs_merge_segment(struct z_erofs_compress_ictx *ictx, + struct z_erofs_compress_sctx *ctx) +{ + struct z_erofs_extent_item *ei, *n; + struct erofs_sb_info *sbi = ictx->inode->sbi; + erofs_blk_t blkoff = 0; + int ret = 0, ret2; + + list_for_each_entry_safe(ei, n, &ctx->extents, list) { + list_del(&ei->list); + list_add_tail(&ei->list, &ictx->extents); + + if (ei->e.blkaddr != EROFS_NULL_ADDR) /* deduped extents */ + continue; + + ei->e.blkaddr = ctx->blkaddr; + ctx->blkaddr += ei->e.compressedblks; + + ret2 = blk_write(sbi, ctx->membuf + blkoff * erofs_blksiz(sbi), + ei->e.blkaddr, ei->e.compressedblks); + blkoff += ei->e.compressedblks; + if (ret2) { + 
ret = ret2; + continue; + } + } + free(ctx->membuf); + return ret; +} + +int z_erofs_mt_compress(struct z_erofs_compress_ictx *ctx, + struct erofs_compress_cfg *ccfg, + erofs_blk_t blkaddr, + erofs_blk_t *compressed_blocks) +{ + struct erofs_compress_work *cur, *head = NULL, **last = &head; + struct erofs_inode *inode = ctx->inode; + int nsegs = DIV_ROUND_UP(inode->i_size, cfg.c_segment_size); + int ret, i; + + z_erofs_mt_ctrl.nfini = 0; + + for (i = 0; i < nsegs; i++) { + if (z_erofs_mt_ctrl.idle) { + cur = z_erofs_mt_ctrl.idle; + z_erofs_mt_ctrl.idle = cur->next; + cur->next = NULL; + } else { + cur = calloc(1, sizeof(*cur)); + if (!cur) + return -ENOMEM; + } + *last = cur; + last = &cur->next; + + cur->ctx = (struct z_erofs_compress_sctx) { + .ictx = ctx, + .seg_num = nsegs, + .seg_idx = i, + .pivot = &dummy_pivot, + }; + init_list_head(&cur->ctx.extents); + + if (i == nsegs - 1) + cur->ctx.remaining = inode->i_size - + inode->fragment_size - + i * cfg.c_segment_size; + else + cur->ctx.remaining = cfg.c_segment_size; + + cur->alg_id = ccfg->handle.alg->id; + cur->alg_name = ccfg->handle.alg->name; + cur->comp_level = ccfg->handle.compression_level; + cur->dict_size = ccfg->handle.dict_size; + + cur->work.fn = z_erofs_mt_workfn; + erofs_queue_work(&z_erofs_mt_ctrl.wq, &cur->work); + } + + pthread_mutex_lock(&z_erofs_mt_ctrl.mutex); + while (z_erofs_mt_ctrl.nfini != nsegs) + pthread_cond_wait(&z_erofs_mt_ctrl.cond, + &z_erofs_mt_ctrl.mutex); + pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex); + + ret = 0; + while (head) { + cur = head; + head = cur->next; + + if (cur->errcode) { + ret = cur->errcode; + } else { + int ret2; + + cur->ctx.blkaddr = blkaddr; + ret2 = z_erofs_merge_segment(ctx, &cur->ctx); + if (ret2) + ret = ret2; + + *compressed_blocks += cur->ctx.blkaddr - blkaddr; + blkaddr = cur->ctx.blkaddr; + } + + cur->next = z_erofs_mt_ctrl.idle; + z_erofs_mt_ctrl.idle = cur; + } + return ret; +} +#endif + int erofs_write_compressed_file(struct erofs_inode *inode, 
int fd) { + static u8 g_queue[Z_EROFS_COMPR_QUEUE_SZ]; struct erofs_buffer_head *bh; - static struct z_erofs_vle_compress_ctx ctx; - erofs_blk_t blkaddr, compressed_blocks; + static struct z_erofs_compress_ictx ctx; + static struct z_erofs_compress_sctx sctx; + struct erofs_compress_cfg *ccfg; + erofs_blk_t blkaddr, compressed_blocks = 0; unsigned int legacymetasize; int ret; + bool ismt = false; struct erofs_sb_info *sbi = inode->sbi; u8 *compressmeta = malloc(BLK_ROUND_UP(sbi, inode->i_size) * sizeof(struct z_erofs_lcluster_index) + @@ -963,8 +1292,8 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd) } } #endif - ctx.ccfg = &erofs_ccfg[inode->z_algorithmtype[0]]; - inode->z_algorithmtype[0] = ctx.ccfg[0].algorithmtype; + ccfg = &erofs_ccfg[inode->z_algorithmtype[0]]; + inode->z_algorithmtype[0] = ccfg[0].algorithmtype; inode->z_algorithmtype[1] = 0; inode->idata_size = 0; @@ -983,50 +1312,45 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd) blkaddr = erofs_mapbh(bh->block); /* start_blkaddr */ ctx.inode = inode; ctx.pclustersize = z_erofs_get_max_pclustersize(inode); - ctx.blkaddr = blkaddr; ctx.metacur = compressmeta + Z_EROFS_LEGACY_MAP_HEADER_SIZE; - ctx.head = ctx.tail = 0; - ctx.clusterofs = 0; - ctx.pivot = &dummy_pivot; init_list_head(&ctx.extents); - ctx.remaining = inode->i_size - inode->fragment_size; + ctx.fd = fd; ctx.fix_dedupedfrag = false; ctx.fragemitted = false; + sctx = (struct z_erofs_compress_sctx) { .ictx = &ctx, }; + init_list_head(&sctx.extents); + if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) && !inode->fragment_size) { ret = z_erofs_pack_file_from_fd(inode, fd, ctx.tof_chksum); if (ret) goto err_free_idata; +#ifdef EROFS_MT_ENABLED + } else if (z_erofs_mt_enabled && inode->i_size > cfg.c_segment_size) { + ismt = true; + ret = z_erofs_mt_compress(&ctx, ccfg, blkaddr, &compressed_blocks); + if (ret) + goto err_free_idata; +#endif } else { - while (ctx.remaining) { - const u64 rx = min_t(u64, 
ctx.remaining, - sizeof(ctx.queue) - ctx.tail); - - ret = read(fd, ctx.queue + ctx.tail, rx); - if (ret != rx) { - ret = -errno; - goto err_bdrop; - } - ctx.remaining -= rx; - ctx.tail += rx; - - ret = z_erofs_compress_one(&ctx); - if (ret) - goto err_free_idata; - } + sctx.queue = g_queue; + sctx.destbuf = NULL; + sctx.chandle = &ccfg->handle; + sctx.remaining = inode->i_size - inode->fragment_size; + sctx.seg_num = 1; + sctx.seg_idx = 0; + sctx.pivot = &dummy_pivot; + + ret = z_erofs_compress_segment(&sctx, -1, blkaddr); + if (ret) + goto err_free_idata; + compressed_blocks = sctx.blkaddr - blkaddr; } - DBG_BUGON(ctx.head != ctx.tail); /* fall back to no compression mode */ - compressed_blocks = ctx.blkaddr - blkaddr; DBG_BUGON(compressed_blocks < !!inode->idata_size); compressed_blocks -= !!inode->idata_size; - if (ctx.pivot) { - z_erofs_commit_extent(&ctx, ctx.pivot); - ctx.pivot = NULL; - } - /* generate an extent for the deduplicated fragment */ if (inode->fragment_size && !ctx.fragemitted) { struct z_erofs_extent_item *ei; @@ -1042,13 +1366,16 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd) .compressedblks = 0, .raw = false, .partial = false, - .blkaddr = ctx.blkaddr, + .blkaddr = sctx.blkaddr, }; init_list_head(&ei->list); - z_erofs_commit_extent(&ctx, ei); + z_erofs_commit_extent(&sctx, ei); } z_erofs_fragments_commit(inode); + if (!ismt) + list_splice_tail(&sctx.extents, &ctx.extents); + z_erofs_write_indexes(&ctx); legacymetasize = ctx.metacur - compressmeta; /* estimate if data compression saves space or not */ @@ -1257,8 +1584,25 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s return -EINVAL; } - if (erofs_sb_has_compr_cfgs(sbi)) - return z_erofs_build_compr_cfgs(sbi, sb_bh, max_dict_size); + if (erofs_sb_has_compr_cfgs(sbi)) { + ret = z_erofs_build_compr_cfgs(sbi, sb_bh, max_dict_size); + if (ret) + return ret; + } + + z_erofs_mt_enabled = false; +#ifdef EROFS_MT_ENABLED + if (cfg.c_mt_workers 
> 1) { + pthread_mutex_init(&z_erofs_mt_ctrl.mutex, NULL); + pthread_cond_init(&z_erofs_mt_ctrl.cond, NULL); + ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq, + cfg.c_mt_workers, + cfg.c_mt_workers << 2, + z_erofs_mt_wq_tls_alloc, + z_erofs_mt_wq_tls_free); + z_erofs_mt_enabled = !ret; + } +#endif return 0; } @@ -1271,5 +1615,19 @@ int z_erofs_compress_exit(void) if (ret) return ret; } + + if (z_erofs_mt_enabled) { +#ifdef EROFS_MT_ENABLED + ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.wq); + if (ret) + return ret; + while (z_erofs_mt_ctrl.idle) { + struct erofs_compress_work *tmp = + z_erofs_mt_ctrl.idle->next; + free(z_erofs_mt_ctrl.idle); + z_erofs_mt_ctrl.idle = tmp; + } +#endif + } return 0; } diff --git a/lib/compressor.c b/lib/compressor.c index 58eae2a..175259e 100644 --- a/lib/compressor.c +++ b/lib/compressor.c @@ -86,6 +86,8 @@ int erofs_compressor_init(struct erofs_sb_info *sbi, struct erofs_compress *c, /* should be written in "minimum compression ratio * 100" */ c->compress_threshold = 100; + c->compression_level = -1; + c->dict_size = 0; if (!alg_name) { c->alg = NULL; diff --git a/mkfs/main.c b/mkfs/main.c index 126a049..5dbaf9f 100644 --- a/mkfs/main.c +++ b/mkfs/main.c @@ -678,7 +678,7 @@ static int mkfs_parse_options_cfg(int argc, char *argv[]) processors = erofs_get_available_processors(); if (cfg.c_mt_workers > processors) - erofs_warn("the number of workers %d is more than the number of processors %d, performance may be impacted.", + erofs_warn("%d workers exceed %d processors, potentially impacting performance.", cfg.c_mt_workers, processors); break; } @@ -838,6 +838,12 @@ static int mkfs_parse_options_cfg(int argc, char *argv[]) } cfg.c_pclusterblks_packed = pclustersize_packed >> sbi.blkszbits; } +#ifdef EROFS_MT_ENABLED + if (cfg.c_mt_workers > 1 && (cfg.c_dedupe || cfg.c_fragments)) { + erofs_warn("Note that dedupe/fragments are NOT supported in multi-threaded mode for now, resetting --workers=1."); + cfg.c_mt_workers = 1; + } +#endif 
return 0; } -- 2.39.3
Add some helpers (relaxed semantics) in order to prepare for the upcoming multi-threaded support. For example, compressor may be initialized more than once in different worker threads, resulting in noisy warnings. This patch makes sure that each message will be printed only once by adding `__warnonce` atomic booleans to each erofs_compressor_init(). Cc: Yifan Zhao <zhaoyifan@sjtu.edu.cn> Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn> Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com> --- include/erofs/atomic.h | 28 ++++++++++++++++++++++++++++ lib/compressor_deflate.c | 11 ++++++++--- lib/compressor_libdeflate.c | 6 +++++- lib/compressor_liblzma.c | 5 ++++- 4 files changed, 45 insertions(+), 5 deletions(-) create mode 100644 include/erofs/atomic.h diff --git a/include/erofs/atomic.h b/include/erofs/atomic.h new file mode 100644 index 0000000..214cdb1 --- /dev/null +++ b/include/erofs/atomic.h @@ -0,0 +1,28 @@ +/* SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0 */ +/* + * Copyright (C) 2024 Alibaba Cloud + */ +#ifndef __EROFS_ATOMIC_H +#define __EROFS_ATOMIC_H + +/* + * Just use GCC/clang built-in functions for now + * See: https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html + */ +typedef unsigned long erofs_atomic_t; +typedef char erofs_atomic_bool_t; + +#define erofs_atomic_read(ptr) ({ \ + typeof(*ptr) __n; \ + __atomic_load(ptr, &__n, __ATOMIC_RELAXED); \ +__n;}) + +#define erofs_atomic_set(ptr, n) do { \ + typeof(*ptr) __n = (n); \ + __atomic_store(ptr, &__n, __ATOMIC_RELAXED); \ +} while(0) + +#define erofs_atomic_test_and_set(ptr) \ + __atomic_test_and_set(ptr, __ATOMIC_RELAXED) + +#endif diff --git a/lib/compressor_deflate.c b/lib/compressor_deflate.c index 8629415..e482224 100644 --- a/lib/compressor_deflate.c +++ b/lib/compressor_deflate.c @@ -7,6 +7,7 @@ #include "erofs/print.h" #include "erofs/config.h" #include "compressor.h" +#include "erofs/atomic.h" void *kite_deflate_init(int level, unsigned int dict_size); void 
kite_deflate_end(void *s); @@ -36,6 +37,8 @@ static int compressor_deflate_exit(struct erofs_compress *c) static int compressor_deflate_init(struct erofs_compress *c) { + static erofs_atomic_bool_t __warnonce; + if (c->private_data) { kite_deflate_end(c->private_data); c->private_data = NULL; @@ -44,9 +47,11 @@ static int compressor_deflate_init(struct erofs_compress *c) if (IS_ERR_VALUE(c->private_data)) return PTR_ERR(c->private_data); - erofs_warn("EXPERIMENTAL DEFLATE algorithm in use. Use at your own risk!"); - erofs_warn("*Carefully* check filesystem data correctness to avoid corruption!"); - erofs_warn("Please send a report to <linux-erofs@lists.ozlabs.org> if something is wrong."); + if (!erofs_atomic_test_and_set(&__warnonce)) { + erofs_warn("EXPERIMENTAL DEFLATE algorithm in use. Use at your own risk!"); + erofs_warn("*Carefully* check filesystem data correctness to avoid corruption!"); + erofs_warn("Please send a report to <linux-erofs@lists.ozlabs.org> if something is wrong."); + } return 0; } diff --git a/lib/compressor_libdeflate.c b/lib/compressor_libdeflate.c index 62d93f7..14cbce4 100644 --- a/lib/compressor_libdeflate.c +++ b/lib/compressor_libdeflate.c @@ -4,6 +4,7 @@ #include "erofs/config.h" #include <libdeflate.h> #include "compressor.h" +#include "erofs/atomic.h" static int libdeflate_compress_destsize(const struct erofs_compress *c, const void *src, unsigned int *srcsize, @@ -82,12 +83,15 @@ static int compressor_libdeflate_exit(struct erofs_compress *c) static int compressor_libdeflate_init(struct erofs_compress *c) { + static erofs_atomic_bool_t __warnonce; + libdeflate_free_compressor(c->private_data); c->private_data = libdeflate_alloc_compressor(c->compression_level); if (!c->private_data) return -ENOMEM; - erofs_warn("EXPERIMENTAL libdeflate compressor in use. Use at your own risk!"); + if (!erofs_atomic_test_and_set(&__warnonce)) + erofs_warn("EXPERIMENTAL libdeflate compressor in use. 
Use at your own risk!"); return 0; } diff --git a/lib/compressor_liblzma.c b/lib/compressor_liblzma.c index 712f44f..2f19a93 100644 --- a/lib/compressor_liblzma.c +++ b/lib/compressor_liblzma.c @@ -9,6 +9,7 @@ #include "erofs/config.h" #include "erofs/print.h" #include "erofs/internal.h" +#include "erofs/atomic.h" #include "compressor.h" struct erofs_liblzma_context { @@ -85,6 +86,7 @@ static int erofs_compressor_liblzma_init(struct erofs_compress *c) { struct erofs_liblzma_context *ctx; u32 preset; + static erofs_atomic_bool_t __warnonce; ctx = malloc(sizeof(*ctx)); if (!ctx) @@ -103,7 +105,8 @@ static int erofs_compressor_liblzma_init(struct erofs_compress *c) ctx->opt.dict_size = c->dict_size; c->private_data = ctx; - erofs_warn("It may take a longer time since MicroLZMA is still single-threaded for now."); + if (!erofs_atomic_test_and_set(&__warnonce)) + erofs_warn("It may take a longer time since MicroLZMA is still single-threaded for now."); return 0; } -- 2.39.3
From: Yifan Zhao <zhaoyifan@sjtu.edu.cn> This patch introduces `--workers=#` parameter for the incoming multi-threaded compression support. It also introduces a concept called `segment size` to split large inodes for multi-threaded compression, which has the fixed value 16MiB and cannot be modified for now. Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn> Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com> --- include/erofs/config.h | 4 ++++ lib/config.c | 4 ++++ mkfs/main.c | 23 +++++++++++++++++++++++ 3 files changed, 31 insertions(+) diff --git a/include/erofs/config.h b/include/erofs/config.h index 73e3ac2..d2f91ff 100644 --- a/include/erofs/config.h +++ b/include/erofs/config.h @@ -75,6 +75,10 @@ struct erofs_configure { char c_force_chunkformat; /* < 0, xattr disabled and INT_MAX, always use inline xattrs */ int c_inline_xattr_tolerance; +#ifdef EROFS_MT_ENABLED + u64 c_segment_size; + u32 c_mt_workers; +#endif u32 c_pclusterblks_max, c_pclusterblks_def, c_pclusterblks_packed; u32 c_max_decompressed_extent_bytes; diff --git a/lib/config.c b/lib/config.c index 947a183..2530274 100644 --- a/lib/config.c +++ b/lib/config.c @@ -38,6 +38,10 @@ void erofs_init_configure(void) cfg.c_pclusterblks_max = 1; cfg.c_pclusterblks_def = 1; cfg.c_max_decompressed_extent_bytes = -1; +#ifdef EROFS_MT_ENABLED + cfg.c_segment_size = 16ULL * 1024 * 1024; + cfg.c_mt_workers = 1; +#endif erofs_stdout_tty = isatty(STDOUT_FILENO); } diff --git a/mkfs/main.c b/mkfs/main.c index 8a68a72..126a049 100644 --- a/mkfs/main.c +++ b/mkfs/main.c @@ -77,6 +77,9 @@ static struct option long_options[] = { #ifdef HAVE_LIBLZMA {"unlzma", optional_argument, NULL, 519}, {"unxz", optional_argument, NULL, 519}, +#endif +#ifdef EROFS_MT_ENABLED + {"workers", required_argument, NULL, 520}, #endif {0, 0, 0, 0}, }; @@ -178,6 +181,9 @@ static void usage(int argc, char **argv) #ifdef HAVE_LIBLZMA " --unxz[=X] try to filter the tarball stream through xz/lzma/lzip\n" " (and optionally dump the raw stream 
to X together)\n" +#endif +#ifdef EROFS_MT_ENABLED + " --workers=# set the number of worker threads to # (default=1)\n" #endif " --xattr-prefix=X X=extra xattr name prefix\n" " --mount-point=X X=prefix of target fs path (default: /)\n" @@ -660,6 +666,23 @@ static int mkfs_parse_options_cfg(int argc, char *argv[]) erofstar.dumpfile = strdup(optarg); tarerofs_decoder = EROFS_IOS_DECODER_GZIP + (opt - 518); break; +#ifdef EROFS_MT_ENABLED + case 520: { + unsigned int processors; + + cfg.c_mt_workers = strtoul(optarg, &endptr, 0); + if (errno || *endptr != '\0') { + erofs_err("invalid worker number %s", optarg); + return -EINVAL; + } + + processors = erofs_get_available_processors(); + if (cfg.c_mt_workers > processors) + erofs_warn("the number of workers %d is more than the number of processors %d, performance may be impacted.", + cfg.c_mt_workers, processors); + break; + } +#endif case 'V': version(); exit(0); -- 2.39.3
From: Yifan Zhao <zhaoyifan@sjtu.edu.cn> Add a workqueue implementation for multi-threading support inspired by xfsprogs. Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn> Suggested-by: Gao Xiang <hsiangkao@linux.alibaba.com> Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com> --- configure.ac | 16 +++++ include/erofs/internal.h | 3 + include/erofs/workqueue.h | 34 +++++++++++ lib/Makefile.am | 4 ++ lib/workqueue.c | 123 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 180 insertions(+) create mode 100644 include/erofs/workqueue.h create mode 100644 lib/workqueue.c diff --git a/configure.ac b/configure.ac index 4b59230..3ccd6bb 100644 --- a/configure.ac +++ b/configure.ac @@ -96,6 +96,14 @@ AC_DEFUN([EROFS_UTILS_PARSE_DIRECTORY], AC_ARG_VAR([MAX_BLOCK_SIZE], [The maximum block size which erofs-utils supports]) +AC_MSG_CHECKING([whether to enable multi-threading support]) +AC_ARG_ENABLE([multithreading], + AS_HELP_STRING([--enable-multithreading], + [enable multi-threading support @<:@default=no@:>@]), + [enable_multithreading="$enableval"], + [enable_multithreading="no"]) +AC_MSG_RESULT([$enable_multithreading]) + AC_ARG_ENABLE([debug], [AS_HELP_STRING([--enable-debug], [enable debugging mode @<:@default=no@:>@])], @@ -280,6 +288,13 @@ AS_IF([test "x$MAX_BLOCK_SIZE" = "x"], [ [erofs_cv_max_block_size=4096])) ], [erofs_cv_max_block_size=$MAX_BLOCK_SIZE]) +# Configure multi-threading support +AS_IF([test "x$enable_multithreading" != "xno"], [ + AC_CHECK_HEADERS([pthread.h]) + AC_CHECK_LIB([pthread], [pthread_mutex_lock], [], AC_MSG_ERROR([libpthread is required for multi-threaded build])) + AC_DEFINE(EROFS_MT_ENABLED, 1, [Enable multi-threading support]) +], []) + # Configure debug mode AS_IF([test "x$enable_debug" != "xno"], [], [ dnl Turn off all assert checking. 
@@ -471,6 +486,7 @@ AS_IF([test "x$enable_fuzzing" != "xyes"], [], [ AM_CONDITIONAL([ENABLE_FUZZING], [test "x${enable_fuzzing}" = "xyes"]) # Set up needed symbols, conditionals and compiler/linker flags +AM_CONDITIONAL([ENABLE_EROFS_MT], [test "x${enable_multithreading}" != "xno"]) AM_CONDITIONAL([ENABLE_LZ4], [test "x${have_lz4}" = "xyes"]) AM_CONDITIONAL([ENABLE_LZ4HC], [test "x${have_lz4hc}" = "xyes"]) AM_CONDITIONAL([ENABLE_FUSE], [test "x${have_fuse}" = "xyes"]) diff --git a/include/erofs/internal.h b/include/erofs/internal.h index 5e968d6..4cd2059 100644 --- a/include/erofs/internal.h +++ b/include/erofs/internal.h @@ -22,6 +22,9 @@ typedef unsigned short umode_t; #include <sys/types.h> /* for off_t definition */ #include <sys/stat.h> /* for S_ISCHR definition */ #include <stdio.h> +#ifdef HAVE_PTHREAD_H +#include <pthread.h> +#endif #ifndef PATH_MAX #define PATH_MAX 4096 /* # chars in a path name including nul */ diff --git a/include/erofs/workqueue.h b/include/erofs/workqueue.h new file mode 100644 index 0000000..36037c3 --- /dev/null +++ b/include/erofs/workqueue.h @@ -0,0 +1,34 @@ +/* SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0 */ +#ifndef __EROFS_WORKQUEUE_H +#define __EROFS_WORKQUEUE_H + +#include "internal.h" + +struct erofs_workqueue; + +typedef void *(*erofs_wq_func_t)(struct erofs_workqueue *, void *); + +struct erofs_work { + struct erofs_work *next; + void (*fn)(struct erofs_work *work, void *tlsp); +}; + +struct erofs_workqueue { + struct erofs_work *head, *tail; + pthread_mutex_t lock; + pthread_cond_t cond_empty; + pthread_cond_t cond_full; + pthread_t *workers; + unsigned int nworker; + unsigned int max_jobs; + unsigned int job_count; + bool shutdown; + erofs_wq_func_t on_start, on_exit; +}; + +int erofs_alloc_workqueue(struct erofs_workqueue *wq, unsigned int nworker, + unsigned int max_jobs, erofs_wq_func_t on_start, + erofs_wq_func_t on_exit); +int erofs_queue_work(struct erofs_workqueue *wq, struct erofs_work *work); +int 
erofs_destroy_workqueue(struct erofs_workqueue *wq); +#endif diff --git a/lib/Makefile.am b/lib/Makefile.am index 54b9c9c..b3bea74 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -53,3 +53,7 @@ liberofs_la_SOURCES += kite_deflate.c compressor_deflate.c if ENABLE_LIBDEFLATE liberofs_la_SOURCES += compressor_libdeflate.c endif +if ENABLE_EROFS_MT +liberofs_la_LDFLAGS = -lpthread +liberofs_la_SOURCES += workqueue.c +endif diff --git a/lib/workqueue.c b/lib/workqueue.c new file mode 100644 index 0000000..47cec9b --- /dev/null +++ b/lib/workqueue.c @@ -0,0 +1,123 @@ +// SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0 +#include <pthread.h> +#include <stdlib.h> +#include "erofs/workqueue.h" + +static void *worker_thread(void *arg) +{ + struct erofs_workqueue *wq = arg; + struct erofs_work *work; + void *tlsp = NULL; + + if (wq->on_start) + tlsp = (wq->on_start)(wq, NULL); + + while (true) { + pthread_mutex_lock(&wq->lock); + + while (wq->job_count == 0 && !wq->shutdown) + pthread_cond_wait(&wq->cond_empty, &wq->lock); + if (wq->job_count == 0 && wq->shutdown) { + pthread_mutex_unlock(&wq->lock); + break; + } + + work = wq->head; + wq->head = work->next; + if (!wq->head) + wq->tail = NULL; + wq->job_count--; + + if (wq->job_count == wq->max_jobs - 1) + pthread_cond_broadcast(&wq->cond_full); + + pthread_mutex_unlock(&wq->lock); + work->fn(work, tlsp); + } + + if (wq->on_exit) + (void)(wq->on_exit)(wq, tlsp); + return NULL; +} + +int erofs_alloc_workqueue(struct erofs_workqueue *wq, unsigned int nworker, + unsigned int max_jobs, erofs_wq_func_t on_start, + erofs_wq_func_t on_exit) +{ + unsigned int i; + int ret; + + if (!wq || nworker <= 0 || max_jobs <= 0) + return -EINVAL; + + wq->head = wq->tail = NULL; + wq->nworker = nworker; + wq->max_jobs = max_jobs; + wq->job_count = 0; + wq->shutdown = false; + wq->on_start = on_start; + wq->on_exit = on_exit; + pthread_mutex_init(&wq->lock, NULL); + pthread_cond_init(&wq->cond_empty, NULL); + 
pthread_cond_init(&wq->cond_full, NULL); + + wq->workers = malloc(nworker * sizeof(pthread_t)); + if (!wq->workers) + return -ENOMEM; + + for (i = 0; i < nworker; i++) { + ret = pthread_create(&wq->workers[i], NULL, worker_thread, wq); + if (ret) { + while (i) + pthread_cancel(wq->workers[--i]); + free(wq->workers); + return ret; + } + } + return 0; +} + +int erofs_queue_work(struct erofs_workqueue *wq, struct erofs_work *work) +{ + if (!wq || !work) + return -EINVAL; + + pthread_mutex_lock(&wq->lock); + + while (wq->job_count == wq->max_jobs) + pthread_cond_wait(&wq->cond_full, &wq->lock); + + work->next = NULL; + if (!wq->head) + wq->head = work; + else + wq->tail->next = work; + wq->tail = work; + wq->job_count++; + + pthread_cond_signal(&wq->cond_empty); + pthread_mutex_unlock(&wq->lock); + return 0; +} + +int erofs_destroy_workqueue(struct erofs_workqueue *wq) +{ + unsigned int i; + + if (!wq) + return -EINVAL; + + pthread_mutex_lock(&wq->lock); + wq->shutdown = true; + pthread_cond_broadcast(&wq->cond_empty); + pthread_mutex_unlock(&wq->lock); + + for (i = 0; i < wq->nworker; i++) + pthread_join(wq->workers[i], NULL); + + free(wq->workers); + pthread_mutex_destroy(&wq->lock); + pthread_cond_destroy(&wq->cond_empty); + pthread_cond_destroy(&wq->cond_full); + return 0; +} -- 2.39.3
In order to prepare for multi-threaded decompression. Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn> Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com> --- configure.ac | 1 + include/erofs/config.h | 1 + lib/config.c | 12 ++++++++++++ 3 files changed, 14 insertions(+) diff --git a/configure.ac b/configure.ac index 3ccd6bb..2e69260 100644 --- a/configure.ac +++ b/configure.ac @@ -256,6 +256,7 @@ AC_CHECK_FUNCS(m4_flatten([ strerror strrchr strtoull + sysconf tmpfile64 utimensat])) diff --git a/include/erofs/config.h b/include/erofs/config.h index eecf575..73e3ac2 100644 --- a/include/erofs/config.h +++ b/include/erofs/config.h @@ -109,6 +109,7 @@ static inline int erofs_selabel_open(const char *file_contexts) void erofs_update_progressinfo(const char *fmt, ...); char *erofs_trim_for_progressinfo(const char *str, int placeholder); +unsigned int erofs_get_available_processors(void); #ifdef __cplusplus } diff --git a/lib/config.c b/lib/config.c index 1096cd1..947a183 100644 --- a/lib/config.c +++ b/lib/config.c @@ -14,6 +14,9 @@ #ifdef HAVE_SYS_IOCTL_H #include <sys/ioctl.h> #endif +#ifdef HAVE_UNISTD_H +#include <unistd.h> +#endif struct erofs_configure cfg; struct erofs_sb_info sbi; @@ -177,3 +180,12 @@ void erofs_update_progressinfo(const char *fmt, ...) fputs(msg, stdout); fputc('\n', stdout); } + +unsigned int erofs_get_available_processors(void) +{ +#if defined(HAVE_UNISTD_H) && defined(HAVE_SYSCONF) + return sysconf(_SC_NPROCESSORS_ONLN); +#else + return 0; +#endif +} -- 2.39.3
On 2024/3/15 07:14, Sandeep Dhavale wrote:
> I have been contributing to erofs for some time and I would like to help
> with code reviews as well.
>
> Signed-off-by: Sandeep Dhavale <dhavale@google.com>
Looks good to me, and thanks for taking your time on erofs project:
Reviewed-by: Gao Xiang <hsiangkao@linux.alibaba.com>
Thanks,
Gao Xiang
I have been contributing to erofs for some time and I would like to help with code reviews as well. Signed-off-by: Sandeep Dhavale <dhavale@google.com> --- MAINTAINERS | 1 + 1 file changed, 1 insertion(+) diff --git a/MAINTAINERS b/MAINTAINERS index 4f298c4187fb..b130340d71bb 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -7875,6 +7875,7 @@ M: Gao Xiang <xiang@kernel.org> M: Chao Yu <chao@kernel.org> R: Yue Hu <huyue2@coolpad.com> R: Jeffle Xu <jefflexu@linux.alibaba.com> +R: Sandeep Dhavale <dhavale@google.com> L: linux-erofs@lists.ozlabs.org S: Maintained W: https://erofs.docs.kernel.org -- 2.44.0.291.gc1ea87d7ee-goog
On Thu, 14 Mar 2024 09:57:57 -0700 Alison Schofield <alison.schofield@intel.com> wrote: > On Fri, Feb 23, 2024 at 12:56:34PM -0500, Steven Rostedt wrote: > > From: "Steven Rostedt (Google)" <rostedt@goodmis.org> > > > > [ > > This is a treewide change. I will likely re-create this patch again in > > the second week of the merge window of v6.9 and submit it then. Hoping > > to keep the conflicts that it will cause to a minimum. > > ] Note, change of plans. I plan on sending this in the next merge window, as this merge window I have this patch: https://lore.kernel.org/linux-trace-kernel/20240312113002.00031668@gandalf.local.home/ That will warn if the source string of __string() is different than the source string of __assign_str(). I want to make sure they are identical before just dropping one of them. > > > diff --git a/drivers/cxl/core/trace.h b/drivers/cxl/core/trace.h > > index bdf117a33744..07ba4e033347 100644 > > --- a/drivers/cxl/core/trace.h > > +++ b/drivers/cxl/core/trace.h > > snip to poison > > > @@ -668,8 +668,8 @@ TRACE_EVENT(cxl_poison, > > ), > > > > TP_fast_assign( > > - __assign_str(memdev, dev_name(&cxlmd->dev)); > > - __assign_str(host, dev_name(cxlmd->dev.parent)); > > + __assign_str(memdev); > > + __assign_str(host); > > I think I get that the above changes work because the TP_STRUCT__entry for > these did: > __string(memdev, dev_name(&cxlmd->dev)) > __string(host, dev_name(cxlmd->dev.parent)) That's the point. They have to be identical or you will likely bug. The __string(name, src) is used to find the string length of src which allocates the necessary length on the ring buffer. The __assign_str(name, src) will copy src into the ring buffer. Similar to: len = strlen(src); buf = malloc(len); strcpy(buf, str); Where __string() is strlen() and __assign_str() is strcpy(). It doesn't make sense to use two different strings, and if you did, it would likely be a bug. 
But the magic behind __string() does much more than just get the length of the string, and it could easily save the pointer to the string (along with its length) and have it copy that in the __assign_str() call, making the src parameter of __assign_str() useless. > > > __entry->serial = cxlmd->cxlds->serial; > > __entry->overflow_ts = cxl_poison_overflow(flags, overflow_ts); > > __entry->dpa = cxl_poison_record_dpa(record); > > @@ -678,12 +678,12 @@ TRACE_EVENT(cxl_poison, > > __entry->trace_type = trace_type; > > __entry->flags = flags; > > if (region) { > > - __assign_str(region, dev_name(®ion->dev)); > > + __assign_str(region); > > memcpy(__entry->uuid, ®ion->params.uuid, 16); > > __entry->hpa = cxl_trace_hpa(region, cxlmd, > > __entry->dpa); > > } else { > > - __assign_str(region, ""); > > + __assign_str(region); > > memset(__entry->uuid, 0, 16); > > __entry->hpa = ULLONG_MAX; > > For the above 2, there was no helper in TP_STRUCT__entry. A recently > posted patch is fixing that up to be __string(region, NULL) See [1], > with the actual assignment still happening in TP_fast_assign. __string(region, NULL) doesn't make sense. It's like: len = strlen(NULL); buf = malloc(len); strcpy(buf, NULL); ?? I'll reply to that email. -- Steve > > Does that assign logic need to move to the TP_STRUCT__entry definition > when you merge these changes? I'm not clear how much logic is able to be > included, ie like 'C' style code in the TP_STRUCT__entry. > > [1] > https://lore.kernel.org/linux-cxl/20240314044301.2108650-1-alison.schofield@intel.com/
On Fri, Feb 23, 2024 at 12:56:34PM -0500, Steven Rostedt wrote: > From: "Steven Rostedt (Google)" <rostedt@goodmis.org> > > [ > This is a treewide change. I will likely re-create this patch again in > the second week of the merge window of v6.9 and submit it then. Hoping > to keep the conflicts that it will cause to a minimum. > ] > > With the rework of how the __string() handles dynamic strings where it > saves off the source string in field in the helper structure[1], the > assignment of that value to the trace event field is stored in the helper > value and does not need to be passed in again. > > This means that with: > > __string(field, mystring) > > Which use to be assigned with __assign_str(field, mystring), no longer > needs the second parameter and it is unused. With this, __assign_str() > will now only get a single parameter. > > There's over 700 users of __assign_str() and because coccinelle does not > handle the TRACE_EVENT() macro I ended up using the following sed script: > > git grep -l __assign_str | while read a ; do > sed -e 's/\(__assign_str([^,]*[^ ,]\) *,[^;]*/\1)/' $a > /tmp/test-file; > mv /tmp/test-file $a; > done > > I then searched for __assign_str() that did not end with ';' as those > were multi line assignments that the sed script above would fail to catch. 
> > Note, the same updates will need to be done for: > > __assign_str_len() > __assign_rel_str() > __assign_rel_str_len() > __assign_bitmask() > __assign_rel_bitmask() > __assign_cpumask() > __assign_rel_cpumask() > > [1] https://lore.kernel.org/linux-trace-kernel/20240222211442.634192653@goodmis.org/ > > Signed-off-by: Steven Rostedt (Google) <rostedt@goodmis.org> > --- > arch/arm64/kernel/trace-events-emulation.h | 2 +- > arch/powerpc/include/asm/trace.h | 4 +- > arch/x86/kvm/trace.h | 2 +- > drivers/base/regmap/trace.h | 18 +-- > drivers/base/trace.h | 2 +- > drivers/block/rnbd/rnbd-srv-trace.h | 12 +- > drivers/cxl/core/trace.h | 24 ++-- snip to CXL > diff --git a/drivers/cxl/core/trace.h b/drivers/cxl/core/trace.h > index bdf117a33744..07ba4e033347 100644 > --- a/drivers/cxl/core/trace.h > +++ b/drivers/cxl/core/trace.h snip to poison > @@ -668,8 +668,8 @@ TRACE_EVENT(cxl_poison, > ), > > TP_fast_assign( > - __assign_str(memdev, dev_name(&cxlmd->dev)); > - __assign_str(host, dev_name(cxlmd->dev.parent)); > + __assign_str(memdev); > + __assign_str(host); I think I get that the above changes work because the TP_STRUCT__entry for these did: __string(memdev, dev_name(&cxlmd->dev)) __string(host, dev_name(cxlmd->dev.parent)) > __entry->serial = cxlmd->cxlds->serial; > __entry->overflow_ts = cxl_poison_overflow(flags, overflow_ts); > __entry->dpa = cxl_poison_record_dpa(record); > @@ -678,12 +678,12 @@ TRACE_EVENT(cxl_poison, > __entry->trace_type = trace_type; > __entry->flags = flags; > if (region) { > - __assign_str(region, dev_name(®ion->dev)); > + __assign_str(region); > memcpy(__entry->uuid, ®ion->params.uuid, 16); > __entry->hpa = cxl_trace_hpa(region, cxlmd, > __entry->dpa); > } else { > - __assign_str(region, ""); > + __assign_str(region); > memset(__entry->uuid, 0, 16); > __entry->hpa = ULLONG_MAX; For the above 2, there was no helper in TP_STRUCT__entry. 
A recently posted patch is fixing that up to be __string(region, NULL) See [1], with the actual assignment still happening in TP_fast_assign. Does that assign logic need to move to the TP_STRUCT__entry definition when you merge these changes? I'm not clear how much logic is able to be included, ie like 'C' style code in the TP_STRUCT__entry. [1] https://lore.kernel.org/linux-cxl/20240314044301.2108650-1-alison.schofield@intel.com/ Thanks for helping, Alison > }
Currently, the creation of EROFS compressed images is single-threaded, which suffers from performance issues. This patch attempts to address it by compressing the large file in parallel. Specifically, each input file larger than 16MB is split into segments, and each worker thread compresses a segment as if it were a separate file. Finally, the main thread merges all the compressed segments. Multi-threaded compression is not compatible with -Ededupe, -E(all-)fragments and -Eztailpacking for now. Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn> Co-authored-by: Tong Xin <xin_tong@sjtu.edu.cn> --- include/erofs/compress.h | 3 +- lib/compress.c | 551 ++++++++++++++++++++++++++++++++------- lib/compressor.c | 2 + mkfs/main.c | 9 + 4 files changed, 468 insertions(+), 97 deletions(-) diff --git a/include/erofs/compress.h b/include/erofs/compress.h index b3272f7..3253611 100644 --- a/include/erofs/compress.h +++ b/include/erofs/compress.h @@ -14,7 +14,8 @@ extern "C" #include "internal.h" -#define EROFS_CONFIG_COMPR_MAX_SZ (4000 * 1024) +#define EROFS_CONFIG_COMPR_MAX_SZ (4000 * 1024) +#define Z_EROFS_COMPR_QUEUE_SZ (EROFS_CONFIG_COMPR_MAX_SZ * 2) void z_erofs_drop_inline_pcluster(struct erofs_inode *inode); int erofs_write_compressed_file(struct erofs_inode *inode, int fd); diff --git a/lib/compress.c b/lib/compress.c index 4101009..9fc3bee 100644 --- a/lib/compress.c +++ b/lib/compress.c @@ -20,6 +20,9 @@ #include "erofs/block_list.h" #include "erofs/compress_hints.h" #include "erofs/fragments.h" +#ifdef EROFS_MT_ENABLED +#include "erofs/workqueue.h" +#endif /* compressing configuration specified by users */ struct erofs_compress_cfg { @@ -33,29 +36,77 @@ struct z_erofs_extent_item { struct z_erofs_inmem_extent e; }; -struct z_erofs_vle_compress_ctx { - u8 queue[EROFS_CONFIG_COMPR_MAX_SZ * 2]; +struct z_erofs_compress_ictx { + struct erofs_inode *inode; + int fd; + unsigned int pclustersize; + + u32 tof_chksum; + bool fix_dedupedfrag; + bool fragemitted; + + /* 
fields for write indexes */ + u8 *metacur; + struct list_head extents; + u16 clusterofs; +}; + +struct z_erofs_compress_sctx { /* segment context */ + struct z_erofs_compress_ictx *ictx; + + u8 *queue; struct list_head extents; struct z_erofs_extent_item *pivot; - struct erofs_inode *inode; - struct erofs_compress_cfg *ccfg; + struct erofs_compress *chandle; + char *destbuf; - u8 *metacur; unsigned int head, tail; erofs_off_t remaining; - unsigned int pclustersize; erofs_blk_t blkaddr; /* pointing to the next blkaddr */ u16 clusterofs; - u32 tof_chksum; - bool fix_dedupedfrag; - bool fragemitted; + int seg_num, seg_idx; + + void *membuf; + erofs_off_t memoff; +}; + +#ifdef EROFS_MT_ENABLED +struct erofs_compress_wq_private { + u8 *queue; + char *destbuf; + struct erofs_compress_cfg *ccfg; }; +struct erofs_compress_work { + /* Note: struct erofs_work must be the first member */ + struct erofs_work work; + struct z_erofs_compress_sctx ctx; + struct erofs_compress_work *next; + + unsigned int alg_id; + char *alg_name; + unsigned int comp_level; + unsigned int dict_size; + + int errcode; +}; + +static struct { + struct erofs_workqueue wq; + struct erofs_compress_work *idle; + pthread_mutex_t mutex; + pthread_cond_t cond; + int nfini; +} z_erofs_mt_ctrl; +#endif + +static bool z_erofs_mt_enabled; + #define Z_EROFS_LEGACY_MAP_HEADER_SIZE Z_EROFS_FULL_INDEX_ALIGN(0) -static void z_erofs_write_indexes_final(struct z_erofs_vle_compress_ctx *ctx) +static void z_erofs_write_indexes_final(struct z_erofs_compress_ictx *ctx) { const unsigned int type = Z_EROFS_LCLUSTER_TYPE_PLAIN; struct z_erofs_lcluster_index di; @@ -71,7 +122,7 @@ static void z_erofs_write_indexes_final(struct z_erofs_vle_compress_ctx *ctx) ctx->metacur += sizeof(di); } -static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx, +static void z_erofs_write_extent(struct z_erofs_compress_ictx *ctx, struct z_erofs_inmem_extent *e) { struct erofs_inode *inode = ctx->inode; @@ -170,7 +221,7 @@ static 
void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx, ctx->clusterofs = clusterofs + count; } -static void z_erofs_write_indexes(struct z_erofs_vle_compress_ctx *ctx) +static void z_erofs_write_indexes(struct z_erofs_compress_ictx *ctx) { struct z_erofs_extent_item *ei, *n; @@ -184,15 +235,16 @@ static void z_erofs_write_indexes(struct z_erofs_vle_compress_ctx *ctx) z_erofs_write_indexes_final(ctx); } -static bool z_erofs_need_refill(struct z_erofs_vle_compress_ctx *ctx) +static bool z_erofs_need_refill(struct z_erofs_compress_sctx *ctx) { const bool final = !ctx->remaining; unsigned int qh_aligned, qh_after; + struct erofs_inode *inode = ctx->ictx->inode; if (final || ctx->head < EROFS_CONFIG_COMPR_MAX_SZ) return false; - qh_aligned = round_down(ctx->head, erofs_blksiz(ctx->inode->sbi)); + qh_aligned = round_down(ctx->head, erofs_blksiz(inode->sbi)); qh_after = ctx->head - qh_aligned; memmove(ctx->queue, ctx->queue + qh_aligned, ctx->tail - qh_aligned); ctx->tail -= qh_aligned; @@ -204,7 +256,7 @@ static struct z_erofs_extent_item dummy_pivot = { .e.length = 0 }; -static void z_erofs_commit_extent(struct z_erofs_vle_compress_ctx *ctx, +static void z_erofs_commit_extent(struct z_erofs_compress_sctx *ctx, struct z_erofs_extent_item *ei) { if (ei == &dummy_pivot) @@ -212,14 +264,13 @@ static void z_erofs_commit_extent(struct z_erofs_vle_compress_ctx *ctx, list_add_tail(&ei->list, &ctx->extents); ctx->clusterofs = (ctx->clusterofs + ei->e.length) & - (erofs_blksiz(ctx->inode->sbi) - 1); - + (erofs_blksiz(ctx->ictx->inode->sbi) - 1); } -static int z_erofs_compress_dedupe(struct z_erofs_vle_compress_ctx *ctx, +static int z_erofs_compress_dedupe(struct z_erofs_compress_sctx *ctx, unsigned int *len) { - struct erofs_inode *inode = ctx->inode; + struct erofs_inode *inode = ctx->ictx->inode; const unsigned int lclustermask = (1 << inode->z_logical_clusterbits) - 1; struct erofs_sb_info *sbi = inode->sbi; struct z_erofs_extent_item *ei = ctx->pivot; @@ -315,16 
+366,17 @@ out: return 0; } -static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx, +static int write_uncompressed_extent(struct z_erofs_compress_sctx *ctx, unsigned int len, char *dst) { - struct erofs_sb_info *sbi = ctx->inode->sbi; + struct erofs_inode *inode = ctx->ictx->inode; + struct erofs_sb_info *sbi = inode->sbi; unsigned int count = min(erofs_blksiz(sbi), len); unsigned int interlaced_offset, rightpart; int ret; /* write interlaced uncompressed data if needed */ - if (ctx->inode->z_advise & Z_EROFS_ADVISE_INTERLACED_PCLUSTER) + if (inode->z_advise & Z_EROFS_ADVISE_INTERLACED_PCLUSTER) interlaced_offset = ctx->clusterofs; else interlaced_offset = 0; @@ -335,11 +387,17 @@ static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx, memcpy(dst + interlaced_offset, ctx->queue + ctx->head, rightpart); memcpy(dst, ctx->queue + ctx->head + rightpart, count - rightpart); - erofs_dbg("Writing %u uncompressed data to block %u", - count, ctx->blkaddr); - ret = blk_write(sbi, dst, ctx->blkaddr, 1); - if (ret) - return ret; + if (ctx->membuf) { + erofs_dbg("Writing %u uncompressed data to membuf", count); + memcpy(ctx->membuf + ctx->memoff, dst, erofs_blksiz(sbi)); + ctx->memoff += erofs_blksiz(sbi); + } else { + erofs_dbg("Writing %u uncompressed data to block %u", count, + ctx->blkaddr); + ret = blk_write(sbi, dst, ctx->blkaddr, 1); + if (ret) + return ret; + } return count; } @@ -379,12 +437,12 @@ static int z_erofs_fill_inline_data(struct erofs_inode *inode, void *data, return len; } -static void tryrecompress_trailing(struct z_erofs_vle_compress_ctx *ctx, +static void tryrecompress_trailing(struct z_erofs_compress_sctx *ctx, struct erofs_compress *ec, void *in, unsigned int *insize, void *out, unsigned int *compressedsize) { - struct erofs_sb_info *sbi = ctx->inode->sbi; + struct erofs_sb_info *sbi = ctx->ictx->inode->sbi; static char tmp[Z_EROFS_PCLUSTER_MAX_SIZE]; unsigned int count; int ret = *compressedsize; @@ -406,10 
+464,11 @@ static void tryrecompress_trailing(struct z_erofs_vle_compress_ctx *ctx, *compressedsize = ret; } -static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx, +static bool z_erofs_fixup_deduped_fragment(struct z_erofs_compress_sctx *ctx, unsigned int len) { - struct erofs_inode *inode = ctx->inode; + struct z_erofs_compress_ictx *ictx = ctx->ictx; + struct erofs_inode *inode = ictx->inode; struct erofs_sb_info *sbi = inode->sbi; const unsigned int newsize = ctx->remaining + len; @@ -417,9 +476,10 @@ static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx, /* try to fix again if it gets larger (should be rare) */ if (inode->fragment_size < newsize) { - ctx->pclustersize = min_t(erofs_off_t, z_erofs_get_max_pclustersize(inode), - roundup(newsize - inode->fragment_size, - erofs_blksiz(sbi))); + ictx->pclustersize = min_t(erofs_off_t, + z_erofs_get_max_pclustersize(inode), + roundup(newsize - inode->fragment_size, + erofs_blksiz(sbi))); return false; } @@ -436,29 +496,33 @@ static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx, return true; } -static int __z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx, +static int __z_erofs_compress_one(struct z_erofs_compress_sctx *ctx, struct z_erofs_inmem_extent *e) { - static char dstbuf[EROFS_CONFIG_COMPR_MAX_SZ + EROFS_MAX_BLOCK_SIZE]; - struct erofs_inode *inode = ctx->inode; + static char g_dstbuf[EROFS_CONFIG_COMPR_MAX_SZ + EROFS_MAX_BLOCK_SIZE]; + char *dstbuf = ctx->destbuf ?: g_dstbuf; + struct z_erofs_compress_ictx *ictx = ctx->ictx; + struct erofs_inode *inode = ictx->inode; struct erofs_sb_info *sbi = inode->sbi; unsigned int blksz = erofs_blksiz(sbi); char *const dst = dstbuf + blksz; - struct erofs_compress *const h = &ctx->ccfg->handle; + struct erofs_compress *const h = ctx->chandle; unsigned int len = ctx->tail - ctx->head; bool is_packed_inode = erofs_is_packed_inode(inode); bool final = !ctx->remaining; - bool may_packing 
= (cfg.c_fragments && final && !is_packed_inode); - bool may_inline = (cfg.c_ztailpacking && final && !may_packing); + bool may_packing = (cfg.c_fragments && final && !is_packed_inode && + !z_erofs_mt_enabled); + bool may_inline = (cfg.c_ztailpacking && final && !may_packing && + !z_erofs_mt_enabled); unsigned int compressedsize; int ret; - if (len <= ctx->pclustersize) { + if (len <= ictx->pclustersize) { if (!final || !len) return 1; if (may_packing) { - if (inode->fragment_size && !ctx->fix_dedupedfrag) { - ctx->pclustersize = roundup(len, blksz); + if (inode->fragment_size && !ictx->fix_dedupedfrag) { + ictx->pclustersize = roundup(len, blksz); goto fix_dedupedfrag; } e->length = len; @@ -470,7 +534,7 @@ static int __z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx, e->length = min(len, cfg.c_max_decompressed_extent_bytes); ret = erofs_compress_destsize(h, ctx->queue + ctx->head, - &e->length, dst, ctx->pclustersize); + &e->length, dst, ictx->pclustersize); if (ret <= 0) { erofs_err("failed to compress %s: %s", inode->i_srcpath, erofs_strerror(ret)); @@ -507,16 +571,16 @@ nocompression: e->compressedblks = 1; e->raw = true; } else if (may_packing && len == e->length && - compressedsize < ctx->pclustersize && - (!inode->fragment_size || ctx->fix_dedupedfrag)) { + compressedsize < ictx->pclustersize && + (!inode->fragment_size || ictx->fix_dedupedfrag)) { frag_packing: ret = z_erofs_pack_fragments(inode, ctx->queue + ctx->head, - len, ctx->tof_chksum); + len, ictx->tof_chksum); if (ret < 0) return ret; e->compressedblks = 0; /* indicate a fragment */ e->raw = false; - ctx->fragemitted = true; + ictx->fragemitted = true; /* tailpcluster should be less than 1 block */ } else if (may_inline && len == e->length && compressedsize < blksz) { if (ctx->clusterofs + len <= blksz) { @@ -545,8 +609,8 @@ frag_packing: */ if (may_packing && len == e->length && (compressedsize & (blksz - 1)) && - ctx->tail < sizeof(ctx->queue)) { - ctx->pclustersize = 
roundup(compressedsize, blksz); + ctx->tail < Z_EROFS_COMPR_QUEUE_SZ) { + ictx->pclustersize = roundup(compressedsize, blksz); goto fix_dedupedfrag; } @@ -569,34 +633,45 @@ frag_packing: } /* write compressed data */ - erofs_dbg("Writing %u compressed data to %u of %u blocks", - e->length, ctx->blkaddr, e->compressedblks); + if (ctx->membuf) { + erofs_off_t sz = e->compressedblks * blksz; + erofs_dbg("Writing %u compressed data to membuf of %u blocks", + e->length, e->compressedblks); - ret = blk_write(sbi, dst - padding, ctx->blkaddr, - e->compressedblks); - if (ret) - return ret; + memcpy(ctx->membuf + ctx->memoff, dst - padding, sz); + ctx->memoff += sz; + } else { + erofs_dbg("Writing %u compressed data to %u of %u blocks", + e->length, ctx->blkaddr, e->compressedblks); + + ret = blk_write(sbi, dst - padding, ctx->blkaddr, + e->compressedblks); + if (ret) + return ret; + } e->raw = false; may_inline = false; may_packing = false; } e->partial = false; e->blkaddr = ctx->blkaddr; + if (ctx->blkaddr != EROFS_NULL_ADDR) + ctx->blkaddr += e->compressedblks; if (!may_inline && !may_packing && !is_packed_inode) (void)z_erofs_dedupe_insert(e, ctx->queue + ctx->head); - ctx->blkaddr += e->compressedblks; ctx->head += e->length; return 0; fix_dedupedfrag: DBG_BUGON(!inode->fragment_size); ctx->remaining += inode->fragment_size; - ctx->fix_dedupedfrag = true; + ictx->fix_dedupedfrag = true; return 1; } -static int z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx) +static int z_erofs_compress_one(struct z_erofs_compress_sctx *ctx) { + struct z_erofs_compress_ictx *ictx = ctx->ictx; unsigned int len = ctx->tail - ctx->head; struct z_erofs_extent_item *ei; @@ -624,7 +699,7 @@ static int z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx) len -= ei->e.length; ctx->pivot = ei; - if (ctx->fix_dedupedfrag && !ctx->fragemitted && + if (ictx->fix_dedupedfrag && !ictx->fragemitted && z_erofs_fixup_deduped_fragment(ctx, len)) break; @@ -912,13 +987,268 @@ void 
z_erofs_drop_inline_pcluster(struct erofs_inode *inode) inode->eof_tailraw = NULL; } +int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx, + u64 offset, erofs_blk_t blkaddr) +{ + int fd = ctx->ictx->fd; + + ctx->blkaddr = blkaddr; + while (ctx->remaining) { + const u64 rx = min_t(u64, ctx->remaining, + Z_EROFS_COMPR_QUEUE_SZ - ctx->tail); + int ret; + + ret = (offset == -1 ? + read(fd, ctx->queue + ctx->tail, rx) : + pread(fd, ctx->queue + ctx->tail, rx, offset)); + if (ret != rx) + return -errno; + + ctx->remaining -= rx; + ctx->tail += rx; + if (offset != -1) + offset += rx; + + ret = z_erofs_compress_one(ctx); + if (ret) + return ret; + } + DBG_BUGON(ctx->head != ctx->tail); + + if (ctx->pivot) { + z_erofs_commit_extent(ctx, ctx->pivot); + ctx->pivot = NULL; + } + return 0; +} + +#ifdef EROFS_MT_ENABLED +void *z_erofs_mt_private_alloc(struct erofs_workqueue *wq, void *ptr) +{ + struct erofs_compress_wq_private *priv; + + priv = calloc(1, sizeof(struct erofs_compress_wq_private)); + if (!priv) + return NULL; + + priv->queue = malloc(Z_EROFS_COMPR_QUEUE_SZ); + if (!priv->queue) + goto err_free_priv; + priv->destbuf = calloc(1, EROFS_CONFIG_COMPR_MAX_SZ + + EROFS_MAX_BLOCK_SIZE); + if (!priv->destbuf) + goto err_free_queue; + priv->ccfg = calloc(EROFS_MAX_COMPR_CFGS, sizeof(*priv->ccfg)); + if (!priv->ccfg) + goto err_free_destbuf; + + return priv; + +err_free_destbuf: + free(priv->destbuf); +err_free_queue: + free(priv->queue); +err_free_priv: + free(priv); + return NULL; +} + +int z_erofs_mt_private_init_compr(struct erofs_sb_info *sbi, + struct erofs_compress_wq_private *priv, + unsigned int alg_id, char *alg_name, + unsigned int comp_level, + unsigned int dict_size) +{ + struct erofs_compress_cfg *lc = &priv->ccfg[alg_id]; + int ret; + + if (likely(lc->enable)) + return 0; + + ret = erofs_compressor_init(sbi, &lc->handle, alg_name, + comp_level, dict_size); + if (ret) + return ret; + lc->algorithmtype = alg_id; + lc->enable = true; + return 0; +} + 
+void *z_erofs_mt_private_free(struct erofs_workqueue *wq, void *private) +{ + struct erofs_compress_wq_private *priv = private; + int i; + + for (i = 0; i < EROFS_MAX_COMPR_CFGS; i++) { + if (priv->ccfg[i].enable) + erofs_compressor_exit(&priv->ccfg[i].handle); + } + free(priv->ccfg); + free(priv->destbuf); + free(priv->queue); + + free(priv); + return NULL; +} + +void z_erofs_mt_workfn(struct erofs_work *work) +{ + struct erofs_compress_work *cwork = (struct erofs_compress_work *)work; + struct z_erofs_compress_sctx *ctx = &cwork->ctx; + struct erofs_compress_wq_private *priv = work->priv; + u64 offset = ctx->seg_idx * cfg.c_segment_size; + int ret = 0; + + ret = z_erofs_mt_private_init_compr(ctx->ictx->inode->sbi, priv, + cwork->alg_id, cwork->alg_name, + cwork->comp_level, + cwork->dict_size); + if (ret) + goto out; + + ctx->queue = priv->queue; + ctx->destbuf = priv->destbuf; + ctx->chandle = &priv->ccfg[cwork->alg_id].handle; + + ctx->membuf = malloc(ctx->remaining); + if (!ctx->membuf) { + ret = -ENOMEM; + goto out; + } + ctx->memoff = 0; + + ret = z_erofs_compress_segment(ctx, offset, EROFS_NULL_ADDR); + +out: + cwork->errcode = ret; + pthread_mutex_lock(&z_erofs_mt_ctrl.mutex); + ++z_erofs_mt_ctrl.nfini; + pthread_cond_signal(&z_erofs_mt_ctrl.cond); + pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex); +} + +int z_erofs_merge_segment(struct z_erofs_compress_ictx *ictx, + struct z_erofs_compress_sctx *ctx) +{ + struct z_erofs_extent_item *ei, *n; + struct erofs_sb_info *sbi = ictx->inode->sbi; + erofs_blk_t blkoff = 0; + int ret = 0, ret2; + + list_for_each_entry_safe(ei, n, &ctx->extents, list) { + list_del(&ei->list); + list_add_tail(&ei->list, &ictx->extents); + + if (ei->e.blkaddr != EROFS_NULL_ADDR) /* deduped extents */ + continue; + + ei->e.blkaddr = ctx->blkaddr; + ctx->blkaddr += ei->e.compressedblks; + + ret2 = blk_write(sbi, ctx->membuf + blkoff * erofs_blksiz(sbi), + ei->e.blkaddr, ei->e.compressedblks); + blkoff += ei->e.compressedblks; + if (ret2) { 
+ ret = ret2; + continue; + } + } + free(ctx->membuf); + return ret; +} + +int z_erofs_mt_compress(struct z_erofs_compress_ictx *ctx, + struct erofs_compress_cfg *ccfg, + erofs_blk_t blkaddr, + erofs_blk_t *compressed_blocks) +{ + struct erofs_compress_work *cur, *head = NULL, **last = &head; + struct erofs_inode *inode = ctx->inode; + int nsegs = DIV_ROUND_UP(inode->i_size, cfg.c_segment_size); + int ret, i; + + z_erofs_mt_ctrl.nfini = 0; + + for (i = 0; i < nsegs; i++) { + if (z_erofs_mt_ctrl.idle) { + cur = z_erofs_mt_ctrl.idle; + z_erofs_mt_ctrl.idle = cur->next; + cur->next = NULL; + } else { + cur = calloc(1, sizeof(*cur)); + if (!cur) + return -ENOMEM; + } + *last = cur; + last = &cur->next; + + cur->ctx = (struct z_erofs_compress_sctx) { + .ictx = ctx, + .seg_num = nsegs, + .seg_idx = i, + .pivot = &dummy_pivot, + }; + init_list_head(&cur->ctx.extents); + + if (i == nsegs - 1) + cur->ctx.remaining = inode->i_size - + inode->fragment_size - + i * cfg.c_segment_size; + else + cur->ctx.remaining = cfg.c_segment_size; + + cur->alg_id = ccfg->handle.alg->id; + cur->alg_name = ccfg->handle.alg->name; + cur->comp_level = ccfg->handle.compression_level; + cur->dict_size = ccfg->handle.dict_size; + + cur->work.func = z_erofs_mt_workfn; + erofs_queue_work(&z_erofs_mt_ctrl.wq, &cur->work); + } + + pthread_mutex_lock(&z_erofs_mt_ctrl.mutex); + while (z_erofs_mt_ctrl.nfini != nsegs) + pthread_cond_wait(&z_erofs_mt_ctrl.cond, + &z_erofs_mt_ctrl.mutex); + pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex); + + ret = 0; + while (head) { + cur = head; + head = cur->next; + + if (cur->errcode) { + ret = cur->errcode; + } else { + int ret2; + + cur->ctx.blkaddr = blkaddr; + ret2 = z_erofs_merge_segment(ctx, &cur->ctx); + if (ret2) + ret = ret2; + + *compressed_blocks += cur->ctx.blkaddr - blkaddr; + blkaddr = cur->ctx.blkaddr; + } + + cur->next = z_erofs_mt_ctrl.idle; + z_erofs_mt_ctrl.idle = cur; + } + return ret; +} +#endif + int erofs_write_compressed_file(struct erofs_inode 
*inode, int fd) { + static u8 g_queue[Z_EROFS_COMPR_QUEUE_SZ]; struct erofs_buffer_head *bh; - static struct z_erofs_vle_compress_ctx ctx; - erofs_blk_t blkaddr, compressed_blocks; + static struct z_erofs_compress_ictx ctx; + static struct z_erofs_compress_sctx sctx; + struct erofs_compress_cfg *ccfg; + erofs_blk_t blkaddr, compressed_blocks = 0; unsigned int legacymetasize; int ret; + bool ismt = false; struct erofs_sb_info *sbi = inode->sbi; u8 *compressmeta = malloc(BLK_ROUND_UP(sbi, inode->i_size) * sizeof(struct z_erofs_lcluster_index) + @@ -963,8 +1293,8 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd) } } #endif - ctx.ccfg = &erofs_ccfg[inode->z_algorithmtype[0]]; - inode->z_algorithmtype[0] = ctx.ccfg[0].algorithmtype; + ccfg = &erofs_ccfg[inode->z_algorithmtype[0]]; + inode->z_algorithmtype[0] = ccfg[0].algorithmtype; inode->z_algorithmtype[1] = 0; inode->idata_size = 0; @@ -983,50 +1313,45 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd) blkaddr = erofs_mapbh(bh->block); /* start_blkaddr */ ctx.inode = inode; ctx.pclustersize = z_erofs_get_max_pclustersize(inode); - ctx.blkaddr = blkaddr; ctx.metacur = compressmeta + Z_EROFS_LEGACY_MAP_HEADER_SIZE; - ctx.head = ctx.tail = 0; - ctx.clusterofs = 0; - ctx.pivot = &dummy_pivot; init_list_head(&ctx.extents); - ctx.remaining = inode->i_size - inode->fragment_size; + ctx.fd = fd; ctx.fix_dedupedfrag = false; ctx.fragemitted = false; + sctx = (struct z_erofs_compress_sctx) { .ictx = &ctx, }; + init_list_head(&sctx.extents); + if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) && !inode->fragment_size) { ret = z_erofs_pack_file_from_fd(inode, fd, ctx.tof_chksum); if (ret) goto err_free_idata; +#ifdef EROFS_MT_ENABLED + } else if (z_erofs_mt_enabled && inode->i_size > cfg.c_segment_size) { + ismt = true; + ret = z_erofs_mt_compress(&ctx, ccfg, blkaddr, &compressed_blocks); + if (ret) + goto err_free_idata; +#endif } else { - while (ctx.remaining) { - const u64 rx = 
min_t(u64, ctx.remaining, - sizeof(ctx.queue) - ctx.tail); - - ret = read(fd, ctx.queue + ctx.tail, rx); - if (ret != rx) { - ret = -errno; - goto err_bdrop; - } - ctx.remaining -= rx; - ctx.tail += rx; - - ret = z_erofs_compress_one(&ctx); - if (ret) - goto err_free_idata; - } + sctx.queue = g_queue; + sctx.destbuf = NULL; + sctx.chandle = &ccfg->handle; + sctx.remaining = inode->i_size - inode->fragment_size; + sctx.seg_num = 1; + sctx.seg_idx = 0; + sctx.pivot = &dummy_pivot; + + ret = z_erofs_compress_segment(&sctx, -1, blkaddr); + if (ret) + goto err_free_idata; + compressed_blocks = sctx.blkaddr - blkaddr; } - DBG_BUGON(ctx.head != ctx.tail); /* fall back to no compression mode */ - compressed_blocks = ctx.blkaddr - blkaddr; DBG_BUGON(compressed_blocks < !!inode->idata_size); compressed_blocks -= !!inode->idata_size; - if (ctx.pivot) { - z_erofs_commit_extent(&ctx, ctx.pivot); - ctx.pivot = NULL; - } - /* generate an extent for the deduplicated fragment */ if (inode->fragment_size && !ctx.fragemitted) { struct z_erofs_extent_item *ei; @@ -1042,13 +1367,16 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd) .compressedblks = 0, .raw = false, .partial = false, - .blkaddr = ctx.blkaddr, + .blkaddr = sctx.blkaddr, }; init_list_head(&ei->list); - z_erofs_commit_extent(&ctx, ei); + z_erofs_commit_extent(&sctx, ei); } z_erofs_fragments_commit(inode); + if (!ismt) + list_splice_tail(&sctx.extents, &ctx.extents); + z_erofs_write_indexes(&ctx); legacymetasize = ctx.metacur - compressmeta; /* estimate if data compression saves space or not */ @@ -1257,8 +1585,25 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s return -EINVAL; } - if (erofs_sb_has_compr_cfgs(sbi)) - return z_erofs_build_compr_cfgs(sbi, sb_bh, max_dict_size); + if (erofs_sb_has_compr_cfgs(sbi)) { + ret = z_erofs_build_compr_cfgs(sbi, sb_bh, max_dict_size); + if (ret) + return ret; + } + + z_erofs_mt_enabled = false; +#ifdef EROFS_MT_ENABLED + if 
(cfg.c_mt_workers > 1) { + pthread_mutex_init(&z_erofs_mt_ctrl.mutex, NULL); + pthread_cond_init(&z_erofs_mt_ctrl.cond, NULL); + ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq, + cfg.c_mt_workers, + cfg.c_mt_workers << 2, + z_erofs_mt_private_alloc, + z_erofs_mt_private_free); + z_erofs_mt_enabled = !ret; + } +#endif return 0; } @@ -1271,5 +1616,19 @@ int z_erofs_compress_exit(void) if (ret) return ret; } + + if (z_erofs_mt_enabled) { +#ifdef EROFS_MT_ENABLED + ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.wq); + if (ret) + return ret; + while (z_erofs_mt_ctrl.idle) { + struct erofs_compress_work *tmp = + z_erofs_mt_ctrl.idle->next; + free(z_erofs_mt_ctrl.idle); + z_erofs_mt_ctrl.idle = tmp; + } +#endif + } return 0; } diff --git a/lib/compressor.c b/lib/compressor.c index 58eae2a..175259e 100644 --- a/lib/compressor.c +++ b/lib/compressor.c @@ -86,6 +86,8 @@ int erofs_compressor_init(struct erofs_sb_info *sbi, struct erofs_compress *c, /* should be written in "minimum compression ratio * 100" */ c->compress_threshold = 100; + c->compression_level = -1; + c->dict_size = 0; if (!alg_name) { c->alg = NULL; diff --git a/mkfs/main.c b/mkfs/main.c index 89252c2..9c96750 100644 --- a/mkfs/main.c +++ b/mkfs/main.c @@ -843,6 +843,15 @@ static int mkfs_parse_options_cfg(int argc, char *argv[]) cfg.c_pclusterblks_packed = pclustersize_packed >> sbi.blkszbits; } +#ifdef EROFS_MT_ENABLED + if (cfg.c_mt_workers > 1 && + (cfg.c_dedupe || cfg.c_fragments || cfg.c_ztailpacking)) { + cfg.c_mt_workers = 1; + erofs_warn("Please note that dedupe/fragments/ztailpacking " + "is NOT supported in multi-threaded mode now, using worker=1."); + } +#endif + return 0; } -- 2.44.0
From: Gao Xiang <hsiangkao@linux.alibaba.com> Add some helpers (relaxed semantics) in order to prepare for the upcoming multi-threaded support. For example, compressor may be initialized more than once in different worker threads, resulting in noisy warnings. This patch makes sure that each message will be printed only once by adding `__warnonce` atomic booleans to each erofs_compressor_init(). Cc: Yifan Zhao <zhaoyifan@sjtu.edu.cn> Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com> Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn> --- include/erofs/atomic.h | 28 ++++++++++++++++++++++++++++ lib/compressor_deflate.c | 11 ++++++++--- lib/compressor_libdeflate.c | 6 +++++- lib/compressor_liblzma.c | 5 ++++- 4 files changed, 45 insertions(+), 5 deletions(-) create mode 100644 include/erofs/atomic.h diff --git a/include/erofs/atomic.h b/include/erofs/atomic.h new file mode 100644 index 0000000..214cdb1 --- /dev/null +++ b/include/erofs/atomic.h @@ -0,0 +1,28 @@ +/* SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0 */ +/* + * Copyright (C) 2024 Alibaba Cloud + */ +#ifndef __EROFS_ATOMIC_H +#define __EROFS_ATOMIC_H + +/* + * Just use GCC/clang built-in functions for now + * See: https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html + */ +typedef unsigned long erofs_atomic_t; +typedef char erofs_atomic_bool_t; + +#define erofs_atomic_read(ptr) ({ \ + typeof(*ptr) __n; \ + __atomic_load(ptr, &__n, __ATOMIC_RELAXED); \ +__n;}) + +#define erofs_atomic_set(ptr, n) do { \ + typeof(*ptr) __n = (n); \ + __atomic_store(ptr, &__n, __ATOMIC_RELAXED); \ +} while(0) + +#define erofs_atomic_test_and_set(ptr) \ + __atomic_test_and_set(ptr, __ATOMIC_RELAXED) + +#endif diff --git a/lib/compressor_deflate.c b/lib/compressor_deflate.c index 8629415..e482224 100644 --- a/lib/compressor_deflate.c +++ b/lib/compressor_deflate.c @@ -7,6 +7,7 @@ #include "erofs/print.h" #include "erofs/config.h" #include "compressor.h" +#include "erofs/atomic.h" void *kite_deflate_init(int level, 
unsigned int dict_size); void kite_deflate_end(void *s); @@ -36,6 +37,8 @@ static int compressor_deflate_exit(struct erofs_compress *c) static int compressor_deflate_init(struct erofs_compress *c) { + static erofs_atomic_bool_t __warnonce; + if (c->private_data) { kite_deflate_end(c->private_data); c->private_data = NULL; @@ -44,9 +47,11 @@ static int compressor_deflate_init(struct erofs_compress *c) if (IS_ERR_VALUE(c->private_data)) return PTR_ERR(c->private_data); - erofs_warn("EXPERIMENTAL DEFLATE algorithm in use. Use at your own risk!"); - erofs_warn("*Carefully* check filesystem data correctness to avoid corruption!"); - erofs_warn("Please send a report to <linux-erofs@lists.ozlabs.org> if something is wrong."); + if (!erofs_atomic_test_and_set(&__warnonce)) { + erofs_warn("EXPERIMENTAL DEFLATE algorithm in use. Use at your own risk!"); + erofs_warn("*Carefully* check filesystem data correctness to avoid corruption!"); + erofs_warn("Please send a report to <linux-erofs@lists.ozlabs.org> if something is wrong."); + } return 0; } diff --git a/lib/compressor_libdeflate.c b/lib/compressor_libdeflate.c index 62d93f7..14cbce4 100644 --- a/lib/compressor_libdeflate.c +++ b/lib/compressor_libdeflate.c @@ -4,6 +4,7 @@ #include "erofs/config.h" #include <libdeflate.h> #include "compressor.h" +#include "erofs/atomic.h" static int libdeflate_compress_destsize(const struct erofs_compress *c, const void *src, unsigned int *srcsize, @@ -82,12 +83,15 @@ static int compressor_libdeflate_exit(struct erofs_compress *c) static int compressor_libdeflate_init(struct erofs_compress *c) { + static erofs_atomic_bool_t __warnonce; + libdeflate_free_compressor(c->private_data); c->private_data = libdeflate_alloc_compressor(c->compression_level); if (!c->private_data) return -ENOMEM; - erofs_warn("EXPERIMENTAL libdeflate compressor in use. Use at your own risk!"); + if (!erofs_atomic_test_and_set(&__warnonce)) + erofs_warn("EXPERIMENTAL libdeflate compressor in use. 
Use at your own risk!"); return 0; } diff --git a/lib/compressor_liblzma.c b/lib/compressor_liblzma.c index 712f44f..2f19a93 100644 --- a/lib/compressor_liblzma.c +++ b/lib/compressor_liblzma.c @@ -9,6 +9,7 @@ #include "erofs/config.h" #include "erofs/print.h" #include "erofs/internal.h" +#include "erofs/atomic.h" #include "compressor.h" struct erofs_liblzma_context { @@ -85,6 +86,7 @@ static int erofs_compressor_liblzma_init(struct erofs_compress *c) { struct erofs_liblzma_context *ctx; u32 preset; + static erofs_atomic_bool_t __warnonce; ctx = malloc(sizeof(*ctx)); if (!ctx) @@ -103,7 +105,8 @@ static int erofs_compressor_liblzma_init(struct erofs_compress *c) ctx->opt.dict_size = c->dict_size; c->private_data = ctx; - erofs_warn("It may take a longer time since MicroLZMA is still single-threaded for now."); + if (!erofs_atomic_test_and_set(&__warnonce)) + erofs_warn("It may take a longer time since MicroLZMA is still single-threaded for now."); return 0; } -- 2.44.0
This patch introduces a --workers=# parameter for the incoming multi-threaded compression support. It also introduces a concept called `segment size` to split large files for multi-threading, which has the default value 16MB and cannot be modified for now. Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn> --- include/erofs/config.h | 4 ++++ lib/config.c | 4 ++++ mkfs/main.c | 28 ++++++++++++++++++++++++++++ 3 files changed, 36 insertions(+) diff --git a/include/erofs/config.h b/include/erofs/config.h index 73e3ac2..d2f91ff 100644 --- a/include/erofs/config.h +++ b/include/erofs/config.h @@ -75,6 +75,10 @@ struct erofs_configure { char c_force_chunkformat; /* < 0, xattr disabled and INT_MAX, always use inline xattrs */ int c_inline_xattr_tolerance; +#ifdef EROFS_MT_ENABLED + u64 c_segment_size; + u32 c_mt_workers; +#endif u32 c_pclusterblks_max, c_pclusterblks_def, c_pclusterblks_packed; u32 c_max_decompressed_extent_bytes; diff --git a/lib/config.c b/lib/config.c index 947a183..2530274 100644 --- a/lib/config.c +++ b/lib/config.c @@ -38,6 +38,10 @@ void erofs_init_configure(void) cfg.c_pclusterblks_max = 1; cfg.c_pclusterblks_def = 1; cfg.c_max_decompressed_extent_bytes = -1; +#ifdef EROFS_MT_ENABLED + cfg.c_segment_size = 16ULL * 1024 * 1024; + cfg.c_mt_workers = 1; +#endif erofs_stdout_tty = isatty(STDOUT_FILENO); } diff --git a/mkfs/main.c b/mkfs/main.c index 8a68a72..89252c2 100644 --- a/mkfs/main.c +++ b/mkfs/main.c @@ -77,6 +77,9 @@ static struct option long_options[] = { #ifdef HAVE_LIBLZMA {"unlzma", optional_argument, NULL, 519}, {"unxz", optional_argument, NULL, 519}, +#endif +#ifdef EROFS_MT_ENABLED + {"workers", required_argument, NULL, 520}, #endif {0, 0, 0, 0}, }; @@ -187,6 +190,9 @@ static void usage(int argc, char **argv) " --product-out=X X=product_out directory\n" " --fs-config-file=X X=fs_config file\n" " --block-list-file=X X=block_list file\n" 
+#endif +#ifdef EROFS_MT_ENABLED + " --workers=# set the number of worker threads to # (default=1)\n" #endif ); } @@ -416,6 +422,13 @@ static void erofs_rebuild_cleanup(void) rebuild_src_count = 0; } +#ifdef EROFS_MT_ENABLED +static u32 mkfs_max_worker_num() +{ + return erofs_get_available_processors() ?: 16; +} +#endif + static int mkfs_parse_options_cfg(int argc, char *argv[]) { char *endptr; @@ -660,6 +673,20 @@ static int mkfs_parse_options_cfg(int argc, char *argv[]) erofstar.dumpfile = strdup(optarg); tarerofs_decoder = EROFS_IOS_DECODER_GZIP + (opt - 518); break; +#ifdef EROFS_MT_ENABLED + case 520: + cfg.c_mt_workers = strtoul(optarg, &endptr, 0); + if (errno || *endptr != '\0') { + erofs_err("invalid worker number %s", optarg); + return -EINVAL; + } + if (cfg.c_mt_workers > mkfs_max_worker_num()) { + cfg.c_mt_workers = mkfs_max_worker_num(); + erofs_warn("worker number %s is too large, setting to %u", + optarg, cfg.c_mt_workers); + } + break; +#endif case 'V': version(); exit(0); @@ -815,6 +842,7 @@ static int mkfs_parse_options_cfg(int argc, char *argv[]) } cfg.c_pclusterblks_packed = pclustersize_packed >> sbi.blkszbits; } + return 0; } -- 2.44.0
From: Gao Xiang <hsiangkao@linux.alibaba.com> In order to prepare for multi-threaded decompression. Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com> --- configure.ac | 1 + include/erofs/config.h | 1 + lib/config.c | 12 ++++++++++++ 3 files changed, 14 insertions(+) diff --git a/configure.ac b/configure.ac index 3ccd6bb..2e69260 100644 --- a/configure.ac +++ b/configure.ac @@ -256,6 +256,7 @@ AC_CHECK_FUNCS(m4_flatten([ strerror strrchr strtoull + sysconf tmpfile64 utimensat])) diff --git a/include/erofs/config.h b/include/erofs/config.h index eecf575..73e3ac2 100644 --- a/include/erofs/config.h +++ b/include/erofs/config.h @@ -109,6 +109,7 @@ static inline int erofs_selabel_open(const char *file_contexts) void erofs_update_progressinfo(const char *fmt, ...); char *erofs_trim_for_progressinfo(const char *str, int placeholder); +unsigned int erofs_get_available_processors(void); #ifdef __cplusplus } diff --git a/lib/config.c b/lib/config.c index 1096cd1..947a183 100644 --- a/lib/config.c +++ b/lib/config.c @@ -14,6 +14,9 @@ #ifdef HAVE_SYS_IOCTL_H #include <sys/ioctl.h> #endif +#ifdef HAVE_UNISTD_H +#include <unistd.h> +#endif struct erofs_configure cfg; struct erofs_sb_info sbi; @@ -177,3 +180,12 @@ void erofs_update_progressinfo(const char *fmt, ...) fputs(msg, stdout); fputc('\n', stdout); } + +unsigned int erofs_get_available_processors(void) +{ +#if defined(HAVE_UNISTD_H) && defined(HAVE_SYSCONF) + return sysconf(_SC_NPROCESSORS_ONLN); +#else + return 0; +#endif +} -- 2.44.0
Add a workqueue implementation for multi-threading support inspired by xfsprogs. Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn> Suggested-by: Gao Xiang <hsiangkao@linux.alibaba.com> --- configure.ac | 16 +++++ include/erofs/internal.h | 3 + include/erofs/workqueue.h | 35 +++++++++++ lib/Makefile.am | 4 ++ lib/workqueue.c | 129 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 187 insertions(+) create mode 100644 include/erofs/workqueue.h create mode 100644 lib/workqueue.c diff --git a/configure.ac b/configure.ac index 4b59230..3ccd6bb 100644 --- a/configure.ac +++ b/configure.ac @@ -96,6 +96,14 @@ AC_DEFUN([EROFS_UTILS_PARSE_DIRECTORY], AC_ARG_VAR([MAX_BLOCK_SIZE], [The maximum block size which erofs-utils supports]) +AC_MSG_CHECKING([whether to enable multi-threading support]) +AC_ARG_ENABLE([multithreading], + AS_HELP_STRING([--enable-multithreading], + [enable multi-threading support @<:@default=no@:>@]), + [enable_multithreading="$enableval"], + [enable_multithreading="no"]) +AC_MSG_RESULT([$enable_multithreading]) + AC_ARG_ENABLE([debug], [AS_HELP_STRING([--enable-debug], [enable debugging mode @<:@default=no@:>@])], @@ -280,6 +288,13 @@ AS_IF([test "x$MAX_BLOCK_SIZE" = "x"], [ [erofs_cv_max_block_size=4096])) ], [erofs_cv_max_block_size=$MAX_BLOCK_SIZE]) +# Configure multi-threading support +AS_IF([test "x$enable_multithreading" != "xno"], [ + AC_CHECK_HEADERS([pthread.h]) + AC_CHECK_LIB([pthread], [pthread_mutex_lock], [], AC_MSG_ERROR([libpthread is required for multi-threaded build])) + AC_DEFINE(EROFS_MT_ENABLED, 1, [Enable multi-threading support]) +], []) + # Configure debug mode AS_IF([test "x$enable_debug" != "xno"], [], [ dnl Turn off all assert checking. 
@@ -471,6 +486,7 @@ AS_IF([test "x$enable_fuzzing" != "xyes"], [], [ AM_CONDITIONAL([ENABLE_FUZZING], [test "x${enable_fuzzing}" = "xyes"]) # Set up needed symbols, conditionals and compiler/linker flags +AM_CONDITIONAL([ENABLE_EROFS_MT], [test "x${enable_multithreading}" != "xno"]) AM_CONDITIONAL([ENABLE_LZ4], [test "x${have_lz4}" = "xyes"]) AM_CONDITIONAL([ENABLE_LZ4HC], [test "x${have_lz4hc}" = "xyes"]) AM_CONDITIONAL([ENABLE_FUSE], [test "x${have_fuse}" = "xyes"]) diff --git a/include/erofs/internal.h b/include/erofs/internal.h index 5e968d6..4cd2059 100644 --- a/include/erofs/internal.h +++ b/include/erofs/internal.h @@ -22,6 +22,9 @@ typedef unsigned short umode_t; #include <sys/types.h> /* for off_t definition */ #include <sys/stat.h> /* for S_ISCHR definition */ #include <stdio.h> +#ifdef HAVE_PTHREAD_H +#include <pthread.h> +#endif #ifndef PATH_MAX #define PATH_MAX 4096 /* # chars in a path name including nul */ diff --git a/include/erofs/workqueue.h b/include/erofs/workqueue.h new file mode 100644 index 0000000..47c4f08 --- /dev/null +++ b/include/erofs/workqueue.h @@ -0,0 +1,35 @@ +/* SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0 */ +#ifndef __EROFS_WORKQUEUE_H +#define __EROFS_WORKQUEUE_H + +#include "internal.h" + +struct erofs_workqueue; + +typedef void *(*erofs_wq_func_t)(struct erofs_workqueue *, void *); + +struct erofs_work { + void (*func)(struct erofs_work *work); + struct erofs_work *next; + void *priv; +}; + +struct erofs_workqueue { + struct erofs_work *head, *tail; + pthread_mutex_t lock; + pthread_cond_t cond_empty; + pthread_cond_t cond_full; + pthread_t *workers; + unsigned int nworker; + unsigned int max_jobs; + unsigned int job_count; + bool shutdown; + erofs_wq_func_t on_start, on_exit; +}; + +int erofs_alloc_workqueue(struct erofs_workqueue *wq, unsigned int nworker, + unsigned int max_jobs, erofs_wq_func_t start_func, + erofs_wq_func_t exit_func); +int erofs_queue_work(struct erofs_workqueue *wq, struct erofs_work *work); +int 
erofs_destroy_workqueue(struct erofs_workqueue *wq); +#endif \ No newline at end of file diff --git a/lib/Makefile.am b/lib/Makefile.am index 54b9c9c..b3bea74 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -53,3 +53,7 @@ liberofs_la_SOURCES += kite_deflate.c compressor_deflate.c if ENABLE_LIBDEFLATE liberofs_la_SOURCES += compressor_libdeflate.c endif +if ENABLE_EROFS_MT +liberofs_la_LDFLAGS = -lpthread +liberofs_la_SOURCES += workqueue.c +endif diff --git a/lib/workqueue.c b/lib/workqueue.c new file mode 100644 index 0000000..398734f --- /dev/null +++ b/lib/workqueue.c @@ -0,0 +1,129 @@ +// SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0 +#include <pthread.h> +#include <stdlib.h> +#include "erofs/workqueue.h" + +static void *worker_thread(void *arg) +{ + struct erofs_workqueue *wq = arg; + struct erofs_work *work; + void *priv = NULL; + + if (wq->on_start) + priv = (wq->on_start)(wq, NULL); + + while (true) { + pthread_mutex_lock(&wq->lock); + + while (wq->job_count == 0 && !wq->shutdown) + pthread_cond_wait(&wq->cond_empty, &wq->lock); + if (wq->job_count == 0 && wq->shutdown) { + pthread_mutex_unlock(&wq->lock); + break; + } + + work = wq->head; + wq->head = work->next; + if (!wq->head) + wq->tail = NULL; + wq->job_count--; + + if (wq->job_count == wq->max_jobs - 1) + pthread_cond_broadcast(&wq->cond_full); + + pthread_mutex_unlock(&wq->lock); + + work->priv = priv; + work->func(work); + } + + if (wq->on_exit) + (wq->on_exit)(wq, priv); + + return NULL; +} + +int erofs_alloc_workqueue(struct erofs_workqueue *wq, unsigned int nworker, + unsigned int max_jobs, erofs_wq_func_t start_func, + erofs_wq_func_t exit_func) +{ + unsigned int i; + int ret; + + if (!wq || nworker <= 0 || max_jobs <= 0) + return -EINVAL; + + wq->head = wq->tail = NULL; + wq->nworker = nworker; + wq->max_jobs = max_jobs; + wq->job_count = 0; + wq->shutdown = false; + wq->on_start = start_func; + wq->on_exit = exit_func; + pthread_mutex_init(&wq->lock, NULL); + 
pthread_cond_init(&wq->cond_empty, NULL); + pthread_cond_init(&wq->cond_full, NULL); + + wq->workers = malloc(nworker * sizeof(pthread_t)); + if (!wq->workers) + return -ENOMEM; + + for (i = 0; i < nworker; i++) { + ret = pthread_create(&wq->workers[i], NULL, worker_thread, wq); + if (ret) { + while (i) + pthread_cancel(wq->workers[--i]); + free(wq->workers); + return ret; + } + } + + return 0; +} + +int erofs_queue_work(struct erofs_workqueue *wq, struct erofs_work *work) +{ + if (!wq || !work) + return -EINVAL; + + pthread_mutex_lock(&wq->lock); + + while (wq->job_count == wq->max_jobs) + pthread_cond_wait(&wq->cond_full, &wq->lock); + + work->next = NULL; + if (!wq->head) + wq->head = work; + else + wq->tail->next = work; + wq->tail = work; + wq->job_count++; + + pthread_cond_signal(&wq->cond_empty); + pthread_mutex_unlock(&wq->lock); + + return 0; +} + +int erofs_destroy_workqueue(struct erofs_workqueue *wq) +{ + unsigned int i; + + if (!wq) + return -EINVAL; + + pthread_mutex_lock(&wq->lock); + wq->shutdown = true; + pthread_cond_broadcast(&wq->cond_empty); + pthread_mutex_unlock(&wq->lock); + + for (i = 0; i < wq->nworker; i++) + pthread_join(wq->workers[i], NULL); + + free(wq->workers); + pthread_mutex_destroy(&wq->lock); + pthread_cond_destroy(&wq->cond_empty); + pthread_cond_destroy(&wq->cond_full); + + return 0; +} \ No newline at end of file -- 2.44.0