git.vger.kernel.org archive mirror
* [PATCH 0/8] Rerolling sb/submodule-parallel-fetch for the time after 2.7
@ 2015-12-14 19:37 Stefan Beller
  2015-12-14 19:37 ` [PATCH 1/8] submodule.c: write "Fetching submodule <foo>" to stderr Stefan Beller
                   ` (8 more replies)
  0 siblings, 9 replies; 31+ messages in thread
From: Stefan Beller @ 2015-12-14 19:37 UTC
  To: sbeller, git
  Cc: peff, gitster, jrnieder, johannes.schindelin, Jens.Lehmann,
	ericsunshine, j6t

I am sending out a new version for replacing sb/submodule-parallel-fetch for
the time after the 2.7 release.

The contents are:
 * all patches from the branch sb/submodule-parallel-fetch,
 * including the fixups suggested by Hannes,
 * a patch that writes a message to the debug log for better testing and debugging
   (cherry-picked from the series that is supposed to build on top of this one).

The patches were rebased so that there are no fixup commits anymore;
each change is done right the first time.

The commit message of "run-command: add an asynchronous parallel child processor"
has been slightly updated to mention that we don't want to use waitpid(-1),
but instead rely on the assumption that a child's stderr lives as long as the child itself.
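To illustrate the idea described above, here is a minimal POSIX sketch, not code from the series: the parent treats EOF on a child's stderr pipe as the sign that the child is done, and only then reaps that specific pid with waitpid(pid, ...) instead of waitpid(-1, ...). The helper name run_and_reap() is hypothetical, and the sketch assumes the child does not hand its stderr to a longer-lived grandchild (if it did, EOF would only arrive once the grandchild exits).

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Hypothetical helper: run "sh -c <cmd>" with stderr redirected into a
 * pipe, drain the pipe until EOF, and only then reap that one pid.
 * Returns the raw wait status (or -1 on error); the number of stderr
 * bytes read is stored in *nbytes.
 */
static int run_and_reap(const char *cmd, size_t *nbytes)
{
	int fds[2], status;
	char buf[4096];
	ssize_t r;
	pid_t pid;

	*nbytes = 0;
	if (pipe(fds) < 0)
		return -1;
	pid = fork();
	if (pid < 0) {
		close(fds[0]);
		close(fds[1]);
		return -1;
	}
	if (pid == 0) {
		/* child: its stderr becomes the write end of the pipe */
		dup2(fds[1], 2);
		close(fds[0]);
		close(fds[1]);
		execl("/bin/sh", "sh", "-c", cmd, (char *)NULL);
		_exit(127);
	}
	close(fds[1]); /* parent keeps only the read end */

	/* EOF means every writer (assumed: just the child) has closed stderr */
	while ((r = read(fds[0], buf, sizeof(buf))) != 0) {
		if (r < 0) {
			if (errno == EINTR)
				continue;
			break;
		}
		*nbytes += (size_t)r;
	}
	close(fds[0]);

	/* reap exactly this child; never waitpid(-1) */
	while (waitpid(pid, &status, 0) < 0)
		if (errno != EINTR)
			return -1;
	return status;
}
```

For example, run_and_reap("echo oops >&2; exit 3", &n) collects the five bytes "oops\n" from stderr and then returns a status for which WEXITSTATUS() is 3.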

Thanks,
Stefan


Jonathan Nieder (1):
  submodule.c: write "Fetching submodule <foo>" to stderr

Stefan Beller (7):
  xread: poll on non blocking fds
  xread_nonblock: add functionality to read from fds without blocking
  strbuf: add strbuf_read_once to read without blocking
  sigchain: add command to pop all common signals
  run-command: add an asynchronous parallel child processor
  fetch_populated_submodules: use new parallel job processing
  submodules: allow parallel fetching, add tests and documentation

 Documentation/fetch-options.txt |   7 +
 builtin/fetch.c                 |   6 +-
 builtin/pull.c                  |   6 +
 git-compat-util.h               |   1 +
 run-command.c                   | 335 ++++++++++++++++++++++++++++++++++++++++
 run-command.h                   |  80 ++++++++++
 sigchain.c                      |   9 ++
 sigchain.h                      |   1 +
 strbuf.c                        |  11 ++
 strbuf.h                        |   8 +
 submodule.c                     | 141 +++++++++++------
 submodule.h                     |   2 +-
 t/t0061-run-command.sh          |  53 +++++++
 t/t5526-fetch-submodules.sh     |  71 ++++++---
 test-run-command.c              |  55 ++++++-
 wrapper.c                       |  35 ++++-
 16 files changed, 747 insertions(+), 74 deletions(-)

-- 
2.6.4.443.ge094245.dirty

* [PATCH 0/8] fetch submodules in parallel
@ 2015-09-28 23:13 Stefan Beller
  2015-09-28 23:14 ` [PATCH 2/8] xread: poll on non blocking fds Stefan Beller
  0 siblings, 1 reply; 31+ messages in thread
From: Stefan Beller @ 2015-09-28 23:13 UTC
  To: git
  Cc: Stefan Beller, ramsay, jacob.keller, peff, gitster, jrnieder,
	johannes.schindelin, Jens.Lehmann, ericsunshine

Changes to v4 (diff below):
* Some functions are now static (thanks, Ramsay!).
* The patch factoring out the return code handling has been dropped, because
  the handling differs slightly between finish_command and the parallel case.
* Signals are handled more gracefully: on SIGINT, SIGHUP, SIGTERM, SIGQUIT or
  SIGPIPE, the running children are killed before the signal is re-raised.
* More documentation in run-command.h.
* Introduce `sigchain_pop_common`, the counterpart of `sigchain_push_common`.
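For readers unfamiliar with sigchain, here is a simplified sketch of what a push/pop-common pair does: each signal keeps its own stack of previously installed handlers, pushing installs a new handler and remembers the old one, and popping restores it. The demo_* names, the fixed stack depth and the demo_record() handler are hypothetical; git's real sigchain.c differs in detail. The signal list matches the five signals handled by sigchain_pop_common in the diff below.

```c
#include <signal.h>
#include <stddef.h>

typedef void (*demo_sig_fn)(int);

#define DEMO_NSIG 65   /* large enough for the classic signal numbers used here */
#define DEMO_DEPTH 8   /* hypothetical fixed maximum nesting per signal */

static demo_sig_fn demo_old[DEMO_NSIG][DEMO_DEPTH];
static int demo_depth[DEMO_NSIG];

/* Install f for sig and remember the previously installed handler. */
static int demo_push(int sig, demo_sig_fn f)
{
	demo_sig_fn old;

	if (sig <= 0 || sig >= DEMO_NSIG || demo_depth[sig] == DEMO_DEPTH)
		return -1;
	old = signal(sig, f);
	if (old == SIG_ERR)
		return -1;
	demo_old[sig][demo_depth[sig]++] = old;
	return 0;
}

/* Restore the handler that was active before the last push for sig. */
static int demo_pop(int sig)
{
	if (sig <= 0 || sig >= DEMO_NSIG || demo_depth[sig] == 0)
		return 0;
	if (signal(sig, demo_old[sig][--demo_depth[sig]]) == SIG_ERR)
		return -1;
	return 0;
}

static const int demo_common[] = { SIGINT, SIGHUP, SIGTERM, SIGQUIT, SIGPIPE };
#define DEMO_NCOMMON (sizeof(demo_common) / sizeof(demo_common[0]))

static void demo_push_common(demo_sig_fn f)
{
	size_t i;
	for (i = 0; i < DEMO_NCOMMON; i++)
		demo_push(demo_common[i], f);
}

static void demo_pop_common(void)
{
	size_t i;
	for (i = 0; i < DEMO_NCOMMON; i++)
		demo_pop(demo_common[i]);
}

/* Example handler: only records which signal arrived. */
static volatile sig_atomic_t demo_last_signal;
static void demo_record(int sig)
{
	demo_last_signal = sig;
}
```

Because every signal has its own stack, popping the five signals in the same order in which they were pushed (as sigchain_pop_common does) is harmless.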

Jonathan Nieder (1):
  submodule.c: write "Fetching submodule <foo>" to stderr

Stefan Beller (7):
  xread: poll on non blocking fds
  xread_nonblock: add functionality to read from fds without blocking
  strbuf: add strbuf_read_once to read without blocking
  sigchain: add command to pop all common signals
  run-command: add an asynchronous parallel child processor
  fetch_populated_submodules: use new parallel job processing
  submodules: allow parallel fetching, add tests and documentation

 Documentation/fetch-options.txt |   7 +
 builtin/fetch.c                 |   6 +-
 builtin/pull.c                  |   6 +
 git-compat-util.h               |   1 +
 run-command.c                   | 348 ++++++++++++++++++++++++++++++++++++++++
 run-command.h                   |  63 ++++++++
 sigchain.c                      |   9 ++
 sigchain.h                      |   1 +
 strbuf.c                        |  11 ++
 strbuf.h                        |   9 ++
 submodule.c                     | 127 +++++++++++----
 submodule.h                     |   2 +-
 t/t0061-run-command.sh          |  20 +++
 t/t5526-fetch-submodules.sh     |  70 +++++---
 test-run-command.c              |  24 +++
 wrapper.c                       |  35 +++-
 16 files changed, 675 insertions(+), 64 deletions(-)
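Both wait_or_whine() and the new pp_collect_finished() in the diff below map a waitpid() status to a return code using the same convention: 128 plus the signal number for a child killed by a signal (what a POSIX shell would report), the exit status for a normal exit, and -1 with ENOENT when the exit status is 127 (execvp failed). The sketch below isolates that mapping. decode_wait_status() and the child_status() test helper are hypothetical names, and the error messages the real code emits are left out.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Map a raw waitpid() status to the return-code convention used in the
 * patch. *failed_errno is only written in the "exec failed" case.
 */
static int decode_wait_status(int status, int *failed_errno)
{
	int code = -1; /* neither exited nor signaled: "waitpid is confused" */

	if (WIFSIGNALED(status)) {
		/* like a POSIX shell: 128 + signal number */
		code = WTERMSIG(status) + 128;
	} else if (WIFEXITED(status)) {
		code = WEXITSTATUS(status);
		if (code == 127) {
			/* special exit code: execvp() failed in the child */
			code = -1;
			*failed_errno = ENOENT;
		}
	}
	return code;
}

/* Hypothetical test helper: raw status of a child that exits with
 * `code`, or that first kills itself with `sig` when sig != 0. */
static int child_status(int code, int sig)
{
	int status = 0;
	pid_t pid = fork();

	if (pid == 0) {
		if (sig)
			raise(sig);
		_exit(code);
	}
	if (pid < 0 || waitpid(pid, &status, 0) < 0)
		return -1;
	return status;
}
```

A child killed by SIGKILL (signal 9), for instance, is reported as 137, the same value `$?` would hold in a POSIX shell.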

diff --git a/run-command.c b/run-command.c
index 494e1f8..df84985 100644
--- a/run-command.c
+++ b/run-command.c
@@ -234,35 +234,6 @@ static inline void set_cloexec(int fd)
 		fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
 }
 
-static int determine_return_value(int wait_status,
-				  int *result,
-				  int *error_code,
-				  const char *argv0)
-{
-	if (WIFSIGNALED(wait_status)) {
-		*result = WTERMSIG(wait_status);
-		if (*result != SIGINT && *result != SIGQUIT)
-			error("%s died of signal %d", argv0, *result);
-		/*
-		 * This return value is chosen so that code & 0xff
-		 * mimics the exit code that a POSIX shell would report for
-		 * a program that died from this signal.
-		 */
-		*result += 128;
-	} else if (WIFEXITED(wait_status)) {
-		*result = WEXITSTATUS(wait_status);
-		/*
-		 * Convert special exit code when execvp failed.
-		 */
-		if (*result == 127) {
-			*result = -1;
-			*error_code = ENOENT;
-		}
-	} else
-		return -1;
-	return 0;
-}
-
 static int wait_or_whine(pid_t pid, const char *argv0)
 {
 	int status, code = -1;
@@ -275,12 +246,29 @@ static int wait_or_whine(pid_t pid, const char *argv0)
 	if (waiting < 0) {
 		failed_errno = errno;
 		error("waitpid for %s failed: %s", argv0, strerror(errno));
+	} else if (waiting != pid) {
+		error("waitpid is confused (%s)", argv0);
+	} else if (WIFSIGNALED(status)) {
+		code = WTERMSIG(status);
+		if (code != SIGINT && code != SIGQUIT)
+			error("%s died of signal %d", argv0, code);
+		/*
+		 * This return value is chosen so that code & 0xff
+		 * mimics the exit code that a POSIX shell would report for
+		 * a program that died from this signal.
+		 */
+		code += 128;
+	} else if (WIFEXITED(status)) {
+		code = WEXITSTATUS(status);
+		/*
+		 * Convert special exit code when execvp failed.
+		 */
+		if (code == 127) {
+			code = -1;
+			failed_errno = ENOENT;
+		}
 	} else {
-		if (waiting != pid || (determine_return_value(status,
-							      &code,
-							      &failed_errno,
-							      argv0) < 0))
-			error("waitpid is confused (%s)", argv0);
+		error("waitpid is confused (%s)", argv0);
 	}
 
 	clear_child_for_cleanup(pid);
@@ -888,46 +876,67 @@ struct parallel_processes {
 	 */
 	struct pollfd *pfd;
 
+	unsigned shutdown : 1;
+
 	int output_owner;
 	struct strbuf buffered_output; /* of finished children */
-};
+} parallel_processes_struct;
 
-void default_start_failure(void *data,
-			   struct child_process *cp,
-			   struct strbuf *err)
+static int default_start_failure(void *data,
+				 struct child_process *cp,
+				 struct strbuf *err)
 {
 	int i;
-	struct strbuf sb = STRBUF_INIT;
 
+	strbuf_addstr(err, "Starting a child failed:");
 	for (i = 0; cp->argv[i]; i++)
-		strbuf_addf(&sb, " %s", cp->argv[i]);
+		strbuf_addf(err, " %s", cp->argv[i]);
 
-	die_errno("Starting a child failed:%s", sb.buf);
+	return 0;
 }
 
-void default_return_value(void *data,
-			  struct child_process *cp,
-			  int result)
+static int default_return_value(void *data,
+				struct child_process *cp,
+				struct strbuf *err,
+				int result)
 {
 	int i;
-	struct strbuf sb = STRBUF_INIT;
 
 	if (!result)
-		return;
+		return 0;
 
+	strbuf_addf(err, "A child failed with return code %d:", result);
 	for (i = 0; cp->argv[i]; i++)
-		strbuf_addf(&sb, " %s", cp->argv[i]);
+		strbuf_addf(err, " %s", cp->argv[i]);
 
-	die_errno("A child failed with return code %d:%s", result, sb.buf);
+	return 0;
 }
 
-static void pp_init(struct parallel_processes *pp,
-					int n, void *data,
-					get_next_task_fn get_next_task,
-					start_failure_fn start_failure,
-					return_value_fn return_value)
+static void kill_children(struct parallel_processes *pp, int signo)
+{
+	int i, n = pp->max_processes;
+
+	for (i = 0; i < n; i++)
+		if (pp->children[i].in_use)
+			kill(pp->children[i].process.pid, signo);
+}
+
+static void handle_children_on_signal(int signo)
+{
+	struct parallel_processes *pp = &parallel_processes_struct;
+
+	kill_children(pp, signo);
+	sigchain_pop(signo);
+	raise(signo);
+}
+
+static struct parallel_processes *pp_init(int n, void *data,
+					  get_next_task_fn get_next_task,
+					  start_failure_fn start_failure,
+					  return_value_fn return_value)
 {
 	int i;
+	struct parallel_processes *pp = &parallel_processes_struct;
 
 	if (n < 1)
 		n = online_cpus();
@@ -952,6 +961,8 @@ static void pp_init(struct parallel_processes *pp,
 		pp->pfd[i].events = POLLIN;
 		pp->pfd[i].fd = -1;
 	}
+	sigchain_push_common(handle_children_on_signal);
+	return pp;
 }
 
 static void pp_cleanup(struct parallel_processes *pp)
@@ -964,6 +975,8 @@ static void pp_cleanup(struct parallel_processes *pp)
 	free(pp->children);
 	free(pp->pfd);
 	strbuf_release(&pp->buffered_output);
+
+	sigchain_pop_common();
 }
 
 static void set_nonblocking(int fd)
@@ -977,7 +990,12 @@ static void set_nonblocking(int fd)
 			"output will be degraded");
 }
 
-/* returns 1 if a process was started, 0 otherwise */
+/* returns
+ *  0 if a new task was started.
+ *  1 if no new job was started (get_next_task ran out of work, non-critical
+ *    problem with starting a new command)
+ * -1 if no new job was started and the user wishes to shut down early.
+ */
 static int pp_start_one(struct parallel_processes *pp)
 {
 	int i;
@@ -993,10 +1011,14 @@ static int pp_start_one(struct parallel_processes *pp)
 			       &pp->children[i].err))
 		return 1;
 
-	if (start_command(&pp->children[i].process))
-		pp->start_failure(pp->data,
-				  &pp->children[i].process,
-				  &pp->children[i].err);
+	if (start_command(&pp->children[i].process)) {
+		int code = pp->start_failure(pp->data,
+					     &pp->children[i].process,
+					     &pp->children[i].err);
+		strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
+		strbuf_reset(&pp->children[i].err);
+		return code ? -1 : 1;
+	}
 
 	set_nonblocking(pp->children[i].process.err);
 
@@ -1006,11 +1028,11 @@ static int pp_start_one(struct parallel_processes *pp)
 	return 0;
 }
 
-static void pp_buffer_stderr(struct parallel_processes *pp)
+static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout)
 {
 	int i;
 
-	while ((i = poll(pp->pfd, pp->max_processes, 100)) < 0) {
+	while ((i = poll(pp->pfd, pp->max_processes, output_timeout)) < 0) {
 		if (errno == EINTR)
 			continue;
 		pp_cleanup(pp);
@@ -1038,17 +1060,18 @@ static void pp_output(struct parallel_processes *pp)
 	}
 }
 
-static void pp_collect_finished(struct parallel_processes *pp)
+static int pp_collect_finished(struct parallel_processes *pp)
 {
 	int i = 0;
 	pid_t pid;
 	int wait_status, code;
 	int n = pp->max_processes;
+	int result = 0;
 
 	while (pp->nr_processes > 0) {
 		pid = waitpid(-1, &wait_status, WNOHANG);
 		if (pid == 0)
-			return;
+			return 0;
 
 		if (pid < 0)
 			die_errno("wait");
@@ -1064,12 +1087,38 @@ static void pp_collect_finished(struct parallel_processes *pp)
 				pp->children[i].process.err, 0) < 0)
 			die_errno("strbuf_read");
 
-		if (determine_return_value(wait_status, &code, &errno,
-					   pp->children[i].process.argv[0]) < 0)
-			error("waitpid is confused (%s)",
-			      pp->children[i].process.argv[0]);
+		if (WIFSIGNALED(wait_status)) {
+			code = WTERMSIG(wait_status);
+			if (!pp->shutdown &&
+			    code != SIGINT && code != SIGQUIT)
+				strbuf_addf(&pp->children[i].err,
+					    "%s died of signal %d",
+					    pp->children[i].process.argv[0],
+					    code);
+			/*
+			 * This return value is chosen so that code & 0xff
+			 * mimics the exit code that a POSIX shell would report for
+			 * a program that died from this signal.
+			 */
+			code += 128;
+		} else if (WIFEXITED(wait_status)) {
+			code = WEXITSTATUS(wait_status);
+			/*
+			 * Convert special exit code when execvp failed.
+			 */
+			if (code == 127) {
+				code = -1;
+				errno = ENOENT;
+			}
+		} else
+			strbuf_addf(&pp->children[i].err,
+				    "waitpid is confused (%s)",
+				    pp->children[i].process.argv[0]);
+
 
-		pp->return_value(pp->data, &pp->children[i].process, code);
+		if (pp->return_value(pp->data, &pp->children[i].process,
+				     &pp->children[i].err, code))
+			result = 1;
 
 		argv_array_clear(&pp->children[i].process.args);
 		argv_array_clear(&pp->children[i].process.env_array);
@@ -1103,6 +1152,7 @@ static void pp_collect_finished(struct parallel_processes *pp)
 			pp->output_owner = (pp->output_owner + i) % n;
 		}
 	}
+	return result;
 }
 
 int run_processes_parallel(int n, void *data,
@@ -1110,21 +1160,43 @@ int run_processes_parallel(int n, void *data,
 			   start_failure_fn start_failure,
 			   return_value_fn return_value)
 {
-	struct parallel_processes pp;
-	pp_init(&pp, n, data, get_next_task, start_failure, return_value);
+	int no_more_task = 0;
+	struct parallel_processes *pp;
 
+	pp = pp_init(n, data, get_next_task, start_failure, return_value);
 	while (1) {
-		while (pp.nr_processes < pp.max_processes &&
-		       !pp_start_one(&pp))
-			; /* nothing */
-		if (!pp.nr_processes)
+		int i;
+		int output_timeout = 100;
+		int spawn_cap = 4;
+
+		if (!no_more_task) {
+			for (i = 0; i < spawn_cap; i++) {
+				int code;
+				if (pp->nr_processes == pp->max_processes)
+					break;
+
+				code = pp_start_one(pp);
+				if (!code)
+					continue;
+				if (code < 0) {
+					pp->shutdown = 1;
+					kill_children(pp, SIGTERM);
+				}
+				no_more_task = 1;
+				break;
+			}
+		}
+		if (no_more_task && !pp->nr_processes)
 			break;
-		pp_buffer_stderr(&pp);
-		pp_output(&pp);
-		pp_collect_finished(&pp);
+		pp_buffer_stderr(pp, output_timeout);
+		pp_output(pp);
+		if (pp_collect_finished(pp)) {
+			kill_children(pp, SIGTERM);
+			pp->shutdown = 1;
+			no_more_task = 1;
+		}
 	}
 
-	pp_cleanup(&pp);
-
+	pp_cleanup(pp);
 	return 0;
 }
diff --git a/run-command.h b/run-command.h
index 3807fd1..1179cb0 100644
--- a/run-command.h
+++ b/run-command.h
@@ -132,13 +132,36 @@ typedef int (*get_next_task_fn)(void *data,
 				struct child_process *cp,
 				struct strbuf *err);
 
-typedef void (*start_failure_fn)(void *data,
-				 struct child_process *cp,
-				 struct strbuf *err);
-
-typedef void (*return_value_fn)(void *data,
+/**
+ * This callback is called whenever there are problems starting
+ * a new process.
+ *
+ * You must not write to stdout or stderr in this function. Add your
+ * message to the strbuf err instead, which will be printed without
+ * messing up the output of the other parallel processes.
+ *
+ * Return 0 to continue the parallel processing. To abort gracefully,
+ * return non-zero.
+ */
+typedef int (*start_failure_fn)(void *data,
 				struct child_process *cp,
-				int result);
+				struct strbuf *err);
+
+/**
+ * This callback is called on every child process that finished processing.
+ * The child's return code is passed in result.
+ *
+ * You must not write to stdout or stderr in this function. Add your
+ * message to the strbuf err instead, which will be printed without
+ * messing up the output of the other parallel processes.
+ *
+ * Return 0 to continue the parallel processing. To abort gracefully,
+ * return non-zero.
+ */
+typedef int (*return_value_fn)(void *data,
+			       struct child_process *cp,
+			       struct strbuf *err,
+			       int result);
 
 /**
  * Runs up to n processes at the same time. Whenever a process can be
@@ -148,6 +171,10 @@ typedef void (*return_value_fn)(void *data,
  * The children started via this function run in parallel and their output
  * to stderr is buffered, while one of the children will directly output
  * to stderr.
+ *
+ * If start_failure_fn and return_value_fn are NULL, default handlers
+ * will be used. The default handlers will print an error message on
+ * error without issuing an emergency stop.
  */
 
 int run_processes_parallel(int n, void *data,
diff --git a/sigchain.c b/sigchain.c
index faa375d..9262307 100644
--- a/sigchain.c
+++ b/sigchain.c
@@ -50,3 +50,12 @@ void sigchain_push_common(sigchain_fun f)
 	sigchain_push(SIGQUIT, f);
 	sigchain_push(SIGPIPE, f);
 }
+
+void sigchain_pop_common(void)
+{
+	sigchain_pop(SIGINT);
+	sigchain_pop(SIGHUP);
+	sigchain_pop(SIGTERM);
+	sigchain_pop(SIGQUIT);
+	sigchain_pop(SIGPIPE);
+}
diff --git a/sigchain.h b/sigchain.h
index 618083b..138b20f 100644
--- a/sigchain.h
+++ b/sigchain.h
@@ -7,5 +7,6 @@ int sigchain_push(int sig, sigchain_fun f);
 int sigchain_pop(int sig);
 
 void sigchain_push_common(sigchain_fun f);
+void sigchain_pop_common(void);
 
 #endif /* SIGCHAIN_H */
diff --git a/submodule.c b/submodule.c
index fdaf3e4..7ab89f4 100644
--- a/submodule.c
+++ b/submodule.c
@@ -630,18 +630,25 @@ struct submodule_parallel_fetch {
 int get_next_submodule(void *data, struct child_process *cp,
 		       struct strbuf *err);
 
-void handle_submodule_fetch_start_err(void *data, struct child_process *cp, struct strbuf *err)
+static int fetch_start_failure(void *data, struct child_process *cp,
+			       struct strbuf *err)
 {
 	struct submodule_parallel_fetch *spf = data;
+
 	spf->result = 1;
+
+	return 0;
 }
 
-void handle_submodule_fetch_finish( void *data, struct child_process *cp, int retvalue)
+static int fetch_finish(void *data, struct child_process *cp,
+			struct strbuf *err, int retvalue)
 {
 	struct submodule_parallel_fetch *spf = data;
 
 	if (retvalue)
 		spf->result = 1;
+
+	return 0;
 }
 
 int fetch_populated_submodules(const struct argv_array *options,
@@ -671,8 +678,8 @@ int fetch_populated_submodules(const struct argv_array *options,
 	calculate_changed_submodule_paths();
 	run_processes_parallel(max_parallel_jobs, &spf,
 			       get_next_submodule,
-			       handle_submodule_fetch_start_err,
-			       handle_submodule_fetch_finish);
+			       fetch_start_failure,
+			       fetch_finish);
 
 	argv_array_clear(&spf.args);
 out:
diff --git a/test-run-command.c b/test-run-command.c
index 94c6eee..2555791 100644
--- a/test-run-command.c
+++ b/test-run-command.c
@@ -16,9 +16,9 @@
 #include <errno.h>
 
 static int number_callbacks;
-int parallel_next(void *data,
-		  struct child_process *cp,
-		  struct strbuf *err)
+static int parallel_next(void *data,
+			 struct child_process *cp,
+			 struct strbuf *err)
 {
 	struct child_process *d = data;
 	if (number_callbacks >= 4)


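The main loop of run_processes_parallel() in the diff above starts at most spawn_cap (4) new children per iteration, never exceeds max_processes, and stops spawning once pp_start_one() reports that there is no more work (1) or that a shutdown was requested (-1); it then keeps iterating until no children are left. The following is a toy model of only that control flow, with counters in place of real processes. struct model, model_start_one() and model_step() are hypothetical, and the model assumes every running child finishes within one iteration.

```c
/* Return values of pp_start_one() in the patch. */
enum { MODEL_STARTED = 0, MODEL_NO_MORE = 1, MODEL_ABORT = -1 };

struct model {
	int max_processes; /* parallelism limit */
	int nr_processes;  /* children currently "running" */
	int tasks_left;    /* tasks get_next_task would still hand out */
	int fail_at;       /* index of the start that fails fatally, or -1 */
	int started;       /* children successfully started so far */
	int shutdown;      /* set when a callback asked to abort */
};

/* Stand-in for pp_start_one(). */
static int model_start_one(struct model *m)
{
	if (m->tasks_left == 0)
		return MODEL_NO_MORE;      /* get_next_task ran out of work */
	m->tasks_left--;
	if (m->started == m->fail_at)
		return MODEL_ABORT;        /* start_failure returned non-zero */
	m->started++;
	m->nr_processes++;
	return MODEL_STARTED;
}

/* One pass of the outer while (1) loop; returns 1 when the loop would exit. */
static int model_step(struct model *m, int *no_more_task, int spawn_cap)
{
	int i;

	if (!*no_more_task) {
		for (i = 0; i < spawn_cap; i++) {
			int code;
			if (m->nr_processes == m->max_processes)
				break;
			code = model_start_one(m);
			if (code == MODEL_STARTED)
				continue;
			if (code < 0)
				m->shutdown = 1; /* real code also sends SIGTERM to children */
			*no_more_task = 1;
			break;
		}
	}
	if (*no_more_task && !m->nr_processes)
		return 1;
	/* Simplification: every running child finishes during this pass. */
	m->nr_processes = 0;
	return 0;
}
```

With max_processes = 3 and 10 tasks, the model needs four spawning passes plus a final pass that finds nothing left to wait for; if the fifth start fails fatally, only four children are started and shutdown is set.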
-- 
2.5.0.273.g6fa2560.dirty


end of thread, other threads:[~2015-12-15  6:13 UTC | newest]

Thread overview: 31+ messages
2015-12-14 19:37 [PATCH 0/8] Rerolling sb/submodule-parallel-fetch for the time after 2.7 Stefan Beller
2015-12-14 19:37 ` [PATCH 1/8] submodule.c: write "Fetching submodule <foo>" to stderr Stefan Beller
2015-12-14 19:37 ` [PATCH 2/8] xread: poll on non blocking fds Stefan Beller
2015-12-14 22:58   ` Eric Sunshine
2015-12-14 23:07     ` Stefan Beller
2015-12-14 23:11     ` Junio C Hamano
2015-12-14 23:14       ` Stefan Beller
2015-12-14 19:37 ` [PATCH 3/8] xread_nonblock: add functionality to read from fds without blocking Stefan Beller
2015-12-14 20:59   ` Junio C Hamano
2015-12-14 23:03   ` Eric Sunshine
2015-12-14 23:05     ` Eric Sunshine
2015-12-14 23:15     ` Junio C Hamano
2015-12-14 23:57       ` Jeff King
2015-12-15  0:09         ` Stefan Beller
2015-12-15  0:16           ` Jeff King
2015-12-15  0:25             ` Stefan Beller
2015-12-15  1:44               ` Jeff King
2015-12-15  6:12               ` Johannes Sixt
2015-12-15  1:40         ` Junio C Hamano
2015-12-14 19:37 ` [PATCH 4/8] strbuf: add strbuf_read_once to read " Stefan Beller
2015-12-14 23:16   ` Eric Sunshine
2015-12-14 23:27     ` Stefan Beller
2015-12-14 19:37 ` [PATCH 5/8] sigchain: add command to pop all common signals Stefan Beller
2015-12-14 19:37 ` [PATCH 6/8] run-command: add an asynchronous parallel child processor Stefan Beller
2015-12-14 20:39   ` Johannes Sixt
2015-12-14 21:40     ` Stefan Beller
2015-12-14 19:37 ` [PATCH 7/8] fetch_populated_submodules: use new parallel job processing Stefan Beller
2015-12-14 19:37 ` [PATCH 8/8] submodules: allow parallel fetching, add tests and documentation Stefan Beller
2015-12-14 20:40 ` [PATCH 0/8] Rerolling sb/submodule-parallel-fetch for the time after 2.7 Johannes Sixt
2015-12-14 21:00   ` Junio C Hamano
  -- strict thread matches above, loose matches on Subject: below --
2015-09-28 23:13 [PATCH 0/8] fetch submodules in parallel Stefan Beller
2015-09-28 23:14 ` [PATCH 2/8] xread: poll on non blocking fds Stefan Beller
