* [RFC PATCH 0/5] Demonstrate new parallel threading API
@ 2015-08-25 17:28 Stefan Beller
  2015-08-25 17:28 ` [PATCH 1/5] FIXUP submodule: implement `module_clone` as a builtin helper Stefan Beller
                   ` (4 more replies)
  0 siblings, 5 replies; 20+ messages in thread
From: Stefan Beller @ 2015-08-25 17:28 UTC (permalink / raw)
  To: peff; +Cc: git, jrnieder, gitster, Stefan Beller

This series builds on top of origin/sb/submodule-helper.
The first patch is a fixup to the last commit of that branch,
fixing a memory leak. It is not really part of this series, but
as I chose to build on top of that branch I can fix it up as we go.

Patch 2 adds a new API for easily using a threaded work pool.

Patch 3 adds the command `submodule foreach_parallel` (completely untested, RFC!),
which was the original goal of this series. But oh well, I got tricked by Peff
into proving how awesome the new API for parallel threading is, so that is
shown off in patches 4 and 5.

Note: Both patches 4 and 5 delete more lines of code than they add,
improving readability a lot, as you can focus on the actual task and not
on the threading machinery.

Any feedback welcome!
Thanks,
Stefan

Stefan Beller (5):
  FIXUP submodule: implement `module_clone` as a builtin helper
  thread-utils: add a threaded task queue
  submodule: helper to run foreach in parallel
  index-pack: Use the new worker pool
  pack-objects: Use new worker pool

 builtin/index-pack.c        |  71 ++++++-------
 builtin/pack-objects.c      | 175 +++++++++++---------------------
 builtin/submodule--helper.c | 159 +++++++++++++++++++++++++++--
 git-submodule.sh            |   9 ++
 run-command.c               |  29 +++---
 thread-utils.c              | 237 ++++++++++++++++++++++++++++++++++++++++++++
 thread-utils.h              |  40 ++++++++
 7 files changed, 545 insertions(+), 175 deletions(-)

-- 
2.5.0.400.gff86faf


* [PATCH 1/5] FIXUP submodule: implement `module_clone` as a builtin helper
  2015-08-25 17:28 [RFC PATCH 0/5] Demonstrate new parallel threading API Stefan Beller
@ 2015-08-25 17:28 ` Stefan Beller
  2015-08-25 17:28 ` [PATCH 2/5] thread-utils: add a threaded task queue Stefan Beller
                   ` (3 subsequent siblings)
  4 siblings, 0 replies; 20+ messages in thread
From: Stefan Beller @ 2015-08-25 17:28 UTC (permalink / raw)
  To: peff; +Cc: git, jrnieder, gitster, Stefan Beller

This closes the memory leaks pointed out by Jeff.

Signed-off-by: Stefan Beller <sbeller@google.com>
---
 builtin/submodule--helper.c | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/builtin/submodule--helper.c b/builtin/submodule--helper.c
index ae74b80..7e298b4 100644
--- a/builtin/submodule--helper.c
+++ b/builtin/submodule--helper.c
@@ -220,7 +220,6 @@ static int module_clone(int argc, const char **argv, const char *prefix)
 
 	strbuf_addf(&sb, "%s/modules/%s", get_git_dir(), name);
 	sm_gitdir = strbuf_detach(&sb, NULL);
-	strbuf_reset(&sb);
 
 	if (!file_exists(sm_gitdir)) {
 		safe_create_leading_directories_const(sm_gitdir);
@@ -259,12 +258,16 @@ static int module_clone(int argc, const char **argv, const char *prefix)
 	/* Redirect the worktree of the submodule in the superprojects config */
 	if (!is_absolute_path(sm_gitdir)) {
 		char *s = (char*)sm_gitdir;
-		strbuf_addf(&sb, "%s/%s", xgetcwd(), sm_gitdir);
+		if (strbuf_getcwd(&sb))
+			die_errno("unable to get current working directory");
+		strbuf_addf(&sb, "/%s", sm_gitdir);
 		sm_gitdir = strbuf_detach(&sb, NULL);
-		strbuf_reset(&sb);
 		free(s);
 	}
-	strbuf_addf(&sb, "%s/%s", xgetcwd(), path);
+
+	if (strbuf_getcwd(&sb))
+		die_errno("unable to get current working directory");
+	strbuf_addf(&sb, "/%s", path);
 
 	p = git_pathdup_submodule(path, "config");
 	if (!p)
-- 
2.5.0.400.gff86faf


* [PATCH 2/5] thread-utils: add a threaded task queue
  2015-08-25 17:28 [RFC PATCH 0/5] Demonstrate new parallel threading API Stefan Beller
  2015-08-25 17:28 ` [PATCH 1/5] FIXUP submodule: implement `module_clone` as a builtin helper Stefan Beller
@ 2015-08-25 17:28 ` Stefan Beller
  2015-08-25 17:28 ` [PATCH 3/5] submodule: helper to run foreach in parallel Stefan Beller
                   ` (2 subsequent siblings)
  4 siblings, 0 replies; 20+ messages in thread
From: Stefan Beller @ 2015-08-25 17:28 UTC (permalink / raw)
  To: peff; +Cc: git, jrnieder, gitster, Stefan Beller

This adds functionality to process work in parallel threads,
while the boilerplate code for setting up and tearing down the
threads, as well as queuing up tasks, is hidden behind the new
API.
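
A minimal usage sketch (illustrative only; process_one() and
process_all() are made up for this example and are not part of
the patch):

	static int process_one(struct task_queue *tq, void *task)
	{
		/* tasks run concurrently in the worker threads */
		printf("processing %s\n", (const char *)task);
		return 0;	/* nonzero requests an early stop */
	}

	static void process_all(char **names, int n)
	{
		int i;
		struct task_queue *tq = create_task_queue(4);

		for (i = 0; i < n; i++)
			add_task(tq, process_one, names[i]);
		if (finish_task_queue(tq, NULL))
			die("a task requested an early stop");
	}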

Signed-off-by: Stefan Beller <sbeller@google.com>
---
 run-command.c  |  29 ++++---
 thread-utils.c | 237 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 thread-utils.h |  40 ++++++++++
 3 files changed, 294 insertions(+), 12 deletions(-)

diff --git a/run-command.c b/run-command.c
index 28e1d55..cb15cd9 100644
--- a/run-command.c
+++ b/run-command.c
@@ -668,6 +668,22 @@ int git_atexit(void (*handler)(void))
 
 #endif
 
+void setup_main_thread(void)
+{
+	if (!main_thread_set) {
+		/*
+		 * We assume that the first time that start_async is called
+		 * it is from the main thread.
+		 */
+		main_thread_set = 1;
+		main_thread = pthread_self();
+		pthread_key_create(&async_key, NULL);
+		pthread_key_create(&async_die_counter, NULL);
+		set_die_routine(die_async);
+		set_die_is_recursing_routine(async_die_is_recursing);
+	}
+}
+
 int start_async(struct async *async)
 {
 	int need_in, need_out;
@@ -740,18 +756,7 @@ int start_async(struct async *async)
 	else if (async->out)
 		close(async->out);
 #else
-	if (!main_thread_set) {
-		/*
-		 * We assume that the first time that start_async is called
-		 * it is from the main thread.
-		 */
-		main_thread_set = 1;
-		main_thread = pthread_self();
-		pthread_key_create(&async_key, NULL);
-		pthread_key_create(&async_die_counter, NULL);
-		set_die_routine(die_async);
-		set_die_is_recursing_routine(async_die_is_recursing);
-	}
+	setup_main_thread();
 
 	if (proc_in >= 0)
 		set_cloexec(proc_in);
diff --git a/thread-utils.c b/thread-utils.c
index a2135e0..936b3672 100644
--- a/thread-utils.c
+++ b/thread-utils.c
@@ -1,5 +1,7 @@
 #include "cache.h"
 #include "thread-utils.h"
+#include "run-command.h"
+#include "git-compat-util.h"
 
 #if defined(hpux) || defined(__hpux) || defined(_hpux)
 #  include <sys/pstat.h>
@@ -75,3 +77,238 @@ int init_recursive_mutex(pthread_mutex_t *m)
 	}
 	return ret;
 }
+
+#ifndef NO_PTHREADS
+struct job_list {
+	int (*fct)(struct task_queue *tq, void *task);
+	void *task;
+	struct job_list *next;
+};
+
+static pthread_t main_thread;
+static int main_thread_set;
+static pthread_key_t async_key;
+static pthread_key_t async_die_counter;
+
+static NORETURN void die_async(const char *err, va_list params)
+{
+	vreportf("fatal: ", err, params);
+
+	if (!pthread_equal(main_thread, pthread_self()))
+		pthread_exit((void *)128);
+
+	exit(128);
+}
+
+static int async_die_is_recursing(void)
+{
+	void *ret = pthread_getspecific(async_die_counter);
+	pthread_setspecific(async_die_counter, (void *)1);
+	return ret != NULL;
+}
+
+/* FIXME: deduplicate this code with run-command.c */
+static void setup_main_thread(void)
+{
+	if (!main_thread_set) {
+		main_thread_set = 1;
+		main_thread = pthread_self();
+		pthread_key_create(&async_key, NULL);
+		pthread_key_create(&async_die_counter, NULL);
+		set_die_routine(die_async);
+		set_die_is_recursing_routine(async_die_is_recursing);
+	}
+}
+
+struct task_queue {
+	/*
+	 * To avoid deadlocks, always acquire the semaphores with the lowest
+	 * priority first; priorities are in descending order as listed.
+	 *
+	 * The `mutex` is a general purpose lock for modifying data in the async
+	 * queue, such as adding a new task or adding a return value from
+	 * an already run task.
+	 *
+	 * `workingcount` and `freecount` are opposing semaphores; the sum of
+	 * their values should equal `max_threads` at any time while the `mutex`
+	 * is available.
+	 */
+	sem_t mutex;
+	sem_t workingcount;
+	sem_t freecount;
+
+	pthread_t *threads;
+	unsigned max_threads;
+
+	struct job_list *first;
+	struct job_list *last;
+
+	void (*finish_function)(struct task_queue *tq);
+	int early_return;
+};
+
+static void next_task(struct task_queue *tq,
+		      int (**fct)(struct task_queue *tq, void *task),
+		      void **task,
+		      int *early_return)
+{
+	struct job_list *job = NULL;
+
+	sem_wait(&tq->workingcount);
+	sem_wait(&tq->mutex);
+
+	if (*early_return) {
+		tq->early_return |= *early_return;
+		*fct = NULL;
+		*task = NULL;
+	} else {
+		if (!tq->first)
+			die("BUG: internal error with dequeuing jobs for threads");
+
+		job = tq->first;
+		*fct = job->fct;
+		*task = job->task;
+
+		tq->first = job->next;
+		if (!tq->first)
+			tq->last = NULL;
+	}
+
+	sem_post(&tq->freecount);
+	sem_post(&tq->mutex);
+
+	free(job);
+}
+
+static void *dispatcher(void *args)
+{
+	void *task;
+	int (*fct)(struct task_queue *tq, void *task);
+	int early_return = 0;
+	struct task_queue *tq = args;
+
+	next_task(tq, &fct, &task, &early_return);
+	while (fct) {
+		early_return = fct(tq, task);
+		next_task(tq, &fct, &task, &early_return);
+	}
+
+	if (tq->finish_function)
+		tq->finish_function(tq);
+
+	pthread_exit(0);
+}
+
+struct task_queue *create_task_queue(unsigned max_threads)
+{
+	struct task_queue *tq = xmalloc(sizeof(*tq));
+	int i, ret;
+
+	if (!max_threads)
+		tq->max_threads = online_cpus();
+	else
+		tq->max_threads = max_threads;
+
+	sem_init(&tq->mutex, 0, 1);
+	sem_init(&tq->workingcount, 0, 0);
+	sem_init(&tq->freecount, 0, tq->max_threads);
+	tq->threads = xmalloc(tq->max_threads * sizeof(pthread_t));
+
+	tq->first = NULL;
+	tq->last = NULL;
+
+	setup_main_thread();
+
+	for (i = 0; i < tq->max_threads; i++) {
+		ret = pthread_create(&tq->threads[i], 0, &dispatcher, tq);
+		if (ret)
+			die("unable to create thread: %s", strerror(ret));
+	}
+
+	tq->early_return = 0;
+
+	return tq;
+}
+
+void add_task(struct task_queue *tq,
+	      int (*fct)(struct task_queue *tq, void *task),
+	      void *task)
+{
+	struct job_list *job_list;
+
+	job_list = xmalloc(sizeof(*job_list));
+	job_list->task = task;
+	job_list->fct = fct;
+	job_list->next = NULL;
+
+	sem_wait(&tq->freecount);
+	sem_wait(&tq->mutex);
+
+	if (!tq->last) {
+		tq->last = job_list;
+		tq->first = tq->last;
+	} else {
+		tq->last->next = job_list;
+		tq->last = tq->last->next;
+	}
+
+	sem_post(&tq->workingcount);
+	sem_post(&tq->mutex);
+}
+
+int finish_task_queue(struct task_queue *tq, void (*fct)(struct task_queue *tq))
+{
+	int ret;
+	int i;
+
+	tq->finish_function = fct;
+
+	for (i = 0; i < tq->max_threads; i++)
+		add_task(tq, NULL, NULL);
+
+	for (i = 0; i < tq->max_threads; i++)
+		pthread_join(tq->threads[i], 0);
+
+	sem_destroy(&tq->mutex);
+	sem_destroy(&tq->workingcount);
+	sem_destroy(&tq->freecount);
+
+	if (tq->first)
+		die("BUG: internal error with queuing jobs for threads");
+
+	free(tq->threads);
+	ret = tq->early_return;
+
+	free(tq);
+	return ret;
+}
+#else /* NO_PTHREADS */
+
+struct task_queue {
+	int early_return;
+};
+
+struct task_queue *create_task_queue(unsigned max_threads)
+{
+	struct task_queue *tq = xmalloc(sizeof(*tq));
+
+	tq->early_return = 0;
+
+	return tq;
+}
+
+void add_task(struct task_queue *tq,
+	      int (*fct)(struct task_queue *tq, void *task),
+	      void *task)
+{
+	if (tq->early_return)
+		return;
+
+	tq->early_return |= fct(tq, task);
+}
+
+int finish_task_queue(struct task_queue *tq, void (*fct)(struct task_queue *tq))
+{
+	int ret = tq->early_return;
+
+	if (fct)
+		fct(tq);
+	free(tq);
+	return ret;
+}
+#endif
diff --git a/thread-utils.h b/thread-utils.h
index d9a769d..977d37b 100644
--- a/thread-utils.h
+++ b/thread-utils.h
@@ -7,9 +7,49 @@
 extern int online_cpus(void);
 extern int init_recursive_mutex(pthread_mutex_t*);
 
+#include <pthread.h>
+#include <semaphore.h>
+#include <stdio.h>
+#include <unistd.h>
+
 #else
 
 #define online_cpus() 1
 
 #endif
+
+/*
+ * Creates a struct `task_queue`, which holds a list of tasks. Up to
+ * `max_threads` threads are active, processing the enqueued tasks
+ * in first-in-first-out order.
+ *
+ * If `max_threads` is zero, the number of available cores will be used.
+ *
+ * Currently this only works in environments with pthreads; in other
+ * environments, the tasks will be processed sequentially in `add_task`.
+ */
+struct task_queue *create_task_queue(unsigned max_threads);
+
+/*
+ * The function and data are put into the task queue.
+ *
+ * The function `fct` must not be NULL, as that's used internally
+ * in `finish_task_queue` to signal shutdown. If the return code
+ * of `fct` is nonzero, processing will stop eventually; the
+ * currently running parallel tasks will still be flushed out.
+ */
+void add_task(struct task_queue *tq,
+	      int (*fct)(struct task_queue *tq, void *task),
+	      void *task);
+
+/*
+ * Waits for all tasks to be done and frees the object. The return code
+ * is zero if all enqueued tasks were processed.
+ *
+ * The function `fct` is called once in each thread after the last task
+ * for that thread was processed. If no thread local cleanup needs to be
+ * performed, pass NULL.
+ */
+int finish_task_queue(struct task_queue *tq, void (*fct)(struct task_queue *tq));
+
 #endif /* THREAD_COMPAT_H */
-- 
2.5.0.400.gff86faf


* [PATCH 3/5] submodule: helper to run foreach in parallel
  2015-08-25 17:28 [RFC PATCH 0/5] Demonstrate new parallel threading API Stefan Beller
  2015-08-25 17:28 ` [PATCH 1/5] FIXUP submodule: implement `module_clone` as a builtin helper Stefan Beller
  2015-08-25 17:28 ` [PATCH 2/5] thread-utils: add a threaded task queue Stefan Beller
@ 2015-08-25 17:28 ` Stefan Beller
  2015-08-25 21:09   ` Junio C Hamano
  2015-08-26 17:06   ` Jeff King
  2015-08-25 17:28 ` [PATCH 4/5] index-pack: Use the new worker pool Stefan Beller
  2015-08-25 17:28 ` [PATCH 5/5] pack-objects: Use " Stefan Beller
  4 siblings, 2 replies; 20+ messages in thread
From: Stefan Beller @ 2015-08-25 17:28 UTC (permalink / raw)
  To: peff; +Cc: git, jrnieder, gitster, Stefan Beller

This runs a command on each submodule in parallel and should eventually
replace `git submodule foreach`.

There is a new option -j/--jobs (inspired by make) to specify the number
of parallel threads.

The jobs=1 case needs to be special-cased to exactly replicate the current
default behavior of `git submodule foreach`, such as working stdin input.
For more than one job no input is possible, and both the stdout and stderr
of the command are written to stderr in an ordered fashion, i.e. the tasks
do not intermingle their output in races.

What currently works:
 git submodule--helper foreach "echo \$toplevel-\$name-\$path-\$sha1"
which I took from t7407 for testing during development.

Signed-off-by: Stefan Beller <sbeller@google.com>
---
 builtin/submodule--helper.c | 148 +++++++++++++++++++++++++++++++++++++++++++-
 git-submodule.sh            |   9 +++
 2 files changed, 155 insertions(+), 2 deletions(-)

diff --git a/builtin/submodule--helper.c b/builtin/submodule--helper.c
index 7e298b4..adfa0e4 100644
--- a/builtin/submodule--helper.c
+++ b/builtin/submodule--helper.c
@@ -8,8 +8,11 @@
 #include "submodule.h"
 #include "submodule-config.h"
 #include "string-list.h"
+#include "thread-utils.h"
 #include "run-command.h"
-
+#ifndef NO_PTHREADS
+#include <semaphore.h>
+#endif
 static const struct cache_entry **ce_entries;
 static int ce_alloc, ce_used;
 static const char *alternative_path;
@@ -278,6 +281,144 @@ static int module_clone(int argc, const char **argv, const char *prefix)
 	return 0;
 }
 
+struct submodule_args {
+	const char *name;
+	const char *path;
+	const char *sha1;
+	const char *toplevel;
+	const char *prefix;
+	const char **cmd;
+	struct submodule_output *out;
+	sem_t *mutex;
+};
+
+static int run_cmd_submodule(struct task_queue *aq, void *task)
+{
+	int i;
+	struct submodule_args *args = task;
+	struct strbuf out = STRBUF_INIT;
+	struct strbuf sb = STRBUF_INIT;
+	struct child_process *cp = xmalloc(sizeof(*cp));
+	char buf[1024];
+
+	strbuf_addf(&out, _("Entering %s\n"), relative_path(args->path, args->prefix, &sb));
+
+	child_process_init(cp);
+	argv_array_pushv(&cp->args, args->cmd);
+
+	argv_array_pushf(&cp->env_array, "name=%s", args->name);
+	argv_array_pushf(&cp->env_array, "path=%s", args->path);
+	argv_array_pushf(&cp->env_array, "sha1=%s", args->sha1);
+	argv_array_pushf(&cp->env_array, "toplevel=%s", args->toplevel);
+
+	for (i = 0; local_repo_env[i]; i++)
+		argv_array_push(&cp->env_array, local_repo_env[i]);
+
+	cp->no_stdin = 1;
+	cp->out = 0;
+	cp->err = -1;
+	cp->dir = args->path;
+	cp->stdout_to_stderr = 1;
+	cp->use_shell = 1;
+
+	if (start_command(cp)) {
+		for (i = 0; cp->args.argv[i]; i++)
+			fprintf(stderr, "%s\n", cp->args.argv[i]);
+		die("Could not start command");
+	}
+
+	while (1) {
+		ssize_t len = xread(cp->err, buf, sizeof(buf));
+		if (len < 0)
+			die("Read from child failed");
+		else if (len == 0)
+			break;
+		else {
+			strbuf_add(&out, buf, len);
+		}
+	}
+	if (finish_command(cp))
+		die("command died with error");
+
+	sem_wait(args->mutex);
+	fputs(out.buf, stderr);
+	sem_post(args->mutex);
+
+	return 0;
+}
+
+static int module_foreach_parallel(int argc, const char **argv, const char *prefix)
+{
+	int i, recursive = 0, number_threads = 1, quiet = 0;
+	static struct pathspec pathspec;
+	struct strbuf sb = STRBUF_INIT;
+	struct task_queue *aq;
+	const char *cmd = NULL;
+	const char *nullargv[] = {NULL};
+	sem_t *mutex = xmalloc(sizeof(*mutex));
+
+	struct option module_update_options[] = {
+		OPT_STRING(0, "prefix", &alternative_path,
+			   N_("path"),
+			   N_("alternative anchor for relative paths")),
+		OPT_STRING(0, "cmd", &cmd,
+			   N_("string"),
+			   N_("command to run")),
+		OPT_BOOL('r', "recursive", &recursive,
+			 N_("Recurse into nested submodules")),
+		OPT_INTEGER('j', "jobs", &number_threads,
+			    N_("Number of jobs to run in parallel")),
+		OPT__QUIET(&quiet, N_("Suppress output")),
+		OPT_END()
+	};
+
+	static const char * const git_submodule_helper_usage[] = {
+		N_("git submodule--helper foreach [--prefix=<path>] [<path>...]"),
+		NULL
+	};
+
+	argc = parse_options(argc, argv, prefix, module_update_options,
+			     git_submodule_helper_usage, 0);
+
+	if (module_list_compute(0, nullargv, NULL, &pathspec) < 0)
+		return 1;
+
+	gitmodules_config();
+
+	if (sem_init(mutex, 0, 1))
+		die_errno("sem_init failed");
+
+	aq = create_task_queue(number_threads);
+
+	for (i = 0; i < ce_used; i++) {
+		const struct submodule *sub;
+		const struct cache_entry *ce = ce_entries[i];
+		struct submodule_args *args = xmalloc(sizeof(*args));
+
+		if (ce_stage(ce))
+			args->sha1 = xstrdup(sha1_to_hex(null_sha1));
+		else
+			args->sha1 = xstrdup(sha1_to_hex(ce->sha1));
+
+		strbuf_reset(&sb);
+		strbuf_addf(&sb, "%s/.git", ce->name);
+		if (!file_exists(sb.buf))
+			continue;
+
+		args->path = ce->name;
+		sub = submodule_from_path(null_sha1, args->path);
+		if (!sub)
+			die("No submodule mapping found in .gitmodules for path '%s'", args->path);
+
+		args->name = sub->name;
+		args->toplevel = xgetcwd();
+		args->cmd = argv;
+		args->mutex = mutex;
+		args->prefix = alternative_path;
+		add_task(aq, run_cmd_submodule, args);
+	}
+
+	finish_task_queue(aq, NULL);
+	return 0;
+}
+
 int cmd_submodule__helper(int argc, const char **argv, const char *prefix)
 {
 	if (argc < 2)
@@ -292,6 +433,9 @@ int cmd_submodule__helper(int argc, const char **argv, const char *prefix)
 	if (!strcmp(argv[1], "module_clone"))
 		return module_clone(argc - 1, argv + 1, prefix);
 
+	if (!strcmp(argv[1], "foreach"))
+		return module_foreach_parallel(argc - 1, argv + 1, prefix);
+
 usage:
-	usage("git submodule--helper [module_list module_name module_clone]\n");
+	usage("git submodule--helper [module_list module_name module_clone foreach]\n");
 }
diff --git a/git-submodule.sh b/git-submodule.sh
index fb5155e..fa18434 100755
--- a/git-submodule.sh
+++ b/git-submodule.sh
@@ -431,6 +431,15 @@ cmd_foreach()
 }
 
 #
+# Execute an arbitrary command sequence in each checked out
+# submodule in parallel.
+#
+cmd_foreach_parallel()
+{
+	git submodule--helper foreach --prefix "$wt_prefix" "$@"
+}
+
+#
 # Register submodules in .git/config
 #
 # $@ = requested paths (default to all)
-- 
2.5.0.400.gff86faf


* [PATCH 4/5] index-pack: Use the new worker pool
  2015-08-25 17:28 [RFC PATCH 0/5] Demonstrate new parallel threading API Stefan Beller
                   ` (2 preceding siblings ...)
  2015-08-25 17:28 ` [PATCH 3/5] submodule: helper to run foreach in parallel Stefan Beller
@ 2015-08-25 17:28 ` Stefan Beller
  2015-08-25 19:03   ` Jeff King
  2015-08-25 17:28 ` [PATCH 5/5] pack-objects: Use " Stefan Beller
  4 siblings, 1 reply; 20+ messages in thread
From: Stefan Beller @ 2015-08-25 17:28 UTC (permalink / raw)
  To: peff; +Cc: git, jrnieder, gitster, Stefan Beller

By treating each object as its own task the workflow is easier to follow
as the function used in the worker threads doesn't need any control logic
any more.

Signed-off-by: Stefan Beller <sbeller@google.com>
---
 builtin/index-pack.c | 71 +++++++++++++++++++++++-----------------------------
 1 file changed, 32 insertions(+), 39 deletions(-)

diff --git a/builtin/index-pack.c b/builtin/index-pack.c
index 3f10840..826bd22 100644
--- a/builtin/index-pack.c
+++ b/builtin/index-pack.c
@@ -11,6 +11,7 @@
 #include "exec_cmd.h"
 #include "streaming.h"
 #include "thread-utils.h"
+#include "run-command.h"
 
 static const char index_pack_usage[] =
 "git index-pack [-v] [-o <index-file>] [--keep | --keep=<msg>] [--verify] [--strict] (<pack-file> | --stdin [--fix-thin] [<pack-file>])";
@@ -95,7 +96,6 @@ static const char *curr_pack;
 #ifndef NO_PTHREADS
 
 static struct thread_local *thread_data;
-static int nr_dispatched;
 static int threads_active;
 
 static pthread_mutex_t read_mutex;
@@ -106,10 +106,6 @@ static pthread_mutex_t counter_mutex;
 #define counter_lock()		lock_mutex(&counter_mutex)
 #define counter_unlock()	unlock_mutex(&counter_mutex)
 
-static pthread_mutex_t work_mutex;
-#define work_lock()		lock_mutex(&work_mutex)
-#define work_unlock()		unlock_mutex(&work_mutex)
-
 static pthread_mutex_t deepest_delta_mutex;
 #define deepest_delta_lock()	lock_mutex(&deepest_delta_mutex)
 #define deepest_delta_unlock()	unlock_mutex(&deepest_delta_mutex)
@@ -140,7 +136,6 @@ static void init_thread(void)
 	int i;
 	init_recursive_mutex(&read_mutex);
 	pthread_mutex_init(&counter_mutex, NULL);
-	pthread_mutex_init(&work_mutex, NULL);
 	pthread_mutex_init(&type_cas_mutex, NULL);
 	if (show_stat)
 		pthread_mutex_init(&deepest_delta_mutex, NULL);
@@ -163,7 +158,6 @@ static void cleanup_thread(void)
 	threads_active = 0;
 	pthread_mutex_destroy(&read_mutex);
 	pthread_mutex_destroy(&counter_mutex);
-	pthread_mutex_destroy(&work_mutex);
 	pthread_mutex_destroy(&type_cas_mutex);
 	if (show_stat)
 		pthread_mutex_destroy(&deepest_delta_mutex);
@@ -181,9 +175,6 @@ static void cleanup_thread(void)
 #define counter_lock()
 #define counter_unlock()
 
-#define work_lock()
-#define work_unlock()
-
 #define deepest_delta_lock()
 #define deepest_delta_unlock()
 
@@ -1075,28 +1066,29 @@ static void resolve_base(struct object_entry *obj)
 }
 
 #ifndef NO_PTHREADS
-static void *threaded_second_pass(void *data)
+static int threaded_second_pass(struct task_queue *tq, void *data)
 {
-	set_thread_data(data);
-	for (;;) {
-		int i;
-		counter_lock();
-		display_progress(progress, nr_resolved_deltas);
-		counter_unlock();
-		work_lock();
-		while (nr_dispatched < nr_objects &&
-		       is_delta_type(objects[nr_dispatched].type))
-			nr_dispatched++;
-		if (nr_dispatched >= nr_objects) {
-			work_unlock();
-			break;
-		}
-		i = nr_dispatched++;
-		work_unlock();
+	if (!get_thread_data()) {
+		struct thread_local *t = xcalloc(1, sizeof(*t));
+		t->pack_fd = open(curr_pack, O_RDONLY);
+		if (t->pack_fd == -1)
+			die_errno(_("unable to open %s"), curr_pack);
 
-		resolve_base(&objects[i]);
+		set_thread_data(t);
 	}
-	return NULL;
+
+	resolve_base(data);
+
+	counter_lock();
+	display_progress(progress, nr_resolved_deltas);
+	counter_unlock();
+	return 0;
+}
+
+static void cleanup_threaded_second_pass(struct task_queue *aq)
+{
+	struct thread_local *t = get_thread_data();
+
+	if (t)
+		close(t->pack_fd);
+	free(t);
 }
 #endif
 
@@ -1195,18 +1187,19 @@ static void resolve_deltas(void)
 					  nr_ref_deltas + nr_ofs_deltas);
 
 #ifndef NO_PTHREADS
-	nr_dispatched = 0;
 	if (nr_threads > 1 || getenv("GIT_FORCE_THREADS")) {
+		struct task_queue *tq;
+
 		init_thread();
-		for (i = 0; i < nr_threads; i++) {
-			int ret = pthread_create(&thread_data[i].thread, NULL,
-						 threaded_second_pass, thread_data + i);
-			if (ret)
-				die(_("unable to create thread: %s"),
-				    strerror(ret));
-		}
-		for (i = 0; i < nr_threads; i++)
-			pthread_join(thread_data[i].thread, NULL);
+		tq = create_task_queue(nr_threads);
+
+		for (i = 0; i < nr_objects; i++)
+			if (!is_delta_type(objects[i].type))
+				add_task(tq, threaded_second_pass, &objects[i]);
+
+		if (finish_task_queue(tq, cleanup_threaded_second_pass))
+			die("BUG: not all tasks were processed");
+
 		cleanup_thread();
 		return;
 	}
-- 
2.5.0.400.gff86faf


* [PATCH 5/5] pack-objects: Use new worker pool
  2015-08-25 17:28 [RFC PATCH 0/5] Demonstrate new parallel threading API Stefan Beller
                   ` (3 preceding siblings ...)
  2015-08-25 17:28 ` [PATCH 4/5] index-pack: Use the new worker pool Stefan Beller
@ 2015-08-25 17:28 ` Stefan Beller
  4 siblings, 0 replies; 20+ messages in thread
From: Stefan Beller @ 2015-08-25 17:28 UTC (permalink / raw)
  To: peff; +Cc: git, jrnieder, gitster, Stefan Beller

Before, we had <n> threads doing the delta finding work, and the main
thread was load balancing between them, i.e. moving work from a thread
with a large amount left to an idle thread whenever such a situation
arose.

This moves the load balancing into the threads themselves. As soon as
one thread is done working, it will look at its peer threads and pick
up half the work load from the thread with the largest pending load.

By having the load balancing as part of the threads, the locking and
communication model becomes simpler, such that we don't need as many
mutexes anymore. It also demonstrates that the new thread pool is
useful.

Signed-off-by: Stefan Beller <sbeller@google.com>
---
 builtin/pack-objects.c | 175 ++++++++++++++++---------------------------------
 1 file changed, 57 insertions(+), 118 deletions(-)

diff --git a/builtin/pack-objects.c b/builtin/pack-objects.c
index 62cc16d..f46d2df 100644
--- a/builtin/pack-objects.c
+++ b/builtin/pack-objects.c
@@ -17,6 +17,7 @@
 #include "pack-objects.h"
 #include "progress.h"
 #include "refs.h"
+#include "run-command.h"
 #include "streaming.h"
 #include "thread-utils.h"
 #include "pack-bitmap.h"
@@ -1887,26 +1888,12 @@ static void try_to_free_from_threads(size_t size)
 
 static try_to_free_t old_try_to_free_routine;
 
-/*
- * The main thread waits on the condition that (at least) one of the workers
- * has stopped working (which is indicated in the .working member of
- * struct thread_params).
- * When a work thread has completed its work, it sets .working to 0 and
- * signals the main thread and waits on the condition that .data_ready
- * becomes 1.
- */
-
 struct thread_params {
-	pthread_t thread;
 	struct object_entry **list;
 	unsigned list_size;
 	unsigned remaining;
 	int window;
 	int depth;
-	int working;
-	int data_ready;
-	pthread_mutex_t mutex;
-	pthread_cond_t cond;
 	unsigned *processed;
 };
 
@@ -1933,7 +1920,52 @@ static void cleanup_threaded_search(void)
 	pthread_mutex_destroy(&progress_mutex);
 }
 
-static void *threaded_find_deltas(void *arg)
+static struct thread_params *p;
+
+static void threaded_split_largest_workload(struct thread_params *target)
+{
+	int i;
+
+	struct object_entry **list;
+	struct thread_params *victim = NULL;
+	unsigned sub_size = 0;
+
+	/* Find a victim */
+	progress_lock();
+	for (i = 0; i < delta_search_threads; i++)
+		if (p[i].remaining > 2*target->window &&
+		    (!victim || victim->remaining < p[i].remaining))
+			victim = &p[i];
+
+	if (victim) {
+		sub_size = victim->remaining / 2;
+		list = victim->list + victim->list_size - sub_size;
+		while (sub_size && list[0]->hash &&
+		       list[0]->hash == list[-1]->hash) {
+			list++;
+			sub_size--;
+		}
+		if (!sub_size) {
+			/*
+			 * It is possible for some "paths" to have
+			 * so many objects that no hash boundary
+			 * might be found.  Let's just steal the
+			 * exact half in that case.
+			 */
+			sub_size = victim->remaining / 2;
+			list -= sub_size;
+		}
+		victim->list_size -= sub_size;
+		victim->remaining -= sub_size;
+
+		target->list = list;
+		target->list_size = sub_size;
+		target->remaining = sub_size;
+	}
+	progress_unlock();
+}
+
+static int threaded_find_deltas(struct task_queue *tq, void *arg)
 {
 	struct thread_params *me = arg;
 
@@ -1941,34 +1973,17 @@ static void *threaded_find_deltas(void *arg)
 	while (me->remaining) {
 		find_deltas(me->list, &me->remaining,
 			    me->window, me->depth, me->processed);
 
-		progress_lock();
-		me->working = 0;
-		pthread_cond_signal(&progress_cond);
-		progress_unlock();
-
-		/*
-		 * We must not set ->data_ready before we wait on the
-		 * condition because the main thread may have set it to 1
-		 * before we get here. In order to be sure that new
-		 * work is available if we see 1 in ->data_ready, it
-		 * was initialized to 0 before this thread was spawned
-		 * and we reset it to 0 right away.
-		 */
-		pthread_mutex_lock(&me->mutex);
-		while (!me->data_ready)
-			pthread_cond_wait(&me->cond, &me->mutex);
-		me->data_ready = 0;
-		pthread_mutex_unlock(&me->mutex);
+		threaded_split_largest_workload(me);
 	}
-	/* leave ->working 1 so that this doesn't get more work assigned */
-	return NULL;
+
+	return 0;
 }
 
 static void ll_find_deltas(struct object_entry **list, unsigned list_size,
 			   int window, int depth, unsigned *processed)
 {
-	struct thread_params *p;
-	int i, ret, active_threads = 0;
+	struct task_queue *tq;
+	int i;
 
 	init_threaded_search();
 
@@ -1980,8 +1995,11 @@ static void ll_find_deltas(struct object_entry **list, unsigned list_size,
 	if (progress > pack_to_stdout)
 		fprintf(stderr, "Delta compression using up to %d threads.\n",
 				delta_search_threads);
+
 	p = xcalloc(delta_search_threads, sizeof(*p));
 
+	tq = create_task_queue(delta_search_threads);
+
 	/* Partition the work amongst work threads. */
 	for (i = 0; i < delta_search_threads; i++) {
 		unsigned sub_size = list_size / (delta_search_threads - i);
@@ -1993,8 +2011,6 @@ static void ll_find_deltas(struct object_entry **list, unsigned list_size,
 		p[i].window = window;
 		p[i].depth = depth;
 		p[i].processed = processed;
-		p[i].working = 1;
-		p[i].data_ready = 0;
 
 		/* try to split chunks on "path" boundaries */
 		while (sub_size && sub_size < list_size &&
@@ -2008,87 +2024,10 @@ static void ll_find_deltas(struct object_entry **list, unsigned list_size,
 
 		list += sub_size;
 		list_size -= sub_size;
+		add_task(tq, threaded_find_deltas, &p[i]);
 	}
 
-	/* Start work threads. */
-	for (i = 0; i < delta_search_threads; i++) {
-		if (!p[i].list_size)
-			continue;
-		pthread_mutex_init(&p[i].mutex, NULL);
-		pthread_cond_init(&p[i].cond, NULL);
-		ret = pthread_create(&p[i].thread, NULL,
-				     threaded_find_deltas, &p[i]);
-		if (ret)
-			die("unable to create thread: %s", strerror(ret));
-		active_threads++;
-	}
-
-	/*
-	 * Now let's wait for work completion.  Each time a thread is done
-	 * with its work, we steal half of the remaining work from the
-	 * thread with the largest number of unprocessed objects and give
-	 * it to that newly idle thread.  This ensure good load balancing
-	 * until the remaining object list segments are simply too short
-	 * to be worth splitting anymore.
-	 */
-	while (active_threads) {
-		struct thread_params *target = NULL;
-		struct thread_params *victim = NULL;
-		unsigned sub_size = 0;
-
-		progress_lock();
-		for (;;) {
-			for (i = 0; !target && i < delta_search_threads; i++)
-				if (!p[i].working)
-					target = &p[i];
-			if (target)
-				break;
-			pthread_cond_wait(&progress_cond, &progress_mutex);
-		}
-
-		for (i = 0; i < delta_search_threads; i++)
-			if (p[i].remaining > 2*window &&
-			    (!victim || victim->remaining < p[i].remaining))
-				victim = &p[i];
-		if (victim) {
-			sub_size = victim->remaining / 2;
-			list = victim->list + victim->list_size - sub_size;
-			while (sub_size && list[0]->hash &&
-			       list[0]->hash == list[-1]->hash) {
-				list++;
-				sub_size--;
-			}
-			if (!sub_size) {
-				/*
-				 * It is possible for some "paths" to have
-				 * so many objects that no hash boundary
-				 * might be found.  Let's just steal the
-				 * exact half in that case.
-				 */
-				sub_size = victim->remaining / 2;
-				list -= sub_size;
-			}
-			target->list = list;
-			victim->list_size -= sub_size;
-			victim->remaining -= sub_size;
-		}
-		target->list_size = sub_size;
-		target->remaining = sub_size;
-		target->working = 1;
-		progress_unlock();
-
-		pthread_mutex_lock(&target->mutex);
-		target->data_ready = 1;
-		pthread_cond_signal(&target->cond);
-		pthread_mutex_unlock(&target->mutex);
-
-		if (!sub_size) {
-			pthread_join(target->thread, NULL);
-			pthread_cond_destroy(&target->cond);
-			pthread_mutex_destroy(&target->mutex);
-			active_threads--;
-		}
-	}
+	finish_task_queue(tq, NULL);
 	cleanup_threaded_search();
 	free(p);
 }
-- 
2.5.0.400.gff86faf


* Re: [PATCH 4/5] index-pack: Use the new worker pool
  2015-08-25 17:28 ` [PATCH 4/5] index-pack: Use the new worker pool Stefan Beller
@ 2015-08-25 19:03   ` Jeff King
  2015-08-25 19:23     ` Stefan Beller
  2015-08-25 20:41     ` Junio C Hamano
  0 siblings, 2 replies; 20+ messages in thread
From: Jeff King @ 2015-08-25 19:03 UTC (permalink / raw)
  To: Stefan Beller; +Cc: git, jrnieder, gitster

On Tue, Aug 25, 2015 at 10:28:25AM -0700, Stefan Beller wrote:

> By treating each object as its own task the workflow is easier to follow
> as the function used in the worker threads doesn't need any control logic
> any more.

Have you tried running t/perf/p5302 on this?

I seem to get a pretty consistent 2%-ish slowdown, both against git.git
and linux.git. That's not a lot, but I'm wondering if there is some
low-hanging fruit in the locking, or in the pattern of work being
dispatched. Or it may just be noise, but it seems fairly consistent.

-Peff


* Re: [PATCH 4/5] index-pack: Use the new worker pool
  2015-08-25 19:03   ` Jeff King
@ 2015-08-25 19:23     ` Stefan Beller
  2015-08-25 20:41     ` Junio C Hamano
  1 sibling, 0 replies; 20+ messages in thread
From: Stefan Beller @ 2015-08-25 19:23 UTC (permalink / raw)
  To: Jeff King; +Cc: git, Jonathan Nieder, Junio C Hamano

On Tue, Aug 25, 2015 at 12:03 PM, Jeff King <peff@peff.net> wrote:
> On Tue, Aug 25, 2015 at 10:28:25AM -0700, Stefan Beller wrote:
>
>> By treating each object as its own task the workflow is easier to follow
>> as the function used in the worker threads doesn't need any control logic
>> any more.
>
> Have you tried running t/perf/p5302 on this?
>
> I seem to get a pretty consistent 2%-ish slowdown, both against git.git
> and linux.git. That's not a lot, but I'm wondering if there is some
> low-hanging fruit in the locking, or in the pattern of work being
> dispatched. Or it may just be noise, but it seems fairly consistent.

I did not run any perf tests, just the standard tests.

Maybe the progress display can be moved out to another thread,
such that it doesn't block the threads doing actual work.
Also the progress display can be done a bit more sloppily;
we are allowed to drop old events if we have newer information.
[Though this reasoning is not looking at the threading pool code,
because ..um.. my code is perfect]
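
A rough sketch of that idea (hypothetical, not part of this series;
a dedicated progress thread polls the shared counter at a fixed
interval instead of each worker taking counter_lock after every
object):

	/* 'resolving_done' is made up; the main thread would set it */
	static volatile int resolving_done;

	static void *progress_thread(void *arg)
	{
		while (!resolving_done) {
			counter_lock();
			display_progress(progress, nr_resolved_deltas);
			counter_unlock();
			usleep(100 * 1000);	/* ~10 updates per second */
		}
		return NULL;
	}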

>
> -Peff


* Re: [PATCH 4/5] index-pack: Use the new worker pool
  2015-08-25 19:03   ` Jeff King
  2015-08-25 19:23     ` Stefan Beller
@ 2015-08-25 20:41     ` Junio C Hamano
  2015-08-25 20:59       ` Stefan Beller
  1 sibling, 1 reply; 20+ messages in thread
From: Junio C Hamano @ 2015-08-25 20:41 UTC (permalink / raw)
  To: Jeff King; +Cc: Stefan Beller, git, jrnieder

Jeff King <peff@peff.net> writes:

> On Tue, Aug 25, 2015 at 10:28:25AM -0700, Stefan Beller wrote:
>
>> By treating each object as its own task the workflow is easier to follow
>> as the function used in the worker threads doesn't need any control logic
>> any more.
>
> Have you tried running t/perf/p5302 on this?
>
> I seem to get a pretty consistent 2%-ish slowdown, both against git.git
> and linux.git. That's not a lot, but I'm wondering if there is some
> low-hanging fruit in the locking, or in the pattern of work being
> dispatched. Or it may just be noise, but it seems fairly consistent.

The pattern of work dispatch hopefully is the same, no?  add_task()
does the "append at the end" thing and next_task() picks from the
front of the queue.  The original is "we have globally N things,
so far M things have been handled, and we want a new one, so we pick
the M+1th one and do it".

The amount of memory that is used to represent a single task may be
much larger than the original, with overhead coming from the job_list
structure and the linked list.  We may not be able to spin up
30 threads and throw a million tasks at them using this, because of
the overhead.  It would be more suited to handle a pattern in which
an overlord actively creates new tasks while worker threads chew
them, using the add_task/dispatch as the medium for communication
between them.


* Re: [PATCH 4/5] index-pack: Use the new worker pool
  2015-08-25 20:41     ` Junio C Hamano
@ 2015-08-25 20:59       ` Stefan Beller
  2015-08-25 21:12         ` Junio C Hamano
  0 siblings, 1 reply; 20+ messages in thread
From: Stefan Beller @ 2015-08-25 20:59 UTC (permalink / raw)
  To: Junio C Hamano; +Cc: Jeff King, git, Jonathan Nieder

On Tue, Aug 25, 2015 at 1:41 PM, Junio C Hamano <gitster@pobox.com> wrote:
> Jeff King <peff@peff.net> writes:
>
>> On Tue, Aug 25, 2015 at 10:28:25AM -0700, Stefan Beller wrote:
>>
>>> By treating each object as its own task the workflow is easier to follow
>>> as the function used in the worker threads doesn't need any control logic
>>> any more.
>>
>> Have you tried running t/perf/p5302 on this?
>>
>> I seem to get a pretty consistent 2%-ish slowdown, both against git.git
>> and linux.git. That's not a lot, but I'm wondering if there is some
>> low-hanging fruit in the locking, or in the pattern of work being
>> dispatched. Or it may just be noise, but it seems fairly consistent.
>
> The pattern of work dispatch hopefully is the same, no?  add_task()
> does the "append at the end" thing and next_task() picks from the
> front of the queue.  The original is "we have globally N things,
> so far M things have been handled, and we want a new one, so we pick
> the M+1th one and do it".
>
> The amount of memory that is used to represent a single task may be
> much larger than the original, with overhead coming from job_list
> structure and the doubly-linked list.  We may not be able to spin up
> 30 threads and throw a million tasks at them using this, because of
> the overhead.

I thought about making add_task block after job_list reaches a certain
size, such that there are always just enough tasks available to process.
Then we would not need to use a huge amount of memory for having
a long line of waiting tasks; the shorter the line is, the less we can
tolerate burst behavior in the threads.

> It would be more suited to handle a pattern in which
> an overlord actively creates new tasks while worker threads chew
> them, using the add_task/dispatch as the medium for communication
> between them.

Not sure I follow there.

Original implementation:
We have M threads sitting around the table, all of them trying to obtain food
from the one bowl on the table and then eating it.
Once the bowl is all eaten, we can stop.

New pattern:
One cook puts all the food items on the sushi-go-round-belt with a fancy plate
and the threads grab them one by one still using locks (but they are internal to
the belt).

Are you saying we're content with just a bowl and everyone helps themselves
for getting food?


* Re: [PATCH 3/5] submodule: helper to run foreach in parallel
  2015-08-25 17:28 ` [PATCH 3/5] submodule: helper to run foreach in parallel Stefan Beller
@ 2015-08-25 21:09   ` Junio C Hamano
  2015-08-25 21:42     ` Stefan Beller
  2015-08-26 17:06   ` Jeff King
  1 sibling, 1 reply; 20+ messages in thread
From: Junio C Hamano @ 2015-08-25 21:09 UTC (permalink / raw)
  To: Stefan Beller; +Cc: peff, git, jrnieder

Stefan Beller <sbeller@google.com> writes:

> This runs a command on each submodule in parallel and should eventually
> replace `git submodule foreach`.
>
> There is a new option -j/--jobs (inspired by make) to specify the number
> of parallel threads.
>
> The jobs=1 case needs to be special-cased to exactly replicate the current
> default behavior of `git submodule foreach`, such as working stdin input.

"git submodule foreach-parallel -j1" feels like a roundabout way to
say "git submodule foreach"; "I want you to go parallel.  Oh, not
really, I want you to do one thing at a time".

I do not think these two have to be equivalent---users who need the
traditional one-by-one "foreach" behaviour (including the standard
input and other aspects that are unreasonable to expect
"foreach-parallel -jN" to replicate) can and should choose to use
"foreach", not "foreach-parallel -j1".

In any case, I do not think I saw any special-casing of the -j1 case that
attempts to leave the standard input operational.  Did I miss
something, or is the above describing what is left to do?

> For more than one job no input is possible, and both the stdout and stderr
> of the command are written to stderr in an ordered fashion, i.e. the tasks
> do not intermingle their output in races.

To rephrase what I said earlier, "for parallel version, the above
things happen, even with numthreads==1", is perfectly fine.

> +	cp->no_stdin = 1;
> +	cp->out = 0;
> +	cp->err = -1;
> +	cp->dir = args->path;
> +	cp->stdout_to_stderr = 1;

So the standard error and the standard output are mixed to a single
pipe ...

> +	cp->use_shell = 1;
> +
> +	if (start_command(cp)) {
> +		for (i = 0; cp->args.argv[i]; i++)
> +			fprintf(stderr, "%s\n", cp->args.argv[i]);
> +		die("Could not start command");
> +	}
> +
> +	while (1) {
> +		ssize_t len = xread(cp->err, buf, sizeof(buf));
> +		if (len < 0)
> +			die("Read from child failed");
> +		else if (len == 0)
> +			break;
> +		else {
> +			strbuf_add(&out, buf, len);
> +		}

... and the whole thing is accumulated in core???

> +	}
> +	if (finish_command(cp))
> +		die("command died with error");
> +
> +	sem_wait(args->mutex);
> +	fputs(out.buf, stderr);
> +	sem_post(args->mutex);

... and emitted to standard error?

I would have expected that the standard error would be left alone
(i.e. letting warnings from multiple jobs to be mixed together
simply because everybody writes to the same file descriptor), while
the standard output would be line-buffered, perhaps captured by the
above loop and then emitted under mutex, or something.
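
A minimal sketch of that approach (assuming strbuf_getwholeline_fd()
from strbuf.h; each complete line is emitted as soon as it arrives,
under the same mutex, instead of being held until the end):

	struct strbuf line = STRBUF_INIT;

	while (strbuf_getwholeline_fd(&line, cp->err, '\n') != EOF) {
		sem_wait(args->mutex);
		fputs(line.buf, stderr);
		sem_post(args->mutex);
	}
	strbuf_release(&line);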

I think I said this earlier, but latency to the first output counts
as an activity feedback mechanism.

> +	if (module_list_compute(0, nullargv, NULL, &pathspec) < 0)
> +		return 1;
> +
> +	gitmodules_config();
> +
> +	aq = create_task_queue(number_threads);
> +
> +	for (i = 0; i < ce_used; i++) {
> +		const struct submodule *sub;
> +		const struct cache_entry *ce = ce_entries[i];
> +		struct submodule_args *args = xmalloc(sizeof(*args));
> +
> +		if (ce_stage(ce))
> +			args->sha1 = xstrdup(sha1_to_hex(null_sha1));
> +		else
> +			args->sha1 = xstrdup(sha1_to_hex(ce->sha1));
> +
> +		strbuf_reset(&sb);
> +		strbuf_addf(&sb, "%s/.git", ce->name);
> +		if (!file_exists(sb.buf))
> +			continue;
> +
> +		args->path = ce->name;
> +		sub = submodule_from_path(null_sha1, args->path);
> +		if (!sub)
> +			die("No submodule mapping found in .gitmodules for path '%s'", args->path);
> +
> +		args->name = sub->name;
> +		args->toplevel = xgetcwd();
> +		args->cmd = argv;
> +		args->mutex = mutex;
> +		args->prefix = alternative_path;
> +		add_task(aq, run_cmd_submodule, args);
> +	}
> +
> +	finish_task_queue(aq, NULL);

This is very nice.  Declare a task queue with N workers, throw a bunch
of tasks at it and then wait for all of them to complete.  Things
can't be simpler than that ;-).  One thing that other callers of the
mechanism might want may be to plug and unplug the task queue, but
that can wait until the need arises.


* Re: [PATCH 4/5] index-pack: Use the new worker pool
  2015-08-25 20:59       ` Stefan Beller
@ 2015-08-25 21:12         ` Junio C Hamano
  2015-08-25 22:39           ` Stefan Beller
  0 siblings, 1 reply; 20+ messages in thread
From: Junio C Hamano @ 2015-08-25 21:12 UTC (permalink / raw)
  To: Stefan Beller; +Cc: Jeff King, git, Jonathan Nieder

Stefan Beller <sbeller@google.com> writes:

> Not sure I follow there.
>
> Original implementation:
> We have M threads sitting around the table, all of them trying to obtain food
> from the one bowl on the table and then eating it.
> Once the bowl is all eaten, we can stop.
>
> New pattern:
> One cook puts all the food items on the sushi-go-round-belt with a fancy plate
> and the threads grab them one by one still using locks (but they are internal to
> the belt).
>
> Are you saying we're content with just a bowl and everyone helps themselves
> for getting food?

No.  I am questioning how big the overhead is for having the
go-round-belt that must hold all dishes to be eaten, which did not
exist in the original arrangement.


* Re: [PATCH 3/5] submodule: helper to run foreach in parallel
  2015-08-25 21:09   ` Junio C Hamano
@ 2015-08-25 21:42     ` Stefan Beller
  2015-08-25 22:23       ` Junio C Hamano
  0 siblings, 1 reply; 20+ messages in thread
From: Stefan Beller @ 2015-08-25 21:42 UTC (permalink / raw)
  To: Junio C Hamano; +Cc: Jeff King, git, Jonathan Nieder

On Tue, Aug 25, 2015 at 2:09 PM, Junio C Hamano <gitster@pobox.com> wrote:
> Stefan Beller <sbeller@google.com> writes:
>
>> This runs a command on each submodule in parallel and should eventually
>> replace `git submodule foreach`.
>>
>> There is a new option -j/--jobs (inspired by make) to specify the number
>> of parallel threads.
>>
>> The jobs=1 case needs to be special-cased to exactly replicate the current
>> default behavior of `git submodule foreach`, such as working stdin input.
>
> "git submodule foreach-parallel -j1" feels like a roundabout way to
> say "git submodule foreach"; "I want you to go parallel.  Oh, not
> really, I want you to do one thing at a time".

Eventually I want to drop the -parallel, such that `git submodule foreach ...`
will just work as it always has, and if there is a --jobs argument we
may want to change the semantics a bit to make it work with the
piping to stdout/err.

>
> I do not think these two have to be equivalent---users who need the
> traditional one-by-one "foreach" behaviour (including the standard
> input and other aspects that are unreasonable to expect
> "foreach-parallel -jN" to replicate) can and should choose to use
> "foreach", not "foreach-parallel -j1".
>
> In any case, I do not think I saw any special-casing of the -j1 case that
> attempts to leave the standard input operational.  Did I miss
> something, or is the above describing what is left to do?

It is the latter. I am not quite confident yet about this very patch,
but I was rather waiting for feedback on what people deem important.

>
>> For more than one job no input is possible, and both the stdout and stderr
>> of the command are written to stderr in an ordered fashion, i.e. the tasks
>> do not intermingle their output in races.
>
> To rephrase what I said earlier, "for parallel version, the above
> things happen, even with numthreads==1", is perfectly fine.

>
>> +     cp->no_stdin = 1;
>> +     cp->out = 0;
>> +     cp->err = -1;
>> +     cp->dir = args->path;
>> +     cp->stdout_to_stderr = 1;
>
> So the standard error and the standard output are mixed to a single
> pipe ...

I was very focused on fetch, which would report progress and
information to stderr only.

>
>> +     cp->use_shell = 1;
>> +
>> +     if (start_command(cp)) {
>> +             for (i = 0; cp->args.argv[i]; i++)
>> +                     fprintf(stderr, "%s\n", cp->args.argv[i]);
>> +             die("Could not start command");
>> +     }
>> +
>> +     while (1) {
>> +             ssize_t len = xread(cp->err, buf, sizeof(buf));
>> +             if (len < 0)
>> +                     die("Read from child failed");
>> +             else if (len == 0)
>> +                     break;
>> +             else {
>> +                     strbuf_add(&out, buf, len);
>> +             }
>
> ... and the whole thing is accumulated in core???

The pipes have a limit, so we need to empty them to prevent back-pressure?
And because we want to have the output of one task at a time, we need to
save it up until we can put out the whole output, no?

>
>> +     }
>> +     if (finish_command(cp))
>> +             die("command died with error");
>> +
>> +     sem_wait(args->mutex);
>> +     fputs(out.buf, stderr);
>> +     sem_post(args->mutex);
>
> ... and emitted to standard error?
>
> I would have expected that the standard error would be left alone

`git fetch`, which may be a good candidate for such an operation,
provides progress on stderr, and we don't want to intermingle
two different submodule fetch progress displays
("I need to work offline for a bit, so let me get all of the latest stuff,
so I'll run `git submodule foreach -j16 -- git fetch --all`", though ideally
we want to have `git fetch --recurse-submodules -j16` instead).

> (i.e. letting warnings from multiple jobs to be mixed together
> simply because everybody writes to the same file descriptor), while
> the standard output would be line-buffered, perhaps captured by the
> above loop and then emitted under mutex, or something.

>
> I think I said this earlier, but latency to the first output counts

"to the first stderr"
in this case?

So you would want one channel (stderr) for a fast reporting possibility
and another channel (stdout) for a well-ordered output mode to cover
both options. And the command to be run must adhere to our selection
of stderr for fast reporting and stdout for output to wait on.
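
In terms of the child_process setup, that would roughly be (a sketch;
the fields are the ones from run-command.h):

	cp->no_stdin = 1;	/* no input for parallel tasks */
	cp->err = 0;		/* stderr inherited: fast, unordered feedback */
	cp->out = -1;		/* stdout captured: emitted under the mutex */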

> as an activity feedback mechanism.
>
>> +     if (module_list_compute(0, nullargv, NULL, &pathspec) < 0)
>> +             return 1;
>> +
>> +     gitmodules_config();
>> +
>> +     aq = create_task_queue(number_threads);
>> +
>> +     for (i = 0; i < ce_used; i++) {
>> +             const struct submodule *sub;
>> +             const struct cache_entry *ce = ce_entries[i];
>> +             struct submodule_args *args = xmalloc(sizeof(*args));
>> +
>> +             if (ce_stage(ce))
>> +                     args->sha1 = xstrdup(sha1_to_hex(null_sha1));
>> +             else
>> +                     args->sha1 = xstrdup(sha1_to_hex(ce->sha1));
>> +
>> +             strbuf_reset(&sb);
>> +             strbuf_addf(&sb, "%s/.git", ce->name);
>> +             if (!file_exists(sb.buf))
>> +                     continue;
>> +
>> +             args->path = ce->name;
>> +             sub = submodule_from_path(null_sha1, args->path);
>> +             if (!sub)
>> +                     die("No submodule mapping found in .gitmodules for path '%s'", args->path);
>> +
>> +             args->name = sub->name;
>> +             args->toplevel = xgetcwd();
>> +             args->cmd = argv;
>> +             args->mutex = mutex;
>> +             args->prefix = alternative_path;
>> +             add_task(aq, run_cmd_submodule, args);
>> +     }
>> +
>> +     finish_task_queue(aq, NULL);
>
> This is very nice.  Declare a task queue with N workers, throw a bunch
> of tasks at it and then wait for all of them to complete.  Things
> can't be simpler than that ;-).  One thing that other callers of the
> mechanism might want may be to plug and unplug the task queue, but
> that can wait until the need arises.

I think a queue is the simplest thing at the moment. Eventually we
may want to have a way to specify a DAG of workers (e.g. 2 tasks reading from
files to a buffer, 4 tasks to preprocess the output from the first 2 tasks, and
then 16 tasks to do heavy workload processing, and just one thread doing the
stdout/err handling).

In the current patches the task queue is passed around and every worker thread
has access to it, in case it wants to start new tasks itself, so it
may be doable.

Why would we want to unplug the task queue from somewhere else?


* Re: [PATCH 3/5] submodule: helper to run foreach in parallel
  2015-08-25 21:42     ` Stefan Beller
@ 2015-08-25 22:23       ` Junio C Hamano
  2015-08-25 22:44         ` Junio C Hamano
  0 siblings, 1 reply; 20+ messages in thread
From: Junio C Hamano @ 2015-08-25 22:23 UTC (permalink / raw)
  To: Stefan Beller; +Cc: Jeff King, git, Jonathan Nieder

Stefan Beller <sbeller@google.com> writes:

>>> +     while (1) {
>>> +             ssize_t len = xread(cp->err, buf, sizeof(buf));
>>> +             if (len < 0)
>>> +                     die("Read from child failed");
>>> +             else if (len == 0)
>>> +                     break;
>>> +             else {
>>> +                     strbuf_add(&out, buf, len);
>>> +             }
>>
>> ... and the whole thing is accumulated in core???
>
> The pipes have a limit, so we need to empty them to prevent back-pressure?

Of course.  But that does not lead to "we hold everything in core".
This side could choose to emit early (under protection of args->mutex),
e.g. after reading a line, write it to our standard output (or
our standard error).

> And because we want to have the output of one task at a time, we need to
> save it up until we can put out the whole output, no?

I do not necessarily agree, and I think I said that already:

  http://thread.gmane.org/gmane.comp.version-control.git/276273/focus=276321

>>> +     }
>>> +     if (finish_command(cp))
>>> +             die("command died with error");
>>> +
>>> +     sem_wait(args->mutex);
>>> +     fputs(out.buf, stderr);
>>> +     sem_post(args->mutex);
>>
>> ... and emitted to standard error?
>>
>> I would have expected that the standard error would be left alone
>
>> `git fetch`, which may be a good candidate for such an operation,
>> provides progress on stderr, and we don't want to intermingle
>> two different submodule fetch progress displays
>> ("I need to work offline for a bit, so let me get all of the latest stuff,
>> so I'll run `git submodule foreach -j16 -- git fetch --all`", though ideally
>> we want to have `git fetch --recurse-submodules -j16` instead).
>
>> (i.e. letting warnings from multiple jobs to be mixed together
>> simply because everybody writes to the same file descriptor), while
>> the standard output would be line-buffered, perhaps captured by the
>> above loop and then emitted under mutex, or something.
>
>>
>> I think I said this earlier, but latency to the first output counts
>
> "to the first stderr"
> in this case?

I didn't mean "output==the standard output stream".  As I said in
$gmane/276321, an early output, as an indication that we are doing
something, is important.

> Why would we want to unplug the task queue from somewhere else?

When you have a dispatcher more intelligent than a stupid FIFO, I
would imagine that you would want to be able to do this pattern,
especially when coming up with a task (not performing a task) takes
a non-trivial amount of work:

	prepare task queue and have N threads waiting on it;

	plug the queue, i.e. tell threads that do not start picking
	tasks out of it yet;

	large enough loop to fill the queue to a reasonable size
	while keeping the threads waiting;

	unplug the queue.  Now the threads can pick tasks from the
	queue, and they have many to choose from, so a dispatcher
	that can do better than a simple FIFO can take advantage of it;

	keep filling the queue with more tasks, if necessary;

        and finally, wait for everything to finish.
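
In code, the pattern might read like this (just a sketch;
task_queue_plug() and task_queue_unplug() are hypothetical and not
part of the posted API, while create_task_queue(), add_task() and
finish_task_queue() are; process_one() and the prepare/find helpers
are made up):

	struct task_queue *aq = create_task_queue(nr_threads);
	int i;

	task_queue_plug(aq);	/* hypothetical: workers hold off */
	for (i = 0; i < batch_size; i++)
		add_task(aq, process_one, prepare_task(i));
	task_queue_unplug(aq);	/* hypothetical: dispatching starts,
				 * with many tasks to choose from */

	while (have_more_tasks())	/* made up */
		add_task(aq, process_one, prepare_next_task());

	finish_task_queue(aq, NULL);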

Without "plug/unplug" interface, you _could_ do the above by doing
something stupid like

	prepare a task queue and have N threads waiting on it;

	loop to find a large enough number of tasks, but do not put
	them into the task queue, as the FIFO would eat them
	one-by-one; instead hold onto them in a custom data structure
	outside the task queue system;

	tight and quick loop to move them to the task queue;

	keep finding more tasks and feed them to the task queue;

        and finally, wait for everything to finish.
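
Sketched with the same made-up helpers, that workaround would read:

	void **held = NULL;
	int nr = 0, alloc = 0, i;

	while (have_more_tasks()) {		/* made up */
		ALLOC_GROW(held, nr + 1, alloc);	/* existing helper */
		held[nr++] = prepare_next_task();
	}

	for (i = 0; i < nr; i++)	/* the tight, quick loop */
		add_task(aq, process_one, held[i]);

	finish_task_queue(aq, NULL);
	free(held);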

^ permalink raw reply	[flat|nested] 20+ messages in thread

* Re: [PATCH 4/5] index-pack: Use the new worker pool
  2015-08-25 21:12         ` Junio C Hamano
@ 2015-08-25 22:39           ` Stefan Beller
  2015-08-25 22:50             ` Junio C Hamano
  0 siblings, 1 reply; 20+ messages in thread
From: Stefan Beller @ 2015-08-25 22:39 UTC (permalink / raw)
  To: Junio C Hamano; +Cc: Jeff King, git, Jonathan Nieder

On Tue, Aug 25, 2015 at 2:12 PM, Junio C Hamano <gitster@pobox.com> wrote:
> Stefan Beller <sbeller@google.com> writes:
>
>> Not sure I follow there.
>>
>> Original implementation:
>> We have M threads sitting around the table, all of them trying to obtain food
>> from the one bowl on the table and then eating it.
>> Once the bowl is all eaten, we can stop.
>>
>> New pattern:
>> One cook puts all the food items on the sushi-go-round-belt with a fancy plate
>> and the threads grab them one by one still using locks (but they are internal to
>> the belt).
>>
>> Are you saying we're content with just a bowl and everyone helps themselves
>> for getting food?
>
> No.  I am questioning how big the overhead is for having the
> go-round-belt that must hold all the dishes to be eaten, which did
> not exist in the original arrangement.
>
>

Then please don't pick up this patch. This one and patch 5 are there to
convince Jeff that this is a good API, worth introducing and not
over-engineered, solving a problem we're interested in with a minimal
amount of code, so as not to sidetrack from the actual goal we want to
pursue.

^ permalink raw reply	[flat|nested] 20+ messages in thread

* Re: [PATCH 3/5] submodule: helper to run foreach in parallel
  2015-08-25 22:23       ` Junio C Hamano
@ 2015-08-25 22:44         ` Junio C Hamano
  0 siblings, 0 replies; 20+ messages in thread
From: Junio C Hamano @ 2015-08-25 22:44 UTC (permalink / raw)
  To: Stefan Beller; +Cc: Jeff King, git, Jonathan Nieder

Junio C Hamano <gitster@pobox.com> writes:

>> Why would we want to unplug the task queue from somewhere else?
>
> When you have a dispatcher more intelligent than a stupid FIFO, I
> would imagine that you would want to be able to do this pattern,
> especially when coming up with a task (not performing a task) takes
> a non-trivial amount of work:
>
> 	prepare task queue and have N threads waiting on it;
>
> 	plug the queue, i.e. tell threads that do not start picking
> 	tasks out of it yet;

s/that do not/not to/; sorry for the grammo, resulting from a lot of
editing while doing other things X-<.

> ...
>       and finally, wait for everything to finish.
>
> Without "plug/unplug" interface, you _could_ do the above by doing
> something stupid like
> ...
>       and finally, wait for everything to finish.

... but having to use yet another queue outside only because the
task queue cannot be plugged feels stupid.

But because we do not yet have a way to plug custom dispatching
logic into the dispatcher, the above is in the "but that can wait
until the need arises" category, as I said.

^ permalink raw reply	[flat|nested] 20+ messages in thread

* Re: [PATCH 4/5] index-pack: Use the new worker pool
  2015-08-25 22:39           ` Stefan Beller
@ 2015-08-25 22:50             ` Junio C Hamano
  0 siblings, 0 replies; 20+ messages in thread
From: Junio C Hamano @ 2015-08-25 22:50 UTC (permalink / raw)
  To: Stefan Beller; +Cc: Jeff King, git, Jonathan Nieder

Stefan Beller <sbeller@google.com> writes:

> Then please don't pick up this patch. This one and patch 5 are there to
> convince Jeff that this is a good API, worth introducing and not
> over-engineered, solving a problem we're interested in with a minimal
> amount of code, so as not to sidetrack from the actual goal we want to
> pursue.

Don't worry.  I have no intention of queuing the index-pack and
pack-objects rewrites unless the use of the new API makes them
demonstrably better.

The criteria for "worth being introduced" would include "not
excessively heavyweight", I would think.  And it is good that we
have these two patches to judge, in a concrete way, whether the
earlier ones (2-3/5) are a good thing to add.  The 2% slowdown
might be showing the performance characteristics of your worker
pool model, and Peff's suggestion to at least see (if not solve)
where the overhead is coming from was a very reasonable one.

^ permalink raw reply	[flat|nested] 20+ messages in thread

* Re: [PATCH 3/5] submodule: helper to run foreach in parallel
  2015-08-25 17:28 ` [PATCH 3/5] submodule: helper to run foreach in parallel Stefan Beller
  2015-08-25 21:09   ` Junio C Hamano
@ 2015-08-26 17:06   ` Jeff King
  2015-08-26 17:21     ` Stefan Beller
  1 sibling, 1 reply; 20+ messages in thread
From: Jeff King @ 2015-08-26 17:06 UTC (permalink / raw)
  To: Stefan Beller; +Cc: git, jrnieder, gitster

On Tue, Aug 25, 2015 at 10:28:24AM -0700, Stefan Beller wrote:

> +int module_foreach_parallel(int argc, const char **argv, const char *prefix)
> +{
> [...]
> +	for (i = 0; i < ce_used; i++) {
> +		const struct submodule *sub;
> +		const struct cache_entry *ce = ce_entries[i];
> +		struct submodule_args *args = malloc(sizeof(*args));
> +
> +		if (ce_stage(ce))
> +			args->sha1 = xstrdup(sha1_to_hex(null_sha1));
> +		else
> +			args->sha1 = xstrdup(sha1_to_hex(ce->sha1));
> +
> +		strbuf_reset(&sb);
> +		strbuf_addf(&sb, "%s/.git", ce->name);
> +		if (!file_exists(sb.buf))
> +			continue;

"args" and "args->sha1" go out of scope and leak here.

> +		args->name = sub->name;
> +		args->toplevel = xstrdup(xgetcwd());

Another xgetcwd leak. :) I think this one can just drop the xstrdup.
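
(That is, just

	args->toplevel = xgetcwd();

since xgetcwd() already returns a freshly allocated buffer.)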

(Both of these were spotted by Coverity. I know you have played with it
a little, so if you are actually reading the emails it sends, I'll stop
relaying them).

-Peff

^ permalink raw reply	[flat|nested] 20+ messages in thread

* Re: [PATCH 3/5] submodule: helper to run foreach in parallel
  2015-08-26 17:06   ` Jeff King
@ 2015-08-26 17:21     ` Stefan Beller
  0 siblings, 0 replies; 20+ messages in thread
From: Stefan Beller @ 2015-08-26 17:21 UTC (permalink / raw)
  To: Jeff King; +Cc: git, Jonathan Nieder, Junio C Hamano

On Wed, Aug 26, 2015 at 10:06 AM, Jeff King <peff@peff.net> wrote:
> On Tue, Aug 25, 2015 at 10:28:24AM -0700, Stefan Beller wrote:
>
>> +int module_foreach_parallel(int argc, const char **argv, const char *prefix)
>> +{
>> [...]
>> +     for (i = 0; i < ce_used; i++) {
>> +             const struct submodule *sub;
>> +             const struct cache_entry *ce = ce_entries[i];
>> +             struct submodule_args *args = malloc(sizeof(*args));
>> +
>> +             if (ce_stage(ce))
>> +                     args->sha1 = xstrdup(sha1_to_hex(null_sha1));
>> +             else
>> +                     args->sha1 = xstrdup(sha1_to_hex(ce->sha1));
>> +
>> +             strbuf_reset(&sb);
>> +             strbuf_addf(&sb, "%s/.git", ce->name);
>> +             if (!file_exists(sb.buf))
>> +                     continue;
>
> "args" and "args->sha1" go out of scope and leak here.
>
>> +             args->name = sub->name;
>> +             args->toplevel = xstrdup(xgetcwd());
>
> Another xgetcwd leak. :) I think this one can just drop the xstrdup.
>
> (Both of these were spotted by Coverity. I know you have played with it
> a little, so if you are actually reading the emails it sends, I'll stop
> relaying them).

Fixing those memleaks was the first thing I did this morning. :)

And yeah I do pay attention to these emails. (I maintain the automatic testing
every other day, so there is that.)

>
> -Peff

^ permalink raw reply	[flat|nested] 20+ messages in thread

* [PATCH 3/5] submodule: helper to run foreach in parallel
  2015-08-27  0:52 [RFC PATCH 0/5] Progressing with `git submodule foreach_parallel` Stefan Beller
@ 2015-08-27  0:52 ` Stefan Beller
  0 siblings, 0 replies; 20+ messages in thread
From: Stefan Beller @ 2015-08-27  0:52 UTC (permalink / raw)
  To: git; +Cc: peff, jrnieder, gitster, Stefan Beller

Similar to `git submodule foreach`, the new command
`git submodule foreach_parallel` will run a command
on each submodule.

The commands are run in parallel, up to the number of
cores by default; you can specify '-j 4' to run with
just 4 threads, for example.

One major difference to `git submodule foreach` is the
handling of input and output for the commands. Because
of the parallel nature of the execution it is not trivial
to schedule the std{in,out,err} channels for the submodule
each command is run in. So in this patch there is no
support for stdin.

The goal for the std{out,err} output is to look as much
like the single-threaded version as possible: stdout and
stderr from one submodule operation are buffered together
in a single channel and emitted together once output is
allowed.

To do that, we have a mutex for the output; each thread
tries to acquire it and, if lucky enough to get the
mutex, writes its output out directly.

A thread that does not hold the mutex buffers its output
instead.

Example:
Let's assume we have 5 submodules A,B,C,D,E and the
operation on each submodule takes a different amount
of time (say `git fetch`), then the output of
`git submodule foreach` might look like this:

 time -->
 output: |---A---|   |-B-|   |----C-----------|   |-D-|   |-E-|

When we schedule these threads into two threads, a schedule
and sample output over time may look like this:

thread 1: |---A---|   |-D-|   |-E-|
thread 2: |-B-|   |----C-----------|

output:   |---A---| B |----C-------| E D

So A will be perceived as if it ran normally in the
single-threaded version of foreach. As B has finished
by the time the mutex becomes available, its whole buffer
is just dumped to the output at once; the user perceives
this as a 'very fast' operation of B. Once that is done,
C takes the mutex and flushes its piled-up buffer. If the
subcommand is a git command with a progress display, it
will simply look as if the first half of C happened
really quickly.

Notice how E and D are output in a different order than in
the single-threaded version, as the new parallel foreach
doesn't care about the order.

So this way of outputting is good for human consumption,
but not for machine consumption: you always see progress,
but it is not easy to tell which output comes from which
command, as there is no indication other than the
"Entering <submodule path>" line at the beginning of each
section of output.

Maybe we eventually want to integrate the unthreaded
foreach into the new C code base and special-case it,
e.g. to accept stdin again.

Signed-off-by: Stefan Beller <sbeller@google.com>
---
 builtin/submodule--helper.c | 160 +++++++++++++++++++++++++++++++++++++++++++-
 git-submodule.sh            |  11 ++-
 2 files changed, 168 insertions(+), 3 deletions(-)

diff --git a/builtin/submodule--helper.c b/builtin/submodule--helper.c
index f11fb9c..2c06f28 100644
--- a/builtin/submodule--helper.c
+++ b/builtin/submodule--helper.c
@@ -8,8 +8,11 @@
 #include "submodule.h"
 #include "submodule-config.h"
 #include "string-list.h"
+#include "thread-utils.h"
 #include "run-command.h"
-
+#ifndef NO_PTHREADS
+#include <pthread.h>
+#endif
 static const struct cache_entry **ce_entries;
 static int ce_alloc, ce_used;
 static const char *alternative_path;
@@ -279,6 +282,155 @@ static int module_clone(int argc, const char **argv, const char *prefix)
 	return 0;
 }
 
+struct submodule_args {
+	const char *name;
+	const char *path;
+	const char *sha1;
+	const char *toplevel;
+	const char *prefix;
+	const char **cmd;
+	pthread_mutex_t *mutex;
+};
+
+int run_cmd_submodule(struct task_queue *aq, void *task)
+{
+	int i, lock_acquired = 0;
+	struct submodule_args *args = task;
+	struct strbuf out = STRBUF_INIT;
+	struct strbuf sb = STRBUF_INIT;
+	struct child_process *cp = xmalloc(sizeof(*cp));
+	char buf[1024];
+
+	strbuf_addf(&out, _("Entering %s\n"), relative_path(args->path, args->prefix, &sb));
+
+	child_process_init(cp);
+	argv_array_pushv(&cp->args, args->cmd);
+
+	argv_array_pushf(&cp->env_array, "name=%s", args->name);
+	argv_array_pushf(&cp->env_array, "path=%s", args->path);
+	argv_array_pushf(&cp->env_array, "sha1=%s", args->sha1);
+	argv_array_pushf(&cp->env_array, "toplevel=%s", args->toplevel);
+
+	for (i = 0; local_repo_env[i]; i++)
+		argv_array_push(&cp->env_array, local_repo_env[i]);
+
+	cp->no_stdin = 1;
+	cp->out = 0;
+	cp->err = -1;
+	cp->dir = args->path;
+	cp->stdout_to_stderr = 1;
+	cp->use_shell = 1;
+
+	if (start_command(cp)) {
+		for (i = 0; cp->args.argv[i]; i++)
+			fprintf(stderr, "%s\n", cp->args.argv[i]);
+		die("Could not start command");
+	}
+
+	while (1) {
+		ssize_t len = xread(cp->err, buf, sizeof(buf));
+		if (len < 0)
+			die("Read from child failed");
+		else if (len == 0)
+			break;
+		else {
+			strbuf_add(&out, buf, len);
+		}
+		if (!lock_acquired && !pthread_mutex_trylock(args->mutex))
+			lock_acquired = 1;
+		if (lock_acquired) {
+			fputs(out.buf, stderr);
+			strbuf_reset(&out);
+		}
+	}
+	if (finish_command(cp))
+		die("command died with error");
+
+	if (!lock_acquired)
+		pthread_mutex_lock(args->mutex);
+
+	fputs(out.buf, stderr);
+	pthread_mutex_unlock(args->mutex);
+
+	return 0;
+}
+
+int module_foreach_parallel(int argc, const char **argv, const char *prefix)
+{
+	int i, recursive = 0, number_threads = 0, quiet = 0;
+	static struct pathspec pathspec;
+	struct strbuf sb = STRBUF_INIT;
+	struct task_queue *aq;
+	const char *cmd = NULL;
+	const char *nullargv[] = { NULL };
+	pthread_mutex_t mutex;
+
+	struct option module_update_options[] = {
+		OPT_STRING(0, "prefix", &alternative_path,
+			   N_("path"),
+			   N_("alternative anchor for relative paths")),
+		OPT_STRING(0, "cmd", &cmd,
+			   N_("string"),
+			   N_("command to run")),
+		OPT_BOOL('r', "--recursive", &recursive,
+			 N_("Recurse into nexted submodules")),
+		OPT_INTEGER('j', "jobs", &number_threads,
+			    N_("Recurse into nexted submodules")),
+		OPT__QUIET(&quiet, N_("Suppress output")),
+		OPT_END()
+	};
+
+	static const char * const git_submodule_helper_usage[] = {
+		N_("git submodule--helper foreach [--prefix=<path>] [<path>...]"),
+		NULL
+	};
+
+	argc = parse_options(argc, argv, prefix, module_update_options,
+			     git_submodule_helper_usage, 0);
+
+	if (module_list_compute(0, nullargv, NULL, &pathspec) < 0)
+		return 1;
+
+	gitmodules_config();
+
+	pthread_mutex_init(&mutex, NULL);
+	aq = create_task_queue(number_threads);
+
+	for (i = 0; i < ce_used; i++) {
+		const struct submodule *sub;
+		const struct cache_entry *ce = ce_entries[i];
+		struct submodule_args *args = xmalloc(sizeof(*args));
+
+		if (ce_stage(ce))
+			args->sha1 = xstrdup(sha1_to_hex(null_sha1));
+		else
+			args->sha1 = xstrdup(sha1_to_hex(ce->sha1));
+
+		strbuf_reset(&sb);
+		strbuf_addf(&sb, "%s/.git", ce->name);
+		if (!file_exists(sb.buf)) {
+			free(args);
+			continue;
+		}
+
+		args->path = ce->name;
+		sub = submodule_from_path(null_sha1, args->path);
+		if (!sub)
+			die("No submodule mapping found in .gitmodules for path '%s'", args->path);
+
+		args->name = sub->name;
+		args->toplevel = xgetcwd();
+		args->cmd = argv;
+		args->mutex = &mutex;
+		args->prefix = alternative_path;
+		add_task(aq, run_cmd_submodule, args);
+	}
+
+	finish_task_queue(aq, NULL);
+	pthread_mutex_destroy(&mutex);
+	return 0;
+}
+
 int cmd_submodule__helper(int argc, const char **argv, const char *prefix)
 {
 	if (argc < 2)
@@ -293,6 +445,10 @@ int cmd_submodule__helper(int argc, const char **argv, const char *prefix)
 	if (!strcmp(argv[1], "module_clone"))
 		return module_clone(argc - 1, argv + 1, prefix);
 
+	if (!strcmp(argv[1], "foreach_parallel"))
+		return module_foreach_parallel(argc - 1, argv + 1, prefix);
+
 usage:
-	usage("git submodule--helper [module_list module_name module_clone]\n");
+	fprintf(stderr, "%s", argv[1]);
+	usage("git submodule--helper [module_list module_name module_clone foreach_parallel]\n");
 }
diff --git a/git-submodule.sh b/git-submodule.sh
index fb5155e..f06488a 100755
--- a/git-submodule.sh
+++ b/git-submodule.sh
@@ -431,6 +431,15 @@ cmd_foreach()
 }
 
 #
+# Execute an arbitrary command sequence in each checked out
+# submodule in parallel.
+#
+cmd_foreach_parallel()
+{
+	git submodule--helper foreach_parallel --prefix "$wt_prefix" "$@"
+}
+
+#
 # Register submodules in .git/config
 #
 # $@ = requested paths (default to all)
@@ -1225,7 +1234,7 @@ cmd_sync()
 while test $# != 0 && test -z "$command"
 do
 	case "$1" in
-	add | foreach | init | deinit | update | status | summary | sync)
+	add | foreach | foreach_parallel | init | deinit | update | status | summary | sync)
 		command=$1
 		;;
 	-q|--quiet)
-- 
2.5.0.264.g784836d

^ permalink raw reply related	[flat|nested] 20+ messages in thread

end of thread, other threads:[~2015-08-27  0:53 UTC | newest]

Thread overview: 20+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2015-08-25 17:28 [RFC PATCH 0/5] Demonstrate new parallel threading API Stefan Beller
2015-08-25 17:28 ` [PATCH 1/5] FIXUP submodule: implement `module_clone` as a builtin helper Stefan Beller
2015-08-25 17:28 ` [PATCH 2/5] thread-utils: add a threaded task queue Stefan Beller
2015-08-25 17:28 ` [PATCH 3/5] submodule: helper to run foreach in parallel Stefan Beller
2015-08-25 21:09   ` Junio C Hamano
2015-08-25 21:42     ` Stefan Beller
2015-08-25 22:23       ` Junio C Hamano
2015-08-25 22:44         ` Junio C Hamano
2015-08-26 17:06   ` Jeff King
2015-08-26 17:21     ` Stefan Beller
2015-08-25 17:28 ` [PATCH 4/5] index-pack: Use the new worker pool Stefan Beller
2015-08-25 19:03   ` Jeff King
2015-08-25 19:23     ` Stefan Beller
2015-08-25 20:41     ` Junio C Hamano
2015-08-25 20:59       ` Stefan Beller
2015-08-25 21:12         ` Junio C Hamano
2015-08-25 22:39           ` Stefan Beller
2015-08-25 22:50             ` Junio C Hamano
2015-08-25 17:28 ` [PATCH 5/5] pack-objects: Use " Stefan Beller
2015-08-27  0:52 [RFC PATCH 0/5] Progressing with `git submodule foreach_parallel` Stefan Beller
2015-08-27  0:52 ` [PATCH 3/5] submodule: helper to run foreach in parallel Stefan Beller
