All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCHv4 00/14] fetch submodules in parallel and a preview on parallel "submodule update"
@ 2015-09-23  1:45 Stefan Beller
  2015-09-23  1:45 ` [PATCHv4 01/14] submodule: Send "Fetching submodule <foo>" to standard error Stefan Beller
                   ` (13 more replies)
  0 siblings, 14 replies; 39+ messages in thread
From: Stefan Beller @ 2015-09-23  1:45 UTC (permalink / raw)
  To: git
  Cc: ramsay, jacob.keller, peff, gitster, jrnieder,
	johannes.schindelin, Jens.Lehmann, vlovich, sunshine,
	Stefan Beller

Today there was lots of discussion on the correct way of reading the
strbufs as well as some discussion on the structure of the
asynchronous parallel process loop.

Patches 1-8 bring parallel fetching of submodules; they have had good exposure
to review, and the feedback has been incorporated.

Patches 9-14 bring parallel submodule updates.
Patch 14 is not ready yet (i.e. it still has test suite failures), but the
preceding cleanups in patches 9-13 can be reviewed without wasting time.

Any feedback welcome,
Thanks,
Stefan

Diff to v3 below. The patches can also be found at [1]
[1] https://github.com/stefanbeller/git/tree/submodulec_nonthreaded_parallel_4

Jonathan Nieder (1):
  submodule: Send "Fetching submodule <foo>" to standard error

Stefan Beller (13):
  xread: poll on non blocking fds
  xread_nonblock: add functionality to read from fds without blocking
  strbuf: add strbuf_read_once to read without blocking
  run-command: factor out return value computation
  run-command: add an asynchronous parallel child processor
  fetch_populated_submodules: use new parallel job processing
  submodules: allow parallel fetching, add tests and documentation
  submodule-config: Untangle logic in parse_config
  submodule config: keep update strategy around
  git submodule update: cmd_update_recursive
  git submodule update: cmd_update_clone
  git submodule update: cmd_update_fetch
  Rewrite submodule update in C

 Documentation/fetch-options.txt |   7 +
 builtin/fetch.c                 |   6 +-
 builtin/pull.c                  |   6 +
 builtin/submodule--helper.c     | 251 +++++++++++++++++++++++++++++
 git-compat-util.h               |   1 +
 git-submodule.sh                | 339 ++++++++++++++--------------------------
 run-command.c                   | 320 ++++++++++++++++++++++++++++++++++---
 run-command.h                   |  36 +++++
 strbuf.c                        |  11 ++
 strbuf.h                        |   9 ++
 submodule-config.c              |  85 +++++-----
 submodule-config.h              |   1 +
 submodule.c                     | 120 +++++++++-----
 submodule.h                     |   2 +-
 t/t0061-run-command.sh          |  20 +++
 t/t5526-fetch-submodules.sh     |  70 ++++++---
 test-run-command.c              |  24 +++
 wrapper.c                       |  35 ++++-
 18 files changed, 987 insertions(+), 356 deletions(-)

diff --git a/builtin/submodule--helper.c b/builtin/submodule--helper.c
index baa7563..b79117a 100644
--- a/builtin/submodule--helper.c
+++ b/builtin/submodule--helper.c
@@ -382,10 +382,14 @@ static int update_next_task(void *data,
 		argv_array_pushf(&cp->env_array, "sm_path=%s", sub->path);
 		argv_array_pushf(&cp->env_array, "name=%s", sub->name);
 		argv_array_pushf(&cp->env_array, "url=%s", sub->url);
+		argv_array_pushf(&cp->env_array, "sha1=%s", sha1_to_hex(ce->sha1));
 		argv_array_pushf(&cp->env_array, "update_module=%s", update_module);
 
 		cp->git_cmd = 1;
+		cp->no_stdin = 1;
 		cp->stdout_to_stderr = 1;
+		cp->err = -1;
+		argv_array_init(&cp->args);
 		argv_array_push(&cp->args, "submodule");
 		if (!file_exists(sm_gitdir))
 			argv_array_push(&cp->args, "update_clone");
diff --git a/run-command.c b/run-command.c
index 06d5a5d..494e1f8 100644
--- a/run-command.c
+++ b/run-command.c
@@ -276,8 +276,10 @@ static int wait_or_whine(pid_t pid, const char *argv0)
 		failed_errno = errno;
 		error("waitpid for %s failed: %s", argv0, strerror(errno));
 	} else {
-		if (waiting != pid
-		   || (determine_return_value(status, &code, &failed_errno, argv0) < 0))
+		if (waiting != pid || (determine_return_value(status,
+							      &code,
+							      &failed_errno,
+							      argv0) < 0))
 			error("waitpid is confused (%s)", argv0);
 	}
 
