* [PATCH 1/3] submodule: implement `module_clone` as a builtin helper
@ 2015-08-21  1:40 Stefan Beller
  2015-08-21  1:40 ` [RFC PATCH 2/3] run-commands: add an async queue processor Stefan Beller
  2015-08-21  1:40 ` [WIP/PATCH 3/3] submodule: helper to run foreach in parallel Stefan Beller
  0 siblings, 2 replies; 16+ messages in thread
From: Stefan Beller @ 2015-08-21  1:40 UTC (permalink / raw)
  To: git; +Cc: gitster, jrnieder, hvoigt, jens.lehmann, Stefan Beller

`module_clone` is part of the update command, which I want to convert
to C next.

Signed-off-by: Stefan Beller <sbeller@google.com>
---

Replacing the latest patch in sb/submodule-helper as that contains
debug code in `cmd_submodule__helper`.

 builtin/submodule--helper.c | 156 +++++++++++++++++++++++++++++++++++++++++++-
 git-submodule.sh            |  80 +----------------------
 2 files changed, 158 insertions(+), 78 deletions(-)

diff --git a/builtin/submodule--helper.c b/builtin/submodule--helper.c
index 4b32a3c..ae74b80 100644
--- a/builtin/submodule--helper.c
+++ b/builtin/submodule--helper.c
@@ -8,6 +8,7 @@
 #include "submodule.h"
 #include "submodule-config.h"
 #include "string-list.h"
+#include "run-command.h"
 
 static const struct cache_entry **ce_entries;
 static int ce_alloc, ce_used;
@@ -124,6 +125,156 @@ static int module_name(int argc, const char **argv, const char *prefix)
 	return 0;
 }
 
+static int clone_submodule(const char *path, const char *gitdir, const char *url,
+			   const char *depth, const char *reference, int quiet)
+{
+	struct child_process cp;
+	child_process_init(&cp);
+
+	argv_array_push(&cp.args, "clone");
+	argv_array_push(&cp.args, "--no-checkout");
+	if (quiet)
+		argv_array_push(&cp.args, "--quiet");
+	if (depth && strcmp(depth, "")) {
+		argv_array_push(&cp.args, "--depth");
+		argv_array_push(&cp.args, depth);
+	}
+	if (reference && strcmp(reference, "")) {
+		argv_array_push(&cp.args, "--reference");
+		argv_array_push(&cp.args, reference);
+	}
+	if (gitdir) {
+		argv_array_push(&cp.args, "--separate-git-dir");
+		argv_array_push(&cp.args, gitdir);
+	}
+	argv_array_push(&cp.args, url);
+	argv_array_push(&cp.args, path);
+
+	cp.git_cmd = 1;
+	cp.env = local_repo_env;
+
+	cp.no_stdin = 1;
+	cp.no_stdout = 1;
+	cp.no_stderr = 1;
+
+	return run_command(&cp);
+}
+
+/*
+ * Clone a submodule
+ *
+ * $1 = submodule path
+ * $2 = submodule name
+ * $3 = URL to clone
+ * $4 = reference repository to reuse (empty for independent)
+ * $5 = depth argument for shallow clones (empty for deep)
+ *
+ * Prior to calling, cmd_update checks that a possibly existing
+ * path is not a git repository.
+ * Likewise, cmd_add checks that path does not exist at all,
+ * since it is the location of a new submodule.
+ */
+static int module_clone(int argc, const char **argv, const char *prefix)
+{
+	const char *path = NULL, *name = NULL, *url = NULL, *reference = NULL, *depth = NULL;
+	int quiet = 0;
+	FILE *submodule_dot_git;
+	const char *sm_gitdir, *p;
+	struct strbuf rel_path = STRBUF_INIT;
+	struct strbuf sb = STRBUF_INIT;
+
+	struct option module_update_options[] = {
+		OPT_STRING(0, "prefix", &alternative_path,
+			   N_("path"),
+			   N_("alternative anchor for relative paths")),
+		OPT_STRING(0, "path", &path,
+			   N_("path"),
+			   N_("where the new submodule will be cloned to")),
+		OPT_STRING(0, "name", &name,
+			   N_("string"),
+			   N_("name of the new submodule")),
+		OPT_STRING(0, "url", &url,
+			   N_("string"),
+			   N_("url where to clone the submodule from")),
+		OPT_STRING(0, "reference", &reference,
+			   N_("string"),
+			   N_("reference repository")),
+		OPT_STRING(0, "depth", &depth,
+			   N_("string"),
+			   N_("depth for shallow clones")),
+		OPT_END()
+	};
+
+	static const char * const git_submodule_helper_usage[] = {
+		N_("git submodule--helper update [--prefix=<path>] [--quiet] [--remote] [-N|--no-fetch]"
+		   "[-f|--force] [--rebase|--merge] [--reference <repository>]"
+		   "[--depth <depth>] [--recursive] [--] [<path>...]"),
+		NULL
+	};
+
+	argc = parse_options(argc, argv, prefix, module_update_options,
+			     git_submodule_helper_usage, 0);
+
+	if (getenv("GIT_QUIET"))
+		quiet = 1;
+
+	strbuf_addf(&sb, "%s/modules/%s", get_git_dir(), name);
+	sm_gitdir = strbuf_detach(&sb, NULL);
+	strbuf_reset(&sb);
+
+	if (!file_exists(sm_gitdir)) {
+		safe_create_leading_directories_const(sm_gitdir);
+		if (clone_submodule(path, sm_gitdir, url, depth, reference, quiet))
+			die(N_("Clone of '%s' into submodule path '%s' failed"),
+			    url, path);
+	} else {
+		safe_create_leading_directories_const(path);
+		unlink(sm_gitdir);
+	}
+
+	/* Write a .git file in the submodule to redirect to the superproject. */
+	if (alternative_path && strcmp(alternative_path, "")) {
+		p = relative_path(path, alternative_path, &sb);
+		strbuf_reset(&sb);
+	} else
+		p = path;
+
+	if (safe_create_leading_directories_const(p) < 0)
+		die("Could not create directory '%s'", p);
+
+	strbuf_addf(&sb, "%s/.git", p);
+
+	if (safe_create_leading_directories_const(sb.buf) < 0)
+		die(_("could not create leading directories of '%s'"), sb.buf);
+	submodule_dot_git = fopen(sb.buf, "w");
+	if (!submodule_dot_git)
+		die ("Cannot open file '%s': %s", sb.buf, strerror(errno));
+
+	fprintf(submodule_dot_git, "gitdir: %s\n",
+		relative_path(sm_gitdir, path, &rel_path));
+	if (fclose(submodule_dot_git))
+		die("Could not close file %s", sb.buf);
+	strbuf_reset(&sb);
+
+	/* Redirect the worktree of the submodule in the superprojects config */
+	if (!is_absolute_path(sm_gitdir)) {
+		char *s = (char*)sm_gitdir;
+		strbuf_addf(&sb, "%s/%s", xgetcwd(), sm_gitdir);
+		sm_gitdir = strbuf_detach(&sb, NULL);
+		strbuf_reset(&sb);
+		free(s);
+	}
+	strbuf_addf(&sb, "%s/%s", xgetcwd(), path);
+
+	p = git_pathdup_submodule(path, "config");
+	if (!p)
+		die("Could not get submodule directory for '%s'", path);
+	git_config_set_in_file(p, "core.worktree",
+			       relative_path(sb.buf, sm_gitdir, &rel_path));
+	strbuf_release(&sb);
+	return 0;
+}
+
 int cmd_submodule__helper(int argc, const char **argv, const char *prefix)
 {
 	if (argc < 2)
@@ -135,6 +286,9 @@ int cmd_submodule__helper(int argc, const char **argv, const char *prefix)
 	if (!strcmp(argv[1], "module_name"))
 		return module_name(argc - 2, argv + 2, prefix);
 
+	if (!strcmp(argv[1], "module_clone"))
+		return module_clone(argc - 1, argv + 1, prefix);
+
 usage:
-	usage("git submodule--helper [module_list module_name]\n");
+	usage("git submodule--helper [module_list module_name module_clone]\n");
 }
diff --git a/git-submodule.sh b/git-submodule.sh
index e6ff38d..fb5155e 100755
--- a/git-submodule.sh
+++ b/git-submodule.sh
@@ -178,80 +178,6 @@ get_submodule_config () {
 	printf '%s' "${value:-$default}"
 }
 
-#
-# Clone a submodule
-#
-# $1 = submodule path
-# $2 = submodule name
-# $3 = URL to clone
-# $4 = reference repository to reuse (empty for independent)
-# $5 = depth argument for shallow clones (empty for deep)
-#
-# Prior to calling, cmd_update checks that a possibly existing
-# path is not a git repository.
-# Likewise, cmd_add checks that path does not exist at all,
-# since it is the location of a new submodule.
-#
-module_clone()
-{
-	sm_path=$1
-	name=$2
-	url=$3
-	reference="$4"
-	depth="$5"
-	quiet=
-	if test -n "$GIT_QUIET"
-	then
-		quiet=-q
-	fi
-
-	gitdir=
-	gitdir_base=
-	base_name=$(dirname "$name")
-
-	gitdir=$(git rev-parse --git-dir)
-	gitdir_base="$gitdir/modules/$base_name"
-	gitdir="$gitdir/modules/$name"
-
-	if test -d "$gitdir"
-	then
-		mkdir -p "$sm_path"
-		rm -f "$gitdir/index"
-	else
-		mkdir -p "$gitdir_base"
-		(
-			clear_local_git_env
-			git clone $quiet ${depth:+"$depth"} -n ${reference:+"$reference"} \
-				--separate-git-dir "$gitdir" "$url" "$sm_path"
-		) ||
-		die "$(eval_gettext "Clone of '\$url' into submodule path '\$sm_path' failed")"
-	fi
-
-	# We already are at the root of the work tree but cd_to_toplevel will
-	# resolve any symlinks that might be present in $PWD
-	a=$(cd_to_toplevel && cd "$gitdir" && pwd)/
-	b=$(cd_to_toplevel && cd "$sm_path" && pwd)/
-	# Remove all common leading directories after a sanity check
-	if test "${a#$b}" != "$a" || test "${b#$a}" != "$b"; then
-		die "$(eval_gettext "Gitdir '\$a' is part of the submodule path '\$b' or vice versa")"
-	fi
-	while test "${a%%/*}" = "${b%%/*}"
-	do
-		a=${a#*/}
-		b=${b#*/}
-	done
-	# Now chop off the trailing '/'s that were added in the beginning
-	a=${a%/}
-	b=${b%/}
-
-	# Turn each leading "*/" component into "../"
-	rel=$(printf '%s\n' "$b" | sed -e 's|[^/][^/]*|..|g')
-	printf '%s\n' "gitdir: $rel/$a" >"$sm_path/.git"
-
-	rel=$(printf '%s\n' "$a" | sed -e 's|[^/][^/]*|..|g')
-	(clear_local_git_env; cd "$sm_path" && GIT_WORK_TREE=. git config core.worktree "$rel/$b")
-}
-
 isnumber()
 {
 	n=$(($1 + 0)) 2>/dev/null && test "$n" = "$1"
@@ -301,7 +227,7 @@ cmd_add()
 			shift
 			;;
 		--depth=*)
-			depth=$1
+			depth="$1"
 			;;
 		--)
 			shift
@@ -412,7 +338,7 @@ Use -f if you really want to add it." >&2
 				echo "$(eval_gettext "Reactivating local git directory for submodule '\$sm_name'.")"
 			fi
 		fi
-		module_clone "$sm_path" "$sm_name" "$realrepo" "$reference" "$depth" || exit
+		git submodule--helper module_clone --prefix "$wt_prefix" --path "$sm_path" --name "$sm_name" --url "$realrepo" "$reference" "$depth" || exit
 		(
 			clear_local_git_env
 			cd "$sm_path" &&
@@ -774,7 +700,7 @@ Maybe you want to use 'update --init'?")"
 
 		if ! test -d "$sm_path"/.git && ! test -f "$sm_path"/.git
 		then
-			module_clone "$sm_path" "$name" "$url" "$reference" "$depth" || exit
+			git submodule--helper module_clone --prefix "$prefix" --path "$sm_path" --name "$name" --url "$url" "$reference" "$depth" || exit
 			cloned_modules="$cloned_modules;$name"
 			subsha1=
 		else
-- 
2.5.0.264.g01b5c38.dirty


* [RFC PATCH 2/3] run-commands: add an async queue processor
  2015-08-21  1:40 [PATCH 1/3] submodule: implement `module_clone` as a builtin helper Stefan Beller
@ 2015-08-21  1:40 ` Stefan Beller
  2015-08-21 19:05   ` Junio C Hamano
  2015-08-21  1:40 ` [WIP/PATCH 3/3] submodule: helper to run foreach in parallel Stefan Beller
  1 sibling, 1 reply; 16+ messages in thread
From: Stefan Beller @ 2015-08-21  1:40 UTC (permalink / raw)
  To: git; +Cc: gitster, jrnieder, hvoigt, jens.lehmann, Stefan Beller

This adds functionality to do work in parallel.

The whole life cycle of such a thread pool would look like

    struct task_queue * tq = create_task_queue(32); // no of threads
    for (...)
        add_task(tq, process_one_item_function, item); // non blocking
    ...
    int ret = finish_task_queue(tq); // blocks until all tasks are done
    if (ret)
        die("Not all items were processed");

The caller must take care of handling the output.
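
To illustrate, a task callback passed to add_task() would look
something like this (a minimal sketch; `struct item` and
process_one_item() are made-up names, not part of this patch):

static int process_one_item_function(struct task_queue *tq, void *task)
{
	struct item *item = task;	/* made-up payload type */

	if (process_one_item(item))
		return 1;	/* nonzero flushes the queue and stops early */
	return 0;		/* zero: keep dispatching */
}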

Signed-off-by: Stefan Beller <sbeller@google.com>
---

I sent this a while ago to the list, no comments on it :(
The core functionality stayed the same, but I hope to have improved the naming
and location of the code.

The WIP is only for the NO_PTHREADS case.


 run-command.c | 212 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----
 run-command.h |  30 +++++++++
 2 files changed, 230 insertions(+), 12 deletions(-)

diff --git a/run-command.c b/run-command.c
index 28e1d55..4029011 100644
--- a/run-command.c
+++ b/run-command.c
@@ -4,6 +4,21 @@
 #include "sigchain.h"
 #include "argv-array.h"
 
+#ifdef NO_PTHREADS
+
+#else
+
+#include "thread-utils.h"
+
+#include <pthread.h>
+#include <semaphore.h>
+#include <stdio.h>
+#include <unistd.h>
+
+#endif
+
+#include "git-compat-util.h"
+
 void child_process_init(struct child_process *child)
 {
 	memset(child, 0, sizeof(*child));
@@ -668,6 +683,22 @@ int git_atexit(void (*handler)(void))
 
 #endif
 
+void setup_main_thread()
+{
+	if (!main_thread_set) {
+		/*
+		 * We assume that the first time that start_async is called
+		 * it is from the main thread.
+		 */
+		main_thread_set = 1;
+		main_thread = pthread_self();
+		pthread_key_create(&async_key, NULL);
+		pthread_key_create(&async_die_counter, NULL);
+		set_die_routine(die_async);
+		set_die_is_recursing_routine(async_die_is_recursing);
+	}
+}
+
 int start_async(struct async *async)
 {
 	int need_in, need_out;
@@ -740,18 +771,7 @@ int start_async(struct async *async)
 	else if (async->out)
 		close(async->out);
 #else
-	if (!main_thread_set) {
-		/*
-		 * We assume that the first time that start_async is called
-		 * it is from the main thread.
-		 */
-		main_thread_set = 1;
-		main_thread = pthread_self();
-		pthread_key_create(&async_key, NULL);
-		pthread_key_create(&async_die_counter, NULL);
-		set_die_routine(die_async);
-		set_die_is_recursing_routine(async_die_is_recursing);
-	}
+	setup_main_thread();
 
 	if (proc_in >= 0)
 		set_cloexec(proc_in);
@@ -852,3 +872,171 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
 	close(cmd->out);
 	return finish_command(cmd);
 }
+
+#ifndef NO_PTHREADS
+struct job_list {
+	int (*fct)(struct task_queue *aq, void *task);
+	void *task;
+	struct job_list *next;
+};
+#endif
+
+struct task_queue {
+#ifndef NO_PTHREADS
+	/*
+	 * To avoid deadlocks always aquire the semaphores with lowest priority
+	 * first, priorites are in descending order as listed.
+	 *
+	 * The `mutex` is a general purpose lock for modifying data in the async
+	 * queue, such as adding a new task or adding a return value from
+	 * an already run task.
+	 *
+	 * `workingcount` and `freecount` are opposing semaphores, the sum of
+	 * their values should equal `max_threads` at any time while the `mutex`
+	 * is available.
+	 */
+	sem_t mutex;
+	sem_t workingcount;
+	sem_t freecount;
+
+	pthread_t *threads;
+	unsigned max_threads;
+
+	struct job_list *first;
+	struct job_list *last;
+#endif
+	int early_return;
+};
+
+#ifndef NO_PTHREADS
+
+static void get_task(struct task_queue *aq,
+		     int (**fct)(struct task_queue *aq, void *task),
+		     void **task,
+		     int *early_return)
+{
+	struct job_list *job;
+
+	sem_wait(&aq->workingcount);
+	sem_wait(&aq->mutex);
+
+	if (!aq->first)
+		die("BUG: internal error with dequeuing jobs for threads");
+	job = aq->first;
+	*fct = job->fct;
+	*task = job->task;
+	aq->early_return |= *early_return;
+	*early_return = aq->early_return;
+	aq->first = job->next;
+	if (!aq->first)
+		aq->last = NULL;
+
+	sem_post(&aq->freecount);
+	sem_post(&aq->mutex);
+
+	free(job);
+}
+
+static void* dispatcher(void *args)
+{
+	void *task;
+	int (*fct)(struct task_queue *aq, void *data);
+	int early_return = 0;
+	struct task_queue *aq = args;
+
+	get_task(aq, &fct, &task, &early_return);
+	while (fct || early_return != 0) {
+		early_return = fct(aq, task);
+		get_task(aq, &fct, &task, &early_return);
+	}
+
+	pthread_exit(0);
+}
+#endif
+
+struct task_queue *create_task_queue(unsigned max_threads)
+{
+	struct task_queue *aq = xmalloc(sizeof(*aq));
+
+#ifndef NO_PTHREADS
+	int i;
+	if (!max_threads)
+		aq->max_threads = online_cpus();
+	else
+		aq->max_threads = max_threads;
+
+	sem_init(&aq->mutex, 0, 1);
+	sem_init(&aq->workingcount, 0, 0);
+	sem_init(&aq->freecount, 0, aq->max_threads);
+	aq->threads = xmalloc(aq->max_threads * sizeof(pthread_t));
+
+	for (i = 0; i < aq->max_threads; i++)
+		pthread_create(&aq->threads[i], 0, &dispatcher, aq);
+
+	aq->first = NULL;
+	aq->last = NULL;
+
+	setup_main_thread();
+#endif
+	aq->early_return = 0;
+
+	return aq;
+}
+
+void add_task(struct task_queue *aq,
+	      int (*fct)(struct task_queue *aq, void *task),
+	      void *task)
+{
+#ifndef NO_PTHREADS
+	struct job_list *job_list;
+
+	job_list = xmalloc(sizeof(*job_list));
+	job_list->task = task;
+	job_list->fct = fct;
+	job_list->next = NULL;
+
+	sem_wait(&aq->freecount);
+	sem_wait(&aq->mutex);
+
+	if (!aq->last) {
+		aq->last = job_list;
+		aq->first = aq->last;
+	} else {
+		aq->last->next = job_list;
+		aq->last = aq->last->next;
+	}
+
+	sem_post(&aq->workingcount);
+	sem_post(&aq->mutex);
+#else
+	ALLOC_GROW(aq->ret->ret, aq->ret->count + 1, aq->ret->alloc);
+	aq->ret->ret[aq->ret->count++] = aq->function(job);
+#endif
+}
+
+int finish_task_queue(struct task_queue *aq)
+{
+	int ret;
+#ifndef NO_PTHREADS
+	int i;
+	for (i = 0; i < aq->max_threads; i++)
+		add_task(aq, NULL, NULL);
+
+	for (i = 0; i < aq->max_threads; i++)
+		pthread_join(aq->threads[i], 0);
+
+	sem_destroy(&aq->mutex);
+	sem_destroy(&aq->workingcount);
+	sem_destroy(&aq->freecount);
+
+	if (aq->first)
+		die("BUG: internal error with queuing jobs for threads");
+
+	free(aq->threads);
+#endif
+	ret = aq->early_return;
+
+	free(aq);
+	return ret;
+}
+
diff --git a/run-command.h b/run-command.h
index 5b4425a..c2cfd49 100644
--- a/run-command.h
+++ b/run-command.h
@@ -119,4 +119,34 @@ struct async {
 int start_async(struct async *async);
 int finish_async(struct async *async);
 
+/*
+ * Creates a struct `task_queue`, which holds a list of tasks. Up to
+ * `max_threads` threads are active to process the enqueued tasks,
+ * processing them in first-in-first-out order.
+ *
+ * If `max_threads` is zero the number of cores available will be used.
+ *
+ * Currently this only works in environments with pthreads; in other
+ * environments, the tasks will be processed sequentially in `add_task`.
+ */
+struct task_queue *create_task_queue(unsigned max_threads);
+
+/*
+ * The function and data are put into the task queue.
+ *
+ * The function `fct` must not be NULL, as that's used internally
+ * in `finish_task_queue` to signal shutdown. If the return code
+ * of `fct` is nonzero, the processing will stop eventually and
+ * the currently running parallel tasks will be flushed out.
+ */
+void add_task(struct task_queue *aq,
+	      int (*fct)(struct task_queue *aq, void *task),
+	      void *task);
+
+/*
+ * Waits for all tasks to be done and frees the object. The return code
+ * is zero if all enqueued tasks were processed.
+ */
+int finish_task_queue(struct task_queue *aq);
+
 #endif
-- 
2.5.0.264.g01b5c38.dirty


* [WIP/PATCH 3/3] submodule: helper to run foreach in parallel
  2015-08-21  1:40 [PATCH 1/3] submodule: implement `module_clone` as a builtin helper Stefan Beller
  2015-08-21  1:40 ` [RFC PATCH 2/3] run-commands: add an async queue processor Stefan Beller
@ 2015-08-21  1:40 ` Stefan Beller
  2015-08-21 19:23   ` Junio C Hamano
  1 sibling, 1 reply; 16+ messages in thread
From: Stefan Beller @ 2015-08-21  1:40 UTC (permalink / raw)
  To: git; +Cc: gitster, jrnieder, hvoigt, jens.lehmann, Stefan Beller

This runs a command on each submodule in parallel and should eventually
replace `git submodule foreach`.

There is a new option -j/--jobs (inspired by make) to specify the number
of parallel threads.

The jobs=1 case needs to be special cased to exactly replicate the current
default behavior of `git submodule foreach`, such as stdin input still working.
For more than one job no input is possible, and the output of both
stdout and stderr of the command is put onto stderr in an ordered fashion,
i.e. the tasks do not intermingle their output in races.

What currently works:
 git submodule--helper foreach "echo \$toplevel-\$name-\$path-\$sha1"
which I took from t7407 for testing during development.

Signed-off-by: Stefan Beller <sbeller@google.com>
---

This is still WIP, but comments are welcome.

 builtin/submodule--helper.c | 147 +++++++++++++++++++++++++++++++++++++++++++-
 git-submodule.sh            |   9 +++
 2 files changed, 154 insertions(+), 2 deletions(-)

diff --git a/builtin/submodule--helper.c b/builtin/submodule--helper.c
index ae74b80..9823302 100644
--- a/builtin/submodule--helper.c
+++ b/builtin/submodule--helper.c
@@ -9,7 +9,9 @@
 #include "submodule-config.h"
 #include "string-list.h"
 #include "run-command.h"
-
+#ifndef NO_PTHREADS
+#include <semaphore.h>
+#endif
 static const struct cache_entry **ce_entries;
 static int ce_alloc, ce_used;
 static const char *alternative_path;
@@ -275,6 +277,144 @@ static int module_clone(int argc, const char **argv, const char *prefix)
 	return 0;
 }
 
+struct submodule_args {
+	const char *name;
+	const char *path;
+	const char *sha1;
+	const char *toplevel;
+	const char *prefix;
+	const char **cmd;
+	struct submodule_output *out;
+	sem_t *mutex;
+};
+
+int run_cmd_submodule(struct task_queue *aq, void *task)
+{
+	int i;
+	struct submodule_args *args = task;
+	struct strbuf out = STRBUF_INIT;
+	struct strbuf sb = STRBUF_INIT;
+	struct child_process *cp = xmalloc(sizeof(*cp));
+	char buf[1024];
+
+	strbuf_addf(&out, N_("Entering %s\n"), relative_path(args->path, args->prefix, &sb));
+
+	child_process_init(cp);
+	argv_array_pushv(&cp->args, args->cmd);
+
+	argv_array_pushf(&cp->env_array, "name=%s", args->name);
+	argv_array_pushf(&cp->env_array, "path=%s", args->path);
+	argv_array_pushf(&cp->env_array, "sha1=%s", args->sha1);
+	argv_array_pushf(&cp->env_array, "toplevel=%s", args->toplevel);
+
+	for (i = 0; local_repo_env[i]; i++)
+		argv_array_push(&cp->env_array, local_repo_env[i]);
+
+	cp->no_stdin = 1;
+	cp->out = 0;
+	cp->err = -1;
+	cp->dir = args->path;
+	cp->stdout_to_stderr = 1;
+	cp->use_shell = 1;
+
+	if (start_command(cp)) {
+		for (i = 0; cp->args.argv[i]; i++)
+			fprintf(stderr, "%s\n", cp->args.argv[i]);
+		die("Could not start command");
+	}
+
+	while (1) {
+		ssize_t len = xread(cp->err, buf, sizeof(buf));
+		if (len < 0)
+			die("Read from child failed");
+		else if (len == 0)
+			break;
+		else {
+			strbuf_add(&out, buf, len);
+		}
+	}
+	if (finish_command(cp))
+		die("command died with error");
+
+	sem_wait(args->mutex);
+	fputs(out.buf, stderr);
+	sem_post(args->mutex);
+
+	return 0;
+}
+
+int module_foreach_parallel(int argc, const char **argv, const char *prefix)
+{
+	int i, recursive = 0, number_threads = 1, quiet = 0;
+	static struct pathspec pathspec;
+	struct strbuf sb = STRBUF_INIT;
+	struct task_queue *aq;
+	char **cmd;
+	const char **nullargv = {NULL};
+	sem_t *mutex = xmalloc(sizeof(*mutex));
+
+	struct option module_update_options[] = {
+		OPT_STRING(0, "prefix", &alternative_path,
+			   N_("path"),
+			   N_("alternative anchor for relative paths")),
+		OPT_STRING(0, "cmd", &cmd,
+			   N_("string"),
+			   N_("command to run")),
+		OPT_BOOL('r', "recursive", &recursive,
+			 N_("Recurse into nested submodules")),
+		OPT_INTEGER('j', "jobs", &number_threads,
+			    N_("Number of parallel jobs")),
+		OPT__QUIET(&quiet, N_("Suppress output")),
+		OPT_END()
+	};
+
+	static const char * const git_submodule_helper_usage[] = {
+		N_("git submodule--helper foreach [--prefix=<path>] [<path>...]"),
+		NULL
+	};
+
+	argc = parse_options(argc, argv, prefix, module_update_options,
+			     git_submodule_helper_usage, 0);
+
+	if (module_list_compute(0, nullargv, NULL, &pathspec) < 0)
+		return 1;
+
+	gitmodules_config();
+
+	aq = create_task_queue(number_threads);
+
+	for (i = 0; i < ce_used; i++) {
+		const struct submodule *sub;
+		const struct cache_entry *ce = ce_entries[i];
+		struct submodule_args *args = malloc(sizeof(*args));
+
+		if (ce_stage(ce))
+			args->sha1 = xstrdup(sha1_to_hex(null_sha1));
+		else
+			args->sha1 = xstrdup(sha1_to_hex(ce->sha1));
+
+		strbuf_reset(&sb);
+		strbuf_addf(&sb, "%s/.git", ce->name);
+		if (!file_exists(sb.buf))
+			continue;
+
+		args->path = ce->name;
+		sub = submodule_from_path(null_sha1, args->path);
+		if (!sub)
+			die("No submodule mapping found in .gitmodules for path '%s'", args->path);
+
+		args->name = sub->name;
+		args->toplevel = xstrdup(xgetcwd());
+		args->cmd = argv;
+		args->mutex = mutex;
+		args->prefix = alternative_path;
+		add_task(aq, run_cmd_submodule, args);
+	}
+
+	finish_task_queue(aq);
+	return 0;
+}
+
 int cmd_submodule__helper(int argc, const char **argv, const char *prefix)
 {
 	if (argc < 2)
@@ -289,6 +429,9 @@ int cmd_submodule__helper(int argc, const char **argv, const char *prefix)
 	if (!strcmp(argv[1], "module_clone"))
 		return module_clone(argc - 1, argv + 1, prefix);
 
+	if (!strcmp(argv[1], "foreach"))
+		return module_foreach_parallel(argc - 1, argv + 1, prefix);
+
 usage:
-	usage("git submodule--helper [module_list module_name module_clone]\n");
+	usage("git submodule--helper [module_list module_name module_clone foreach]\n");
 }
diff --git a/git-submodule.sh b/git-submodule.sh
index fb5155e..fa18434 100755
--- a/git-submodule.sh
+++ b/git-submodule.sh
@@ -431,6 +431,15 @@ cmd_foreach()
 }
 
 #
+# Execute an arbitrary command sequence in each checked out
+# submodule in parallel.
+#
+cmd_foreach_parallel()
+{
	git submodule--helper foreach --prefix "$wt_prefix" "$@"
+}
+
+#
 # Register submodules in .git/config
 #
 # $@ = requested paths (default to all)
-- 
2.5.0.264.g01b5c38.dirty


* Re: [RFC PATCH 2/3] run-commands: add an async queue processor
  2015-08-21  1:40 ` [RFC PATCH 2/3] run-commands: add an async queue processor Stefan Beller
@ 2015-08-21 19:05   ` Junio C Hamano
  2015-08-21 19:44     ` Jeff King
  2015-08-21 19:45     ` Stefan Beller
  0 siblings, 2 replies; 16+ messages in thread
From: Junio C Hamano @ 2015-08-21 19:05 UTC (permalink / raw)
  To: Stefan Beller
  Cc: git, jrnieder, Johannes Sixt, Jeff King, hvoigt, jens.lehmann

Stefan Beller <sbeller@google.com> writes:

> This adds functionality to do work in parallel.
>
> The whole life cycle of such a thread pool would look like
>
>     struct task_queue * tq = create_task_queue(32); // no of threads
>     for (...)
>         add_task(tq, process_one_item_function, item); // non blocking
>     ...
>     int ret = finish_task_queue(tq); // blocks until all tasks are done
>     if (ret)
>         die("Not all items were processed");
>
> The caller must take care of handling the output.
>
> Signed-off-by: Stefan Beller <sbeller@google.com>
> ---
>
> I sent this a while ago to the list, no comments on it :(

The primary reason I suspect is because you sent to a wrong set of
people.  Submodule folks have largely been working in the scripted
ones, and may not necessarily be the ones who are most familiar with
the run-command infrastructure.

"shortlog --no-merges" tells me that the obvious suspects are j6t
and peff.

> The core functionality stayed the same, but I hope to have improved the naming
> and location of the code.
>
> The WIP is only for the NO_PTHREADS case.

>  run-command.c | 212 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----
>  run-command.h |  30 +++++++++
>  2 files changed, 230 insertions(+), 12 deletions(-)
>
> diff --git a/run-command.c b/run-command.c
> index 28e1d55..4029011 100644
> --- a/run-command.c
> +++ b/run-command.c
> @@ -4,6 +4,21 @@
>  #include "sigchain.h"
>  #include "argv-array.h"
>  
> +#ifdef NO_PTHREADS
> +
> +#else
> +
> +#include "thread-utils.h"
> +
> +#include <pthread.h>
> +#include <semaphore.h>
> +#include <stdio.h>
> +#include <unistd.h>
> +
> +#endif
> +
> +#include "git-compat-util.h"
> +

This goes against the way we have been organizing the header files.

http://thread.gmane.org/gmane.comp.version-control.git/276241/focus=276265
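
For reference, the convention is that git-compat-util.h (or a header
that includes it, such as cache.h) comes first and already pulls in
the common system headers, so this would look more like:

	#include "git-compat-util.h"
	#include "run-command.h"
	#include "thread-utils.h"	/* brings in <pthread.h> when available */

with <stdio.h>, <unistd.h> and friends already taken care of.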

> @@ -668,6 +683,22 @@ int git_atexit(void (*handler)(void))
>  
>  #endif
>  
> +void setup_main_thread()

void setup_main_thread(void)

> @@ -852,3 +872,171 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
>  	close(cmd->out);
>  	return finish_command(cmd);
>  }
> +
> +#ifndef NO_PTHREADS
> +struct job_list {
> +	int (*fct)(struct task_queue *aq, void *task);
> +	void *task;
> +	struct job_list *next;
> +};
> +#endif
> +
> +struct task_queue {
> +#ifndef NO_PTHREADS
> +	/*
> +	 * To avoid deadlocks always aquire the semaphores with lowest priority

acquire.

> +	 * first, priorites are in descending order as listed.
> +	 *
> +	 * The `mutex` is a general purpose lock for modifying data in the async
> +	 * queue, such as adding a new task or adding a return value from
> +	 * an already run task.
> +	 *
> +	 * `workingcount` and `freecount` are opposing semaphores, the sum of
> +	 * their values should equal `max_threads` at any time while the `mutex`
> +	 * is available.
> +	 */
> +	sem_t mutex;
> +	sem_t workingcount;
> +	sem_t freecount;
> +
> +	pthread_t *threads;
> +	unsigned max_threads;
> +
> +	struct job_list *first;
> +	struct job_list *last;
> +#endif
> +	int early_return;
> +};
> +
> +#ifndef NO_PTHREADS
> +
> +static void get_task(struct task_queue *aq,
> +		     int (**fct)(struct task_queue *aq, void *task),
> +		     void **task,
> +		     int *early_return)
> +{
> +	struct job_list *job;
> +
> +	sem_wait(&aq->workingcount);
> +	sem_wait(&aq->mutex);
> +
> +	if (!aq->first)
> +		die("BUG: internal error with dequeuing jobs for threads");
> +	job = aq->first;
> +	*fct = job->fct;
> +	*task = job->task;
> +	aq->early_return |= *early_return;
> +	*early_return = aq->early_return;
> +	aq->first = job->next;
> +	if (!aq->first)
> +		aq->last = NULL;
> +
> +	sem_post(&aq->freecount);
> +	sem_post(&aq->mutex);
> +
> +	free(job);
> +}
> +
> +static void* dispatcher(void *args)

static void *dispatcher(....)

> +{
> +	void *task;
> +	int (*fct)(struct task_queue *aq, void *data);

s/data/task/?

> +	int early_return = 0;
> +	struct task_queue *aq = args;
> +
> +	get_task(aq, &fct, &task, &early_return);
> +	while (fct || early_return != 0) {
> +		early_return = fct(aq, task);
> +		get_task(aq, &fct, &task, &early_return);
> +	}

If the func said "we are done, you may stop dispatching now", do you
still want to do another get_task()?

> +	pthread_exit(0);
> +}
> +#endif
> +
> +struct task_queue *create_task_queue(unsigned max_threads)
> +{
> +	struct task_queue *aq = xmalloc(sizeof(*aq));
> +
> +#ifndef NO_PTHREADS
> +	int i;
> +	if (!max_threads)
> +		aq->max_threads = online_cpus();
> +	else
> +		aq->max_threads = max_threads;
> +
> +	sem_init(&aq->mutex, 0, 1);
> +	sem_init(&aq->workingcount, 0, 0);
> +	sem_init(&aq->freecount, 0, aq->max_threads);
> +	aq->threads = xmalloc(aq->max_threads * sizeof(pthread_t));
> +
> +	for (i = 0; i < aq->max_threads; i++)
> +		pthread_create(&aq->threads[i], 0, &dispatcher, aq);
> +
> +	aq->first = NULL;
> +	aq->last = NULL;


Shouldn't these be initialized before letting threads call into
dispatcher?  The workingcount semaphore that is initialized to 0 may
prevent them from peeking into these pointers and barfing, but still...

> +
> +	setup_main_thread();
> +#endif
> +	aq->early_return = 0;
> +
> +	return aq;
> +}


* Re: [WIP/PATCH 3/3] submodule: helper to run foreach in parallel
  2015-08-21  1:40 ` [WIP/PATCH 3/3] submodule: helper to run foreach in parallel Stefan Beller
@ 2015-08-21 19:23   ` Junio C Hamano
  2015-08-21 20:21     ` Stefan Beller
  0 siblings, 1 reply; 16+ messages in thread
From: Junio C Hamano @ 2015-08-21 19:23 UTC (permalink / raw)
  To: Stefan Beller; +Cc: git, jrnieder, hvoigt, jens.lehmann

Stefan Beller <sbeller@google.com> writes:

> +struct submodule_args {
> +	const char *name;
> +	const char *path;
> +	const char *sha1;
> +	const char *toplevel;
> +	const char *prefix;
> +	const char **cmd;
> +	struct submodule_output *out;
> +	sem_t *mutex;
> +};

I do not see what submodule_output looks like in the patch, but I
guess you are capturing _all_ output from a task and emitting
everything at the end, after the task is done?

I have to wonder if that is what people would expect and more
importantly if that is the most useful.  I am sympathetic to the
desire to avoid the output totally mixed up from different processes
emitting things in an uncoordinated manner, and "slurp everything
and then show everything in one go" is certainly _one_ way to do so,
but early feedback also counts.  Besides, because the order in which
tasks are dispatched and completed is unpredictable, you cannot
expect a machine parseable output _without_ assuming some help from
the command invoked by each task (e.g. by prefixing the task's output
with some string that identifies which submodule the output is about).

Once you assume that the command is _aware_ that it needs to help
the foreach-parallel infrastructure so that the user can sift their
collective outputs to make sense of them, wouldn't a line-buffered
intermixing also be acceptable, and wouldn't it be a much lower-impact
approach to solve the same problem?
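
For illustration, such line-buffered intermixing could be as simple as
forwarding each complete line as soon as it arrives, tagged with the
submodule it came from. A rough sketch (fd, path and the pending buffer
are made up for the example; xread() and strbuf are the existing helpers):

static void forward_lines(int fd, const char *path, struct strbuf *pending)
{
	char buf[1024];
	ssize_t len;

	while ((len = xread(fd, buf, sizeof(buf))) > 0) {
		const char *p, *nl;

		strbuf_add(pending, buf, len);
		p = pending->buf;
		while ((nl = strchr(p, '\n'))) {
			/* a complete line: emit it immediately, prefixed */
			fprintf(stderr, "%s: %.*s\n", path, (int)(nl - p), p);
			p = nl + 1;
		}
		/* keep any partial trailing line for the next read */
		strbuf_splice(pending, 0, p - pending->buf, "", 0);
	}
}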


* Re: [RFC PATCH 2/3] run-commands: add an async queue processor
  2015-08-21 19:05   ` Junio C Hamano
@ 2015-08-21 19:44     ` Jeff King
  2015-08-21 19:48       ` Stefan Beller
                         ` (2 more replies)
  2015-08-21 19:45     ` Stefan Beller
  1 sibling, 3 replies; 16+ messages in thread
From: Jeff King @ 2015-08-21 19:44 UTC (permalink / raw)
  To: Junio C Hamano
  Cc: Stefan Beller, git, jrnieder, Johannes Sixt, hvoigt, jens.lehmann

On Fri, Aug 21, 2015 at 12:05:13PM -0700, Junio C Hamano wrote:

> The primary reason I suspect is because you sent to a wrong set of
> people.  Submodule folks have largely been working in the scripted
> ones, and may not necessarily be the ones who are most familiar with
> the run-command infrastructure.
> 
> "shortlog --no-merges" tells me that the obvious suspects are j6t
> and peff.

No good deed goes unpunished. ;)

Before even looking at the implementation, my first question would be
whether this pattern is applicable in several places in git (i.e., is it
worth the extra complexity of abstracting out in the first place). I
think there are a few task-queue patterns already in git; for example
the delta search in pack-objects. Is the interface given here sufficient
to convert pack-objects? Is the result nicer to read? Is it as
efficient?

We do not need to convert all possible call-sites to the new abstracted
code at once. But I find that converting at least _one_ is a good litmus
test to confirm that a new interface is generally useful.

-Peff


* Re: [RFC PATCH 2/3] run-commands: add an async queue processor
  2015-08-21 19:05   ` Junio C Hamano
  2015-08-21 19:44     ` Jeff King
@ 2015-08-21 19:45     ` Stefan Beller
  2015-08-21 20:47       ` Junio C Hamano
  1 sibling, 1 reply; 16+ messages in thread
From: Stefan Beller @ 2015-08-21 19:45 UTC (permalink / raw)
  To: Junio C Hamano
  Cc: git, Jonathan Nieder, Johannes Sixt, Jeff King, Heiko Voigt,
	Jens Lehmann

On Fri, Aug 21, 2015 at 12:05 PM, Junio C Hamano <gitster@pobox.com> wrote:
> Stefan Beller <sbeller@google.com> writes:
>
>> This adds functionality to do work in parallel.
>>
>> The whole life cycle of such a thread pool would look like
>>
>>     struct task_queue * tq = create_task_queue(32); // no of threads
>>     for (...)
>>         add_task(tq, process_one_item_function, item); // non blocking
>>     ...
>>     int ret = finish_task_queue(tq); // blocks until all tasks are done
>>     if (ret)
>>         die("Not all items were processed");
>>
>> The caller must take care of handling the output.
>>
>> Signed-off-by: Stefan Beller <sbeller@google.com>
>> ---
>>
>> I sent this a while ago to the list, no comments on it :(
>
> The primary reason I suspect is because you sent to a wrong set of
> people.  Submodule folks have largely been working in the scripted
> ones, and may not necessarily be the ones who are most familiar with
> the run-command infrastructure.
>
> "shortlog --no-merges" tells me that the obvious suspects are j6t
> and peff.

noted.

>
>> The core functionality stayed the same, but I hope to have improved the naming
>> and location of the code.
>>
>> The WIP is only for the NO_PTHREADS case.
>
>>  run-command.c | 212 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----
>>  run-command.h |  30 +++++++++
>>  2 files changed, 230 insertions(+), 12 deletions(-)
>>
>> diff --git a/run-command.c b/run-command.c
>> index 28e1d55..4029011 100644
>> --- a/run-command.c
>> +++ b/run-command.c
>> @@ -4,6 +4,21 @@
>>  #include "sigchain.h"
>>  #include "argv-array.h"
>>
>> +#ifdef NO_PTHREADS
>> +
>> +#else
>> +
>> +#include "thread-utils.h"
>> +
>> +#include <pthread.h>
>> +#include <semaphore.h>
>> +#include <stdio.h>
>> +#include <unistd.h>
>> +
>> +#endif
>> +
>> +#include "git-compat-util.h"
>> +
>
> This goes against the way we have been organizing the header files.
>
> http://thread.gmane.org/gmane.comp.version-control.git/276241/focus=276265

ok
>
>> @@ -668,6 +683,22 @@ int git_atexit(void (*handler)(void))
>>
>>  #endif
>>
>> +void setup_main_thread()
>
> void setup_main_thread(void)
>
>> @@ -852,3 +872,171 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
>>       close(cmd->out);
>>       return finish_command(cmd);
>>  }
>> +
>> +#ifndef NO_PTHREADS
>> +struct job_list {
>> +     int (*fct)(struct task_queue *aq, void *task);
>> +     void *task;
>> +     struct job_list *next;
>> +};
>> +#endif
>> +
>> +struct task_queue {
>> +#ifndef NO_PTHREADS
>> +     /*
>> +      * To avoid deadlocks always aquire the semaphores with lowest priority
>
> acquire.
>
>> +      * first, priorites are in descending order as listed.
>> +      *
>> +      * The `mutex` is a general purpose lock for modifying data in the async
>> +      * queue, such as adding a new task or adding a return value from
>> +      * an already run task.
>> +      *
>> +      * `workingcount` and `freecount` are opposing semaphores, the sum of
>> +      * their values should equal `max_threads` at any time while the `mutex`
>> +      * is available.
>> +      */
>> +     sem_t mutex;
>> +     sem_t workingcount;
>> +     sem_t freecount;
>> +
>> +     pthread_t *threads;
>> +     unsigned max_threads;
>> +
>> +     struct job_list *first;
>> +     struct job_list *last;
>> +#endif
>> +     int early_return;
>> +};
>> +
>> +#ifndef NO_PTHREADS
>> +
>> +static void get_task(struct task_queue *aq,
>> +                  int (**fct)(struct task_queue *aq, void *task),
>> +                  void **task,
>> +                  int *early_return)
>> +{
>> +     struct job_list *job;
>> +
>> +     sem_wait(&aq->workingcount);
>> +     sem_wait(&aq->mutex);
>> +
>> +     if (!aq->first)
>> +             die("BUG: internal error with dequeuing jobs for threads");
>> +     job = aq->first;
>> +     *fct = job->fct;
>> +     *task = job->task;
>> +     aq->early_return |= *early_return;
>> +     *early_return = aq->early_return;
>> +     aq->first = job->next;
>> +     if (!aq->first)
>> +             aq->last = NULL;
>> +
>> +     sem_post(&aq->freecount);
>> +     sem_post(&aq->mutex);
>> +
>> +     free(job);
>> +}
>> +
>> +static void* dispatcher(void *args)
>
> static void *dispatcher(....)
>
>> +{
>> +     void *task;
>> +     int (*fct)(struct task_queue *aq, void *data);
>
> s/data/task/?
>
>> +     int early_return = 0;
>> +     struct task_queue *aq = args;
>> +
>> +     get_task(aq, &fct, &task, &early_return);
>> +     while (fct || early_return != 0) {
>> +             early_return = fct(aq, task);
>> +             get_task(aq, &fct, &task, &early_return);
>> +     }
>
> If the func said "we are done, you may stop dispatching now", do you
> still want to do another get_task()?

This question shows me I messed up the readability of this. `get_task` both
gets a new task and takes the value from early_return, writing it to
aq->early_return such that other threads are notified.

Maybe I should do that explicitly here and not get the new task.
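
Something like this, perhaps (just a sketch; publish_early_return() is
a hypothetical helper that takes aq->mutex and sets aq->early_return,
and the leftover shutdown tasks would still need to be consumed):

	for (;;) {
		get_task(aq, &fct, &task, &early_return);
		if (!fct)
			break;	/* NULL fct is the shutdown signal */
		early_return = fct(aq, task);
		if (early_return) {
			publish_early_return(aq, early_return);
			break;
		}
	}
	pthread_exit(0);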

>
>> +     pthread_exit(0);
>> +}
>> +#endif
>> +
>> +struct task_queue *create_task_queue(unsigned max_threads)
>> +{
>> +     struct task_queue *aq = xmalloc(sizeof(*aq));
>> +
>> +#ifndef NO_PTHREADS
>> +     int i;
>> +     if (!max_threads)
>> +             aq->max_threads = online_cpus();
>> +     else
>> +             aq->max_threads = max_threads;
>> +
>> +     sem_init(&aq->mutex, 0, 1);
>> +     sem_init(&aq->workingcount, 0, 0);
>> +     sem_init(&aq->freecount, 0, aq->max_threads);
>> +     aq->threads = xmalloc(aq->max_threads * sizeof(pthread_t));
>> +
>> +     for (i = 0; i < aq->max_threads; i++)
>> +             pthread_create(&aq->threads[i], 0, &dispatcher, aq);
>> +
>> +     aq->first = NULL;
>> +     aq->last = NULL;
>
>
> Shouldn't these be initialized before letting threads call into
> dispatcher?  The workingcount semaphore that is initialized to 0 may
> prevent them from peeking into these pointers and barfing, but still...

They are initialized to NULL, as the empty queue doesn't need a
container element.
Do we usually do queues in another way?

>
>> +
>> +     setup_main_thread();
>> +#endif
>> +     aq->early_return = 0;
>> +
>> +     return aq;
>> +}


* Re: [RFC PATCH 2/3] run-commands: add an async queue processor
  2015-08-21 19:44     ` Jeff King
@ 2015-08-21 19:48       ` Stefan Beller
  2015-08-21 19:51         ` Jeff King
  2015-08-21 20:41       ` Junio C Hamano
  2015-08-21 23:40       ` Stefan Beller
  2 siblings, 1 reply; 16+ messages in thread
From: Stefan Beller @ 2015-08-21 19:48 UTC (permalink / raw)
  To: Jeff King
  Cc: Junio C Hamano, git, Jonathan Nieder, Johannes Sixt, Heiko Voigt,
	Jens Lehmann

On Fri, Aug 21, 2015 at 12:44 PM, Jeff King <peff@peff.net> wrote:
> On Fri, Aug 21, 2015 at 12:05:13PM -0700, Junio C Hamano wrote:
>
>> The primary reason I suspect is because you sent to a wrong set of
>> people.  Submodule folks have largely been working in the scripted
>> ones, and may not necessarily be the ones who are most familiar with
>> the run-command infrastructure.
>>
>> "shortlog --no-merges" tells me that the obvious suspects are j6t
>> and peff.
>
> No good deed goes unpunished. ;)
>
> Before even looking at the implementation, my first question would be
> whether this pattern is applicable in several places in git (i.e., is it
> worth the extra complexity of abstracting out in the first place). I
> think there are a few task-queue patterns already in git; for example
> the delta search in pack-objects. Is the interface given here sufficient
> to convert pack-objects? Is the result nicer to read? Is it as
> efficient?
>
> We do not need to convert all possible call-sites to the new abstracted
> code at once. But I find that converting at least _one_ is a good litmus
> test to confirm that a new interface is generally useful.

Ok, I'll head off to convert one place.

>
> -Peff


* Re: [RFC PATCH 2/3] run-commands: add an async queue processor
  2015-08-21 19:48       ` Stefan Beller
@ 2015-08-21 19:51         ` Jeff King
  2015-08-21 20:12           ` Stefan Beller
  0 siblings, 1 reply; 16+ messages in thread
From: Jeff King @ 2015-08-21 19:51 UTC (permalink / raw)
  To: Stefan Beller
  Cc: Junio C Hamano, git, Jonathan Nieder, Johannes Sixt, Heiko Voigt,
	Jens Lehmann

On Fri, Aug 21, 2015 at 12:48:23PM -0700, Stefan Beller wrote:

> > Before even looking at the implementation, my first question would be
> > whether this pattern is applicable in several places in git (i.e., is it
> > worth the extra complexity of abstracting out in the first place). I
> > think there are a few task-queue patterns already in git; for example
> > the delta search in pack-objects. Is the interface given here sufficient
> > to convert pack-objects? Is the result nicer to read? Is it as
> > efficient?
> >
> > We do not need to convert all possible call-sites to the new abstracted
> > code at once. But I find that converting at least _one_ is a good litmus
> > test to confirm that a new interface is generally useful.
> 
> Ok, I'll head off to convert one place.

Thanks. By the way, reading over what I wrote, it sounds a little
harsher than I meant. It is not "if you do not convert an existing site,
we cannot accept a new interface, period". But I would like at least
some explanation as an alternative, like "I'm pretty sure we can convert
site X to this, but it is not a good time to do so now because of Y".
Where "Y" might be "we need to do this other refactoring work first", or
"it would be disruptive to other work in the area".

-Peff


* Re: [RFC PATCH 2/3] run-commands: add an async queue processor
  2015-08-21 19:51         ` Jeff King
@ 2015-08-21 20:12           ` Stefan Beller
  0 siblings, 0 replies; 16+ messages in thread
From: Stefan Beller @ 2015-08-21 20:12 UTC (permalink / raw)
  To: Jeff King
  Cc: Junio C Hamano, git, Jonathan Nieder, Johannes Sixt, Heiko Voigt,
	Jens Lehmann

On Fri, Aug 21, 2015 at 12:51 PM, Jeff King <peff@peff.net> wrote:
> On Fri, Aug 21, 2015 at 12:48:23PM -0700, Stefan Beller wrote:
>
>> > Before even looking at the implementation, my first question would be
>> > whether this pattern is applicable in several places in git (i.e., is it
>> > worth the extra complexity of abstracting out in the first place). I
>> > think there are a few task-queue patterns already in git; for example
>> > the delta search in pack-objects. Is the interface given here sufficient
>> > to convert pack-objects? Is the result nicer to read? Is it as
>> > efficient?
>> >
>> > We do not need to convert all possible call-sites to the new abstracted
>> > code at once. But I find that converting at least _one_ is a good litmus
>> > test to confirm that a new interface is generally useful.
>>
>> Ok, I'll head off to convert one place.
>
> Thanks. By the way, reading over what I wrote, it sounds a little
> harsher than I meant.

It did not sound harsh at all. I was just reading an internal mailing list,
which cites over-generalizing as a very bad practice, worse than
premature optimization, so I totally understand your concern and
agree.

> It is not "if you do not convert an existing site,
> we cannot accept a new interface, period". But I would like at least
> some explanation as an alternative, like "I'm pretty sure we can convert
> site X to this, but it is not a good time to do so now because of Y".
> Where "Y" might be "we need to do this other refactoring work first", or
> "it would be disruptive to other work in the area".
>
> -Peff


* Re: [WIP/PATCH 3/3] submodule: helper to run foreach in parallel
  2015-08-21 19:23   ` Junio C Hamano
@ 2015-08-21 20:21     ` Stefan Beller
  0 siblings, 0 replies; 16+ messages in thread
From: Stefan Beller @ 2015-08-21 20:21 UTC (permalink / raw)
  To: Junio C Hamano; +Cc: git, Jonathan Nieder, Heiko Voigt, Jens Lehmann

On Fri, Aug 21, 2015 at 12:23 PM, Junio C Hamano <gitster@pobox.com> wrote:
> Stefan Beller <sbeller@google.com> writes:
>
>> +struct submodule_args {
>> +     const char *name;
>> +     const char *path;
>> +     const char *sha1;
>> +     const char *toplevel;
>> +     const char *prefix;
>> +     const char **cmd;
>> +     struct submodule_output *out;
>> +     sem_t *mutex;
>> +};
>
> I do not see what submodule_output looks like in the patch, but I
> guess you are capturing _all_ output from a task and emitting
> everything at the end, after the task is done?

It was a leftover and will be removed. Sorry for wasting your time.
So jrnieder and I had a discussion on how to get the output right.

And we had 2 designs:

1) The worker thread acquires a mutex to be allowed to write to
  stdout. This makes early output easy for each thread in case of
  error messages. It also requires fewer resources than 2).
  The expected code complexity is also lower, as the decisions
  are made locally.

2) Have a single handler which deals with all output of all tasks.
  This handler would `select` on a huge number of pipes from
  all the tasks (so we would require a lot of pipes). This central
  handler (which would be submodule_output in this case) would
  take care of having no intermingled racy output from tasks, with
  prefixes and all bells and whistles. It could also
  give a progress meter (300 out of 500 submodules updated already)
  for all threads, or, when the last task is running, switch to piping
  the output of that task to stdout in real time, so you'd get progress
  of the last task as you're already used to. A sketch of such a
  handler's select loop follows below.
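
A rough sketch of that select loop (illustrative only: fds[] would hold
the read ends of the tasks' pipes, out[i] collects the output of task i,
and error/EINTR handling is omitted):

static void multiplex_output(int *fds, struct strbuf *out, int n)
{
	char buf[1024];
	int i, open_fds = n;

	while (open_fds) {
		fd_set rfds;
		int maxfd = -1;
		ssize_t len;

		FD_ZERO(&rfds);
		for (i = 0; i < n; i++) {
			if (fds[i] < 0)
				continue;	/* this task is done */
			FD_SET(fds[i], &rfds);
			if (fds[i] > maxfd)
				maxfd = fds[i];
		}
		if (select(maxfd + 1, &rfds, NULL, NULL, NULL) < 0)
			continue;	/* a real version would handle errors */
		for (i = 0; i < n; i++) {
			if (fds[i] < 0 || !FD_ISSET(fds[i], &rfds))
				continue;
			len = xread(fds[i], buf, sizeof(buf));
			if (len <= 0) {
				close(fds[i]);
				fds[i] = -1;
				open_fds--;
			} else {
				strbuf_add(&out[i], buf, len);
			}
		}
	}
}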

The outcome of the discussion was to split the worker pool/load
distribution from the output handling in any case as there is no need
to couple them. In my very first designs I had the output handling
as part of the asynchronous worker pool.

This RFC implements 1), which is why there is only the mutex.
I plan on enhancing this to let the last task output in real time (no buffering),
as well as to print a counter "task n/m completed" after each task
is done.

>
> I have to wonder if that is what people would expect and more
> importantly if that is the most useful.  I am sympathetic to the
> desire to avoid the output totally mixed up from different processes
> emitting things in an uncoordinated manner, and "slurp everything
> and then show everything in one go" is certainly _one_ way to do so,
> but early feedback also counts.  Besides, because the order in which
> tasks are dispatched and completed is unpredictable, you cannot
> expect a machine parseable output _without_ assuming some help from
> the command invoked by each task (e.g. by prefixing the task's output
> with some string that identifies which submodule the output is about).

I was very focused on the "submodule foreach" output, which (in case
no -q is given) displays a

    "Entering %s"

as the first line for each finished task. I suspect that is not enough to
make the output machine-parseable. So I will prefix each
line with a running number of the task. Maybe like this:

  [001/500] Entering 'foo/bar'
  [001/500] Here goes the text for task foo/bar
  [001/500] Here goes another text for task foo/bar
  [002/500] Entering 'foo/baz'
  [002/500] Here goes the text for task foo/baz
  [002/500] Here goes another text for task foo/bar
  [003/500] Entering 'foo/bla'
  ...

This will make the output for each task distinguishable from
other tasks as well as offering some sort of progress meter.
(The fewer submodules you have, the less fine-grained
the progress bar becomes.)
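
The prefixing itself would be a small helper, roughly (i and n being
the number of the task and the total number of tasks):

static void print_prefixed(unsigned i, unsigned n, const char *buf)
{
	const char *p = buf;

	while (*p) {
		const char *nl = strchr(p, '\n');
		if (!nl)
			nl = p + strlen(p);
		fprintf(stderr, "[%03u/%03u] %.*s\n",
			i, n, (int)(nl - p), p);
		p = *nl ? nl + 1 : nl;
	}
}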

Jonathan suggested adding a capability to the git protocol for a
machine-readable progress meter on another channel, so we do not
need to parse the current Counting/Compressing/Transfer/etc. output.

>
> Once you assume that the command is _aware_ that it needs to help
> the foreach-parallel infrastructure so that the user can sift their
> collective outputs to make sense of them, wouldn't a line-buffered
> intermixing also acceptable, and wouldn't it be a much lower impact
> approach to solve the same problem?


* Re: [RFC PATCH 2/3] run-commands: add an async queue processor
  2015-08-21 19:44     ` Jeff King
  2015-08-21 19:48       ` Stefan Beller
@ 2015-08-21 20:41       ` Junio C Hamano
  2015-08-21 23:40       ` Stefan Beller
  2 siblings, 0 replies; 16+ messages in thread
From: Junio C Hamano @ 2015-08-21 20:41 UTC (permalink / raw)
  To: Jeff King
  Cc: Stefan Beller, git, jrnieder, Johannes Sixt, hvoigt, jens.lehmann

Jeff King <peff@peff.net> writes:

> On Fri, Aug 21, 2015 at 12:05:13PM -0700, Junio C Hamano wrote:
>
>> The primary reason I suspect is because you sent to a wrong set of
>> people.  Submodule folks have largely been working in the scripted
>> ones, and may not necessarily be the ones who are most familiar with
>> the run-command infrastructure.
>> 
>> "shortlog --no-merges" tells me that the obvious suspects are j6t
>> and peff.
>
> No good deed goes unpunished. ;)
>
> Before even looking at the implementation, my first question would be
> whether this pattern is applicable in several places in git (i.e., is it
> worth the extra complexity of abstracting out in the first place). I
> think there are a few task-queue patterns already in git; for example
> the delta search in pack-objects. Is the interface given here sufficient
> to convert pack-objects? Is the result nicer to read? Is it as
> efficient?
>
> We do not need to convert all possible call-sites to the new abstracted
> code at once. But I find that converting at least _one_ is a good litmus
> test to confirm that a new interface is generally useful.

Ah, thanks for saying this.  I recall saying something similar
earlier and totally agree with you.


* Re: [RFC PATCH 2/3] run-commands: add an async queue processor
  2015-08-21 19:45     ` Stefan Beller
@ 2015-08-21 20:47       ` Junio C Hamano
  2015-08-21 20:56         ` Stefan Beller
  0 siblings, 1 reply; 16+ messages in thread
From: Junio C Hamano @ 2015-08-21 20:47 UTC (permalink / raw)
  To: Stefan Beller
  Cc: git, Jonathan Nieder, Johannes Sixt, Jeff King, Heiko Voigt,
	Jens Lehmann

Stefan Beller <sbeller@google.com> writes:

>>> +struct task_queue *create_task_queue(unsigned max_threads)
>>> +{
>>> +     struct task_queue *aq = xmalloc(sizeof(*aq));
>>> +
>>> +#ifndef NO_PTHREADS
>>> +     int i;
>>> +     if (!max_threads)
>>> +             aq->max_threads = online_cpus();
>>> +     else
>>> +             aq->max_threads = max_threads;
>>> +
>>> +     sem_init(&aq->mutex, 0, 1);
>>> +     sem_init(&aq->workingcount, 0, 0);
>>> +     sem_init(&aq->freecount, 0, aq->max_threads);
>>> +     aq->threads = xmalloc(aq->max_threads * sizeof(pthread_t));
>>> +
>>> +     for (i = 0; i < aq->max_threads; i++)
>>> +             pthread_create(&aq->threads[i], 0, &dispatcher, aq);
>>> +
>>> +     aq->first = NULL;
>>> +     aq->last = NULL;
>>
>>
>> Shouldn't these be initialized before letting threads call into
>> dispatcher?  The workingcount semaphore that is initialized to 0 may
>> prevent them from peeking into these pointers and barfing, but still...
>
> They are initialized to NULL as the empty queue doesn't need a
> container element.
> Do we do queues in another way usually?

I do not think we are on the same wavelength.  What I meant was to
do this:

	aq = xmalloc(...);
        set up _everything_ in aq and make it a consistent state;
        /* aq->first and aq->last are part of _everything_ in aq */
        for (many times)
        	pthread_create(...);

	/* No aq->first = aq->last = NULL assignment here */

instead of

	aq = xmalloc(...);
        set up part of aq;
        for (many times)
        	pthread_create(...);
	belatedly initialize aq->first and aq->last and finally
        aq becomes a consistent state.

which is what we see above.  The latter works _only_ because the
threads created are blocked waiting on aq->workingcount which is
initialized to block before threads are created to run dispatch,
and one of the early things dispatch does is to try acquiring that
semaphore to block before accessing aq->first and aq->last.


* Re: [RFC PATCH 2/3] run-commands: add an async queue processor
  2015-08-21 20:47       ` Junio C Hamano
@ 2015-08-21 20:56         ` Stefan Beller
  0 siblings, 0 replies; 16+ messages in thread
From: Stefan Beller @ 2015-08-21 20:56 UTC (permalink / raw)
  To: Junio C Hamano
  Cc: git, Jonathan Nieder, Johannes Sixt, Jeff King, Heiko Voigt,
	Jens Lehmann

On Fri, Aug 21, 2015 at 1:47 PM, Junio C Hamano <gitster@pobox.com> wrote:
>
> I do not think we are on the same wavelength.  What I meant was to
> do this:
>
>         aq = xmalloc(...);
>         set up _everything_ in aq and make it a consistent state;
>         /* aq->first and aq->last are part of _everything_ in aq */
>         for (many times)
>                 pthread_create(...);
>
>         /* No aq->first = aq->last = NULL assignment here */
>
> instead of
>
>         aq = xmalloc(...);
>         set up part of aq;
>         for (many times)
>                 pthread_create(...);
>         belatedly initialize aq->first and aq->last and finally
>         aq becomes a consistent state.
>
> which is what we see above.  The latter works _only_ because the
> threads created are blocked waiting on aq->workingcount which is
> initialized to block before threads are created to run dispatch,
> and one of the early things dispatch does is to try acquiring that
> semaphore to block before accessing aq->first and aq->last.

I see your point and it makes sense to me, as it makes the
mental memory model cleaner.

I viewed the methods as atomic units (you would not call add_task
before the constructor has finished, so no harm there). And a dispatcher
in the pthread would just block on the `workingcount` semaphore, as the
semaphores are the main inter-thread communication. I viewed the queue
as just a secondary thing to distribute the work load.


* Re: [RFC PATCH 2/3] run-commands: add an async queue processor
  2015-08-21 19:44     ` Jeff King
  2015-08-21 19:48       ` Stefan Beller
  2015-08-21 20:41       ` Junio C Hamano
@ 2015-08-21 23:40       ` Stefan Beller
  2015-08-24 21:22         ` Junio C Hamano
  2 siblings, 1 reply; 16+ messages in thread
From: Stefan Beller @ 2015-08-21 23:40 UTC (permalink / raw)
  To: Jeff King
  Cc: Junio C Hamano, git, Jonathan Nieder, Johannes Sixt, Heiko Voigt,
	Jens Lehmann

On Fri, Aug 21, 2015 at 12:44 PM, Jeff King <peff@peff.net> wrote:
> On Fri, Aug 21, 2015 at 12:05:13PM -0700, Junio C Hamano wrote:
>
>> The primary reason I suspect is because you sent to a wrong set of
>> people.  Submodule folks have largely been working in the scripted
>> ones, and may not necessarily be the ones who are most familiar with
>> the run-command infrastructure.
>>
>> "shortlog --no-merges" tells me that the obvious suspects are j6t
>> and peff.
>
> No good deed goes unpunished. ;)
>
> Before even looking at the implementation, my first question would be
> whether this pattern is applicable in several places in git (i.e., is it
> worth the extra complexity of abstracting out in the first place). I
> think there are a few task-queue patterns already in git; for example
> the delta search in pack-objects. Is the interface given here sufficient
> to convert pack-objects? Is the result nicer to read? Is it as
> efficient?

I have converted index-pack threading now, and it looks quite easy:

diff --git a/builtin/index-pack.c b/builtin/index-pack.c
index 3f10840..159ee36 100644
--- a/builtin/index-pack.c
+++ b/builtin/index-pack.c
@@ -11,6 +11,7 @@
 #include "exec_cmd.h"
 #include "streaming.h"
 #include "thread-utils.h"
+#include "run-command.h"

 static const char index_pack_usage[] =
 "git index-pack [-v] [-o <index-file>] [--keep | --keep=<msg>]
[--verify] [--strict] (<pack-file> | --stdin [--fix-thin]
[<pack-file>])";
@@ -1075,7 +1076,7 @@ static void resolve_base(struct object_entry *obj)
 }

 #ifndef NO_PTHREADS
-static void *threaded_second_pass(void *data)
+static int threaded_second_pass(struct task_queue *tq, void *data)
 {
        set_thread_data(data);
        for (;;) {
@@ -1096,7 +1097,7 @@ static void *threaded_second_pass(void *data)

                resolve_base(&objects[i]);
        }
-       return NULL;
+       return 0;
 }
 #endif

@@ -1195,18 +1196,18 @@ static void resolve_deltas(void)
                                          nr_ref_deltas + nr_ofs_deltas);

 #ifndef NO_PTHREADS
-       nr_dispatched = 0;
        if (nr_threads > 1 || getenv("GIT_FORCE_THREADS")) {
+               nr_dispatched = 0;
                init_thread();
-               for (i = 0; i < nr_threads; i++) {
-                       int ret = pthread_create(&thread_data[i].thread, NULL,
-                                                threaded_second_pass, thread_data + i);
-                       if (ret)
-                               die(_("unable to create thread: %s"),
-                                   strerror(ret));
-               }
+
+               tq = create_task_queue(nr_threads);
+
                for (i = 0; i < nr_threads; i++)
-                       pthread_join(thread_data[i].thread, NULL);
+                       add_task(tq, threaded_second_pass, thread_data + i);
+
+               if (finish_task_queue(tq))
+                       die("Not all threads have finished");
+
                cleanup_thread();
                return;
        }
---
(tests pass)
This was cheating a bit, as I chose to convert index-pack rather than
upload-pack (index-pack is already very close to a queued workload;
this conversion just moves the thread creation into the new proposed
queue processor).

I realize now this can be adapted a bit more to show off the queue's
features, though that requires a larger rewrite. Instead of just
creating the threads and then locking, we get rid of the worker lock
entirely, like this:

diff --git a/builtin/index-pack.c b/builtin/index-pack.c
index 3f10840..797efea 100644
--- a/builtin/index-pack.c
+++ b/builtin/index-pack.c
@@ -11,6 +11,7 @@
 #include "exec_cmd.h"
 #include "streaming.h"
 #include "thread-utils.h"
+#include "run-command.h"

 static const char index_pack_usage[] =
 "git index-pack [-v] [-o <index-file>] [--keep | --keep=<msg>] [--verify] [--strict] (<pack-file> | --stdin [--fix-thin] [<pack-file>])";
@@ -106,10 +107,6 @@ static pthread_mutex_t counter_mutex;
 #define counter_lock() lock_mutex(&counter_mutex)
 #define counter_unlock() unlock_mutex(&counter_mutex)

-static pthread_mutex_t work_mutex;
-#define work_lock() lock_mutex(&work_mutex)
-#define work_unlock() unlock_mutex(&work_mutex)
-
 static pthread_mutex_t deepest_delta_mutex;
 #define deepest_delta_lock() lock_mutex(&deepest_delta_mutex)
 #define deepest_delta_unlock() unlock_mutex(&deepest_delta_mutex)
@@ -140,7 +137,6 @@ static void init_thread(void)
        int i;
        init_recursive_mutex(&read_mutex);
        pthread_mutex_init(&counter_mutex, NULL);
-       pthread_mutex_init(&work_mutex, NULL);
        pthread_mutex_init(&type_cas_mutex, NULL);
        if (show_stat)
                pthread_mutex_init(&deepest_delta_mutex, NULL);
@@ -163,7 +159,6 @@ static void cleanup_thread(void)
        threads_active = 0;
        pthread_mutex_destroy(&read_mutex);
        pthread_mutex_destroy(&counter_mutex);
-       pthread_mutex_destroy(&work_mutex);
        pthread_mutex_destroy(&type_cas_mutex);
        if (show_stat)
                pthread_mutex_destroy(&deepest_delta_mutex);
@@ -181,9 +176,6 @@ static void cleanup_thread(void)
 #define counter_lock()
 #define counter_unlock()

-#define work_lock()
-#define work_unlock()
-
 #define deepest_delta_lock()
 #define deepest_delta_unlock()

@@ -1075,28 +1067,24 @@ static void resolve_base(struct object_entry *obj)
 }

 #ifndef NO_PTHREADS
-static void *threaded_second_pass(void *data)
+static int threaded_second_pass(struct task_queue *tq, void *data)
 {
-       set_thread_data(data);
-       for (;;) {
-               int i;
-               counter_lock();
-               display_progress(progress, nr_resolved_deltas);
-               counter_unlock();
-               work_lock();
-               while (nr_dispatched < nr_objects &&
-                      is_delta_type(objects[nr_dispatched].type))
-                       nr_dispatched++;
-               if (nr_dispatched >= nr_objects) {
-                       work_unlock();
-                       break;
-               }
-               i = nr_dispatched++;
-               work_unlock();
+       if (!get_thread_data()) {
+               struct thread_local *t = xmalloc(sizeof(*t));
+               t->pack_fd = open(curr_pack, O_RDONLY);
+               if (t->pack_fd == -1)
+                       die_errno(_("unable to open %s"), curr_pack);

-               resolve_base(&objects[i]);
+               set_thread_data(t);
+               /* TODO: I haven't figured out where to free this memory */
        }
-       return NULL;
+
+       resolve_base(data);
+
+       counter_lock();
+       display_progress(progress, nr_resolved_deltas);
+       counter_unlock();
+       return 0;
 }
 #endif

@@ -1195,18 +1183,21 @@ static void resolve_deltas(void)
                                          nr_ref_deltas + nr_ofs_deltas);

 #ifndef NO_PTHREADS
-       nr_dispatched = 0;
        if (nr_threads > 1 || getenv("GIT_FORCE_THREADS")) {
+               struct task_queue *tq;
+
                init_thread();
-               for (i = 0; i < nr_threads; i++) {
-                       int ret = pthread_create(&thread_data[i].thread, NULL,
-                                                threaded_second_pass, thread_data + i);
-                       if (ret)
-                               die(_("unable to create thread: %s"),
-                                   strerror(ret));
-               }
-               for (i = 0; i < nr_threads; i++)
-                       pthread_join(thread_data[i].thread, NULL);
+               tq = create_task_queue(nr_threads);
+
+               for (nr_dispatched = 0; nr_dispatched < nr_objects; nr_dispatched++)
+                       if (!is_delta_type(objects[nr_dispatched].type))
+                               add_task(tq, threaded_second_pass, &objects[nr_dispatched]);
+
+               if (finish_task_queue(tq))
+                       die("Not all threads have finished");
+
+               /* Here might be a good place to free the thread local storage caches */
+
                cleanup_thread();
                return;
        }
----

This looks very pleasant to me: we no longer have the constant
contention on the worker lock, the main thread can load up the work
queue cheaply, and the worker threads start processing right away.
It also reduces the line count, which is usually a good sign.

However, there is a memory leak in this second solution.
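One way to plug it might be to register a destructor on the
thread-local key, so each thread frees its own storage when it exits.
A sketch, assuming set_thread_data() is backed by
pthread_setspecific():

	static pthread_key_t key;

	static void free_thread_local(void *data)
	{
		struct thread_local *t = data;
		close(t->pack_fd);	/* also drop the per-thread pack handle */
		free(t);
	}

	/* in init_thread(): the destructor runs at each thread's exit
	 * for whatever non-NULL value the thread stored under the key */
	pthread_key_create(&key, free_thread_local);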

What do you think of these two approaches?

(I cannot really judge objectively whether the result is easier to
read; I wrote the code, so of course it is easy for me.)

Why did I not pick upload-pack?
===============================

I could not easily see how to make it a typical queuing problem.
There we start in the threads, and once in a while it goes: "Ugh, this
thread has more load than the others, let's shove a bit over there."

So what we would need there is to split the work into the smallest
possible chunks from the beginning and just load them into the queue
via add_task, roughly as in the sketch below.
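
Purely illustrative (`struct chunk`, `process_chunk` and
`queue_all_chunks` are made-up names, not anything that exists in
upload-pack):

	struct chunk {
		size_t start, len;	/* one small, bounded unit of work */
	};

	static int process_chunk(struct task_queue *tq, void *data)
	{
		struct chunk *c = data;
		/* ... do the work for [c->start, c->start + c->len) ... */
		free(c);
		return 0;
	}

	static void queue_all_chunks(struct task_queue *tq,
				     size_t total, size_t chunk_size)
	{
		size_t start;

		/* carve the whole job into small tasks up front */
		for (start = 0; start < total; start += chunk_size) {
			struct chunk *c = xmalloc(sizeof(*c));

			c->start = start;
			c->len = (total - start < chunk_size)
				 ? total - start : chunk_size;
			add_task(tq, process_chunk, c);
		}
	}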



>
> We do not need to convert all possible call-sites to the new abstracted
> code at once. But I find that converting at least _one_ is a good litmus
> test to confirm that a new interface is generally useful.
>
> -Peff


* Re: [RFC PATCH 2/3] run-commands: add an async queue processor
  2015-08-21 23:40       ` Stefan Beller
@ 2015-08-24 21:22         ` Junio C Hamano
  0 siblings, 0 replies; 16+ messages in thread
From: Junio C Hamano @ 2015-08-24 21:22 UTC (permalink / raw)
  To: Stefan Beller
  Cc: Jeff King, git, Jonathan Nieder, Johannes Sixt, Heiko Voigt,
	Jens Lehmann

Stefan Beller <sbeller@google.com> writes:

> diff --git a/builtin/index-pack.c b/builtin/index-pack.c
> index 3f10840..159ee36 100644
> --- a/builtin/index-pack.c
> +++ b/builtin/index-pack.c
> @@ -11,6 +11,7 @@
>  #include "exec_cmd.h"
>  #include "streaming.h"
>  #include "thread-utils.h"
> +#include "run-command.h"
>
>  static const char index_pack_usage[] =
>  "git index-pack [-v] [-o <index-file>] [--keep | --keep=<msg>]
> [--verify] [--strict] (<pack-file> | --stdin [--fix-thin]
> [<pack-file>])";
> @@ -1075,7 +1076,7 @@ static void resolve_base(struct object_entry *obj)
>  }
>
>  #ifndef NO_PTHREADS
> -static void *threaded_second_pass(void *data)
> +static int threaded_second_pass(struct task_queue *tq, void *data)
>  {
>         set_thread_data(data);
>         for (;;) {
> @@ -1096,7 +1097,7 @@ static void *threaded_second_pass(void *data)
>
>                 resolve_base(&objects[i]);
>         }
> -       return NULL;
> +       return 0;
>  }
>  #endif
>
> @@ -1195,18 +1196,18 @@ static void resolve_deltas(void)
>                                           nr_ref_deltas + nr_ofs_deltas);
>
>  #ifndef NO_PTHREADS
> -       nr_dispatched = 0;
>         if (nr_threads > 1 || getenv("GIT_FORCE_THREADS")) {
> +               nr_dispatched = 0;
>                 init_thread();
> -               for (i = 0; i < nr_threads; i++) {
> -                       int ret = pthread_create(&thread_data[i].thread, NULL,
> -                                                threaded_second_pass, thread_data + i);
> -                       if (ret)
> -                               die(_("unable to create thread: %s"),
> -                                   strerror(ret));
> -               }
> +
> +               tq = create_task_queue(nr_threads);
> +
>                 for (i = 0; i < nr_threads; i++)
> -                       pthread_join(thread_data[i].thread, NULL);
> +                       add_task(tq, threaded_second_pass, thread_data + i);
> +
> +               if (finish_task_queue(tq))
> +                       die("Not all threads have finished");
> +
>                 cleanup_thread();
>                 return;
>         }

This looks quite straightforward, but that is not too surprising,
as the "dispatcher" side naturally needs similar logic to manage
threads by creating and joining them ;-)

> @@ -1075,28 +1067,24 @@ static void resolve_base(struct object_entry *obj)
>  }
>
>  #ifndef NO_PTHREADS
> -static void *threaded_second_pass(void *data)
> +static int threaded_second_pass(struct task_queue *tq, void *data)
>  {
> - set_thread_data(data);
> - for (;;) {
> - int i;
> - counter_lock();
> - display_progress(progress, nr_resolved_deltas);
> - counter_unlock();
> - work_lock();
> - while (nr_dispatched < nr_objects &&
> -       is_delta_type(objects[nr_dispatched].type))
> - nr_dispatched++;
> - if (nr_dispatched >= nr_objects) {
> - work_unlock();
> - break;
> - }
> - i = nr_dispatched++;
> - work_unlock();
> + if (!get_thread_data()) {
> + struct thread_local *t = xmalloc(sizeof(*t));
> + t->pack_fd = open(curr_pack, O_RDONLY);
> + if (t->pack_fd == -1)
> + die_errno(_("unable to open %s"), curr_pack);
>
> - resolve_base(&objects[i]);
> + set_thread_data(t);
> + /* TODO: I haven't figured out where to free this memory */

Sorry, but it is hard to grok what is going on in the code as quoted
above with its squished indentation.

> Why did I not pick upload-pack?
> ========================
>
> I could not spot easily how to make it a typical queuing problem.
> We start in the threads, and once in a while we're like: "Uhg, this
> thread has more load than the other, let's shove a bit over there"
>
> So what we would need there is splitting the work in smallest chunks
> from the beginning and just load it into the queue via add_task

... or a way for the overlord and the tasks to communicate with each
other and rebalance.  If I am not mistaken, splitting the work into
too small a chunk has a big negative consequence for pack-objects, as
a chunk boundary also becomes a boundary for finding delta bases.



Thread overview: 16 messages
2015-08-21  1:40 [PATCH 1/3] submodule: implement `module_clone` as a builtin helper Stefan Beller
2015-08-21  1:40 ` [RFC PATCH 2/3] run-commands: add an async queue processor Stefan Beller
2015-08-21 19:05   ` Junio C Hamano
2015-08-21 19:44     ` Jeff King
2015-08-21 19:48       ` Stefan Beller
2015-08-21 19:51         ` Jeff King
2015-08-21 20:12           ` Stefan Beller
2015-08-21 20:41       ` Junio C Hamano
2015-08-21 23:40       ` Stefan Beller
2015-08-24 21:22         ` Junio C Hamano
2015-08-21 19:45     ` Stefan Beller
2015-08-21 20:47       ` Junio C Hamano
2015-08-21 20:56         ` Stefan Beller
2015-08-21  1:40 ` [WIP/PATCH 3/3] submodule: helper to run foreach in parallel Stefan Beller
2015-08-21 19:23   ` Junio C Hamano
2015-08-21 20:21     ` Stefan Beller