@@ -870,7 +872,6 @@ struct parallel_processes {
 
 	int max_processes;
 	int nr_processes;
-	unsigned all_tasks_started : 1;
 
 	get_next_task_fn get_next_task;
 	start_failure_fn start_failure;
@@ -899,9 +900,9 @@ void default_start_failure(void *data,
 	struct strbuf sb = STRBUF_INIT;
 
 	for (i = 0; cp->argv[i]; i++)
-		strbuf_addf(&sb, "%s ", cp->argv[i]);
+		strbuf_addf(&sb, " %s", cp->argv[i]);
 
-	die_errno("Starting a child failed:\n%s", sb.buf);
+	die_errno("Starting a child failed:%s", sb.buf);
 }
 
 void default_return_value(void *data,
@@ -915,12 +916,12 @@ void default_return_value(void *data,
 		return;
 
 	for (i = 0; cp->argv[i]; i++)
-		strbuf_addf(&sb, "%s ", cp->argv[i]);
+		strbuf_addf(&sb, " %s", cp->argv[i]);
 
-	die_errno("A child failed with return code:\n%s\n%d", sb.buf, result);
+	die_errno("A child failed with return code %d:%s", result, sb.buf);
 }
 
-static void run_processes_parallel_init(struct parallel_processes *pp,
+static void pp_init(struct parallel_processes *pp,
 					int n, void *data,
 					get_next_task_fn get_next_task,
 					start_failure_fn start_failure,
@@ -941,7 +942,6 @@ static void run_processes_parallel_init(struct parallel_processes *pp,
 	pp->return_value = return_value ? return_value : default_return_value;
 
 	pp->nr_processes = 0;
-	pp->all_tasks_started = 0;
 	pp->output_owner = 0;
 	pp->children = xcalloc(n, sizeof(*pp->children));
 	pp->pfd = xcalloc(n, sizeof(*pp->pfd));
@@ -954,9 +954,10 @@ static void run_processes_parallel_init(struct parallel_processes *pp,
 	}
 }
 
-static void run_processes_parallel_cleanup(struct parallel_processes *pp)
+static void pp_cleanup(struct parallel_processes *pp)
 {
 	int i;
+
 	for (i = 0; i < pp->max_processes; i++)
 		strbuf_release(&pp->children[i].err);
 
@@ -976,7 +977,8 @@ static void set_nonblocking(int fd)
 			"output will be degraded");
 }
 
-static void run_processes_parallel_start_one(struct parallel_processes *pp)
+/* returns 0 if a process was started, 1 if there was no task left to start */
+static int pp_start_one(struct parallel_processes *pp)
 {
 	int i;
 
@@ -988,10 +990,9 @@ static void run_processes_parallel_start_one(struct parallel_processes *pp)
 
 	if (!pp->get_next_task(pp->data,
 			       &pp->children[i].process,
-			       &pp->children[i].err)) {
-		pp->all_tasks_started = 1;
-		return;
-	}
+			       &pp->children[i].err))
+		return 1;
+
 	if (start_command(&pp->children[i].process))
 		pp->start_failure(pp->data,
 				  &pp->children[i].process,
@@ -1002,23 +1003,17 @@ static void run_processes_parallel_start_one(struct parallel_processes *pp)
 	pp->nr_processes++;
 	pp->children[i].in_use = 1;
 	pp->pfd[i].fd = pp->children[i].process.err;
+	return 0;
 }
 
-static void run_processes_parallel_start_as_needed(struct parallel_processes *pp)
-{
-	while (pp->nr_processes < pp->max_processes &&
-	       !pp->all_tasks_started)
-		run_processes_parallel_start_one(pp);
-}
-
-static void run_processes_parallel_buffer_stderr(struct parallel_processes *pp)
+static void pp_buffer_stderr(struct parallel_processes *pp)
 {
 	int i;
 
 	while ((i = poll(pp->pfd, pp->max_processes, 100)) < 0) {
 		if (errno == EINTR)
 			continue;
-		run_processes_parallel_cleanup(pp);
+		pp_cleanup(pp);
 		die_errno("poll");
 	}
 
@@ -1033,7 +1028,7 @@ static void run_processes_parallel_buffer_stderr(struct parallel_processes *pp)
 	}
 }
 
-static void run_processes_parallel_output(struct parallel_processes *pp)
+static void pp_output(struct parallel_processes *pp)
 {
 	int i = pp->output_owner;
 	if (pp->children[i].in_use &&
@@ -1043,7 +1038,7 @@ static void run_processes_parallel_output(struct parallel_processes *pp)
 	}
 }
 
-static void run_processes_parallel_collect_finished(struct parallel_processes *pp)
+static void pp_collect_finished(struct parallel_processes *pp)
 {
 	int i = 0;
 	pid_t pid;
@@ -1063,17 +1058,11 @@ static void run_processes_parallel_collect_finished(struct parallel_processes *p
 			    pid == pp->children[i].process.pid)
 				break;
 		if (i == pp->max_processes)
-			/*
-			 * waitpid returned another process id
-			 * which we are not waiting for.
-			 */
-			return;
-
-		if (strbuf_read_once(&pp->children[i].err,
-				     pp->children[i].process.err, 0) < 0 &&
-		    errno != EAGAIN)
-			die_errno("strbuf_read_once");
+			die("BUG: found a child process we were not aware of");
 
+		if (strbuf_read(&pp->children[i].err,
+				pp->children[i].process.err, 0) < 0)
+			die_errno("strbuf_read");
 
 		if (determine_return_value(wait_status, &code, &errno,
 					   pp->children[i].process.argv[0]) < 0)
@@ -1122,18 +1111,20 @@ int run_processes_parallel(int n, void *data,
 			   return_value_fn return_value)
 {
 	struct parallel_processes pp;
-	run_processes_parallel_init(&pp, n, data,
-				    get_next_task,
-				    start_failure,
-				    return_value);
-
-	while (!pp.all_tasks_started || pp.nr_processes > 0) {
-		run_processes_parallel_start_as_needed(&pp);
-		run_processes_parallel_buffer_stderr(&pp);
-		run_processes_parallel_output(&pp);
-		run_processes_parallel_collect_finished(&pp);
+	pp_init(&pp, n, data, get_next_task, start_failure, return_value);
+
+	while (1) {
+		while (pp.nr_processes < pp.max_processes &&
+		       !pp_start_one(&pp))
+			; /* nothing */
+		if (!pp.nr_processes)
+			break;
+		pp_buffer_stderr(&pp);
+		pp_output(&pp);
+		pp_collect_finished(&pp);
 	}
-	run_processes_parallel_cleanup(&pp);
+
+	pp_cleanup(&pp);
 
 	return 0;
 }
diff --git a/run-command.h b/run-command.h
index 0c1b363..3807fd1 100644
--- a/run-command.h
+++ b/run-command.h
@@ -155,6 +155,4 @@ int run_processes_parallel(int n, void *data,
 			   start_failure_fn,
 			   return_value_fn);
 
-void run_processes_parallel_schedule_error(struct strbuf *err);
-
 #endif
diff --git a/strbuf.h b/strbuf.h
index 4d4e5b1..ea69665 100644
--- a/strbuf.h
+++ b/strbuf.h
@@ -367,8 +367,11 @@ extern size_t strbuf_fread(struct strbuf *, size_t, FILE *);
 extern ssize_t strbuf_read(struct strbuf *, int fd, size_t hint);
 
 /**
- * Same as strbuf_read, just returns non-blockingly by ignoring EAGAIN.
- * The fd must have set O_NONBLOCK.
+ * Read from a file descriptor that is marked as O_NONBLOCK without
+ * blocking.  Returns the number of new bytes appended to the sb.
+ * Negative return value signals there was an error returned from
+ * underlying read(2), in which case the caller should check errno.
+ * e.g. errno == EAGAIN when the read may have blocked.
  */
 extern ssize_t strbuf_read_once(struct strbuf *, int fd, size_t hint);
 
diff --git a/submodule-config.c b/submodule-config.c
index 0298a60..8b8c7d1 100644
--- a/submodule-config.c
+++ b/submodule-config.c
@@ -258,93 +258,72 @@ static int parse_config(const char *var, const char *value, void *data)
 	if (!name_and_item_from_var(var, &name, &item))
 		return 0;
 
-	submodule = lookup_or_create_by_name(me->cache, me->gitmodules_sha1,
-			name.buf);
+	submodule = lookup_or_create_by_name(me->cache,
+					     me->gitmodules_sha1,
+					     name.buf);
 
 	if (!strcmp(item.buf, "path")) {
-		struct strbuf path = STRBUF_INIT;
-		if (!value) {
+		if (!value)
 			ret = config_error_nonbool(var);
-			goto release_return;
-		}
-		if (!me->overwrite && submodule->path != NULL) {
+		else if (!me->overwrite && submodule->path != NULL)
 			warn_multiple_config(me->commit_sha1, submodule->name,
 					"path");
-			goto release_return;
+		else {
+			if (submodule->path)
+				cache_remove_path(me->cache, submodule);
+			free((void *) submodule->path);
+			submodule->path = xstrdup(value);
+			cache_put_path(me->cache, submodule);
 		}
-
-		if (submodule->path)
-			cache_remove_path(me->cache, submodule);
-		free((void *) submodule->path);
-		strbuf_addstr(&path, value);
-		submodule->path = strbuf_detach(&path, NULL);
-		cache_put_path(me->cache, submodule);
 	} else if (!strcmp(item.buf, "fetchrecursesubmodules")) {
 		/* when parsing worktree configurations we can die early */
 		int die_on_error = is_null_sha1(me->gitmodules_sha1);
 		if (!me->overwrite &&
-		    submodule->fetch_recurse != RECURSE_SUBMODULES_NONE) {
+		    submodule->fetch_recurse != RECURSE_SUBMODULES_NONE)
 			warn_multiple_config(me->commit_sha1, submodule->name,
 					"fetchrecursesubmodules");
-			goto release_return;
-		}
-
-		submodule->fetch_recurse = parse_fetch_recurse(var, value,
+		else
+			submodule->fetch_recurse = parse_fetch_recurse(
+								var, value,
 								die_on_error);
 	} else if (!strcmp(item.buf, "ignore")) {
-		struct strbuf ignore = STRBUF_INIT;
-		if (!me->overwrite && submodule->ignore != NULL) {
+		if (!value)
+			ret = config_error_nonbool(var);
+		else if (!me->overwrite && submodule->ignore != NULL)
 			warn_multiple_config(me->commit_sha1, submodule->name,
 					"ignore");
-			goto release_return;
-		}
-		if (!value) {
-			ret = config_error_nonbool(var);
-			goto release_return;
-		}
-		if (strcmp(value, "untracked") && strcmp(value, "dirty") &&
-		    strcmp(value, "all") && strcmp(value, "none")) {
+		else if (strcmp(value, "untracked") &&
+			 strcmp(value, "dirty") &&
+			 strcmp(value, "all") &&
+			 strcmp(value, "none"))
 			warning("Invalid parameter '%s' for config option "
 					"'submodule.%s.ignore'", value, var);
-			goto release_return;
+		else {
+			free((void *) submodule->ignore);
+			submodule->ignore = xstrdup(value);
 		}
-
-		free((void *) submodule->ignore);
-		strbuf_addstr(&ignore, value);
-		submodule->ignore = strbuf_detach(&ignore, NULL);
 	} else if (!strcmp(item.buf, "url")) {
-		struct strbuf url = STRBUF_INIT;
 		if (!value) {
 			ret = config_error_nonbool(var);
-			goto release_return;
-		}
-		if (!me->overwrite && submodule->url != NULL) {
+		} else if (!me->overwrite && submodule->url != NULL) {
 			warn_multiple_config(me->commit_sha1, submodule->name,
 					"url");
-			goto release_return;
+		} else {
+			free((void *) submodule->url);
+			submodule->url = xstrdup(value);
 		}
-
-		free((void *) submodule->url);
-		strbuf_addstr(&url, value);
-		submodule->url = strbuf_detach(&url, NULL);
 	} else if (!strcmp(item.buf, "update")) {
-		struct strbuf update = STRBUF_INIT;
-		if (!value) {
+		if (!value)
 			ret = config_error_nonbool(var);
-			goto release_return;
-		}
-		if (!me->overwrite && submodule->update != NULL) {
+		else if (!me->overwrite && submodule->update != NULL)
 			warn_multiple_config(me->commit_sha1, submodule->name,
-					"update");
-			goto release_return;
+					     "update");
+		else {
+			free((void *)submodule->update);
+			submodule->update = xstrdup(value);
 		}
-
-		free((void *) submodule->update);
-		strbuf_addstr(&update, value);
-		submodule->update = strbuf_detach(&update, NULL);
 	}
 
-release_return:
 	strbuf_release(&name);
 	strbuf_release(&item);
 
diff --git a/submodule.c b/submodule.c
index d15364f..fdaf3e4 100644
--- a/submodule.c
+++ b/submodule.c
@@ -650,10 +650,12 @@ int fetch_populated_submodules(const struct argv_array *options,
 {
 	int i;
 	struct submodule_parallel_fetch spf = SPF_INIT;
+
 	spf.work_tree = get_git_work_tree();
 	spf.command_line_option = command_line_option;
 	spf.quiet = quiet;
 	spf.prefix = prefix;
+
 	if (!spf.work_tree)
 		goto out;
 
@@ -738,12 +740,11 @@ int get_next_submodule(void *data, struct child_process *cp,
 		if (is_directory(git_dir)) {
 			child_process_init(cp);
 			cp->dir = strbuf_detach(&submodule_path, NULL);
+			cp->env = local_repo_env;
 			cp->git_cmd = 1;
-			cp->no_stdout = 1;
 			cp->no_stdin = 1;
 			cp->stdout_to_stderr = 1;
 			cp->err = -1;
-			cp->env = local_repo_env;
 			if (!spf->quiet)
 				strbuf_addf(err, "Fetching submodule %s%s\n",
 					    spf->prefix, ce->name);
diff --git a/wrapper.c b/wrapper.c
index 54ce231..41a21e1 100644
--- a/wrapper.c
+++ b/wrapper.c
@@ -206,16 +206,10 @@ ssize_t xread(int fd, void *buf, size_t len)
 				continue;
 			if (errno == EAGAIN || errno == EWOULDBLOCK) {
 				struct pollfd pfd;
-				int i;
 				pfd.events = POLLIN;
 				pfd.fd = fd;
-				i = poll(&pfd, 1, 100);
-				if (i < 0) {
-					if (errno == EINTR || errno == ENOMEM)
-						continue;
-					else
-						die_errno("poll");
-				}
+				/* We deliberately ignore the return value */
+				poll(&pfd, 1, -1);
 			}
 		}
 		return nr;
@@ -225,13 +219,13 @@ ssize_t xread(int fd, void *buf, size_t len)
 /*
  * xread_nonblock() is the same a read(), but it automatically restarts read()
  * interrupted operations (EINTR). xread_nonblock() DOES NOT GUARANTEE that
- * "len" bytes is read even if the data is available.
+ * "len" bytes is read. EWOULDBLOCK is turned into EAGAIN.
  */
 ssize_t xread_nonblock(int fd, void *buf, size_t len)
 {
 	ssize_t nr;
 	if (len > MAX_IO_SIZE)
-	    len = MAX_IO_SIZE;
+		len = MAX_IO_SIZE;
 	while (1) {
 		nr = read(fd, buf, len);
 		if (nr < 0) {

-- 
2.5.0.272.ga84127c.dirty

^ permalink raw reply related	[flat|nested] 39+ messages in thread

end of thread, other threads:[~2015-09-25 19:32 UTC | newest]

Thread overview: 39+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2015-09-23  1:45 [PATCHv4 00/14] fetch submodules in parallel and a preview on parallel "submodule update" Stefan Beller
2015-09-23  1:45 ` [PATCHv4 01/14] submodule: Send "Fetching submodule <foo>" to standard error Stefan Beller
2015-09-23  1:45 ` [PATCHv4 02/14] xread: poll on non blocking fds Stefan Beller
2015-09-23  1:45 ` [PATCHv4 03/14] xread_nonblock: add functionality to read from fds without blocking Stefan Beller
2015-09-23  1:45 ` [PATCHv4 04/14] strbuf: add strbuf_read_once to read " Stefan Beller
2015-09-23  1:45 ` [PATCHv4 05/14] run-command: factor out return value computation Stefan Beller
2015-09-23  1:45 ` [PATCHv4 06/14] run-command: add an asynchronous parallel child processor Stefan Beller
2015-09-23  6:29   ` Junio C Hamano
2015-09-23 17:53     ` Stefan Beller
2015-09-23 18:04       ` Junio C Hamano
2015-09-23 19:34         ` Junio C Hamano
2015-09-23 19:39           ` Stefan Beller
2015-09-23 19:47             ` Junio C Hamano
2015-09-23  6:47   ` Junio C Hamano
2015-09-23 14:59     ` Junio C Hamano
2015-09-23 17:54       ` Junio C Hamano
2015-09-23 23:41         ` [PATCHv5] Another squash on " Stefan Beller
2015-09-24  2:17           ` Junio C Hamano
2015-09-24 21:13             ` [PATCH 0/2] " Stefan Beller
2015-09-24 21:13               ` [PATCH 2/2] SQUASH for "fetch_populated_submodules: use new parallel job processing" Stefan Beller
2015-09-24 21:13               ` [PATCH 1/2] SQUASH??? Stefan Beller
2015-09-25  0:49                 ` Junio C Hamano
2015-09-25  1:09                   ` Junio C Hamano
2015-09-25 17:52                   ` Stefan Beller
2015-09-25 17:56                     ` Junio C Hamano
2015-09-25  1:08               ` [PATCH 0/2] Another squash on run-command: add an asynchronous parallel child processor Junio C Hamano
2015-09-25 18:56                 ` Stefan Beller
2015-09-25 19:04                   ` Junio C Hamano
2015-09-25 19:19                     ` Stefan Beller
2015-09-25 19:32                   ` Junio C Hamano
2015-09-23  1:45 ` [PATCHv4 07/14] fetch_populated_submodules: use new parallel job processing Stefan Beller
2015-09-23  1:45 ` [PATCHv4 08/14] submodules: allow parallel fetching, add tests and documentation Stefan Beller
2015-09-23  1:45 ` [PATCHv4 09/14] submodule-config: Untangle logic in parse_config Stefan Beller
2015-09-23  1:45 ` [PATCHv4 10/14] submodule config: keep update strategy around Stefan Beller
2015-09-23  1:45 ` [PATCHv4 11/14] git submodule update: cmd_update_recursive Stefan Beller
2015-09-23  1:45 ` [PATCHv4 12/14] git submodule update: cmd_update_clone Stefan Beller
2015-09-23 20:13   ` Junio C Hamano
2015-09-23  1:45 ` [PATCHv4 13/14] git submodule update: cmd_update_fetch Stefan Beller
2015-09-23  1:45 ` [PATCHv4 14/14] Rewrite submodule update in C Stefan Beller

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.