git.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
* [PATCH v2] Threaded grep
@ 2010-01-12 23:44 Fredrik Kuivinen
  0 siblings, 0 replies; only message in thread
From: Fredrik Kuivinen @ 2010-01-12 23:44 UTC (permalink / raw)
  To: git
  Cc: Linus Torvalds, Junio C Hamano, Miles Bader,
	Nguyen Thai Ngoc Duy, Jeff King

This the second version of the threaded grep patch. It applies cleanly
on top of pu.

Two environment variables can be used to control the threading. If
NO_THREADS is set, then the old non-threaded code will be used.
THREADS can be set to an integer to specify the number of threads that
should be used. If neither NO_THREADS nor THREADS is set, 8 threads
are used.

The results below are best of five runs in the Linux repository on a
box with two cores.

git grep --no-ext-grep qwerty

4.38user 0.57system 0:02.69elapsed 183%CPU (0avgtext+0avgdata
0maxresident)k 0inputs+784outputs (0major+6206minor)pagefaults 0swaps

NO_THREADS=1 git grep --no-ext-grep qwerty
4.33user 0.48system 0:04.82elapsed 99%CPU (0avgtext+0avgdata
0maxresident)k 0inputs+784outputs (0major+3523minor)pagefaults 0swaps


And with a pattern with quite a few matches:

git grep --no-ext-grep function
6.28user 0.63system 0:03.97elapsed 173%CPU (0avgtext+0avgdata
0maxresident)k 0inputs+784outputs (0major+8101minor)pagefaults 0swaps

NO_THREADS=1 git grep --no-ext-grep function
6.21user 0.50system 0:06.71elapsed 100%CPU (0avgtext+0avgdata
0maxresident)k 0inputs+784outputs (0major+3523minor)pagefaults 0swaps


In either case we gain about 40% by the threading. More testing is, as
always, greatly appreciated.

The patch needs some further clean-ups before it can be considered for
application.

---
Major changes since v1:

* Buffer the output from the threads instead of waiting until the
  result can be written to stdout.

* Limit the maximum number of pending work_items.


 builtin-grep.c |  340 +++++++++++++++++++++++++++++++++++++++++++++++++++++---
 grep.c         |   85 ++++++++++++--
 grep.h         |    6 +
 strbuf.c       |   19 ++-
 strbuf.h       |    1 
 5 files changed, 413 insertions(+), 38 deletions(-)

diff --git a/builtin-grep.c b/builtin-grep.c
index 59c4b12..ebeede9 100644
--- a/builtin-grep.c
+++ b/builtin-grep.c
@@ -15,6 +15,13 @@
 #include "grep.h"
 #include "quote.h"
 
+#ifdef THREADED_DELTA_SEARCH
+#include "thread-utils.h"
+#include <pthread.h>
+#include <sys/types.h>
+#include <sys/syscall.h>
+#endif
+
 #ifndef NO_EXTERNAL_GREP
 #ifdef __unix__
 #define NO_EXTERNAL_GREP 0
@@ -23,6 +30,262 @@
 #endif
 #endif
 
+static int use_threads = 1;
+
+#ifdef THREADED_DELTA_SEARCH
+
+static void* load_file(const char *filename, size_t *sz);
+
+enum work_type {WORK_BUF, WORK_FILE};
+
+/* We use one producer thread and number_of_threads consumer
+   threads. The producer adds struct work_items to 'todo' and the
+   consumers pick work items from the same array. */
+struct work_item
+{
+	enum work_type type;
+	char *name;
+
+	/* if type == WORK_BUF, then 'buf' points to a buffer of size
+	   'size' otherwise type == WORK_FILE and 'buf' is a nul
+	   terminated filename. */
+	char *buf;
+	size_t size;
+	char done;
+	struct strbuf out;
+};
+
+/* Number of consumer threads. */
+static int number_of_threads;
+static pthread_t *threads;
+
+/* In the range [todo_done, todo_start) in 'todo' we have work_items
+   that have been or are processed by a consumer thread. We haven't
+   written the result for these to stdout yet.
+
+   The work_items in [todo_start, todo_end) are waiting to be picked
+   up by a consumer thread.
+
+   The ranges are modulo todo_alloc.
+*/
+static struct work_item *todo = NULL;
+static int todo_alloc = 0;
+static int todo_start = 0;
+static int todo_end = 0;
+static int todo_done = 0;
+
+/* Has all work items been added? */
+static int all_work_added = 0;
+
+/* This lock protects all the variables above. */
+static pthread_mutex_t grep_lock = PTHREAD_MUTEX_INITIALIZER;
+
+/* Signalled when a new work_item is added to todo. */
+static pthread_cond_t cond_add = PTHREAD_COND_INITIALIZER;
+
+/* Signalled when the result from one work_item is written to
+   stdout. */
+static pthread_cond_t cond_write = PTHREAD_COND_INITIALIZER;
+
+/* Signalled when we are finished with everything. */
+static pthread_cond_t cond_result = PTHREAD_COND_INITIALIZER;
+
+static void add_work(enum work_type type, char *name, char *buf, size_t size)
+{
+	pthread_mutex_lock(&grep_lock);
+
+	while ((todo_end+1) % todo_alloc == todo_done) {
+		pthread_cond_wait(&cond_write, &grep_lock);
+	}
+
+	todo[todo_end].type = type;
+	todo[todo_end].name = name;
+	todo[todo_end].buf = buf;
+	todo[todo_end].size = size;
+	todo[todo_end].done = 0;
+	strbuf_reset(&todo[todo_end].out);
+	todo_end = (todo_end + 1) % todo_alloc;
+
+	pthread_mutex_unlock(&grep_lock);
+	pthread_cond_signal(&cond_add);
+}
+
+static struct work_item* get_work()
+{
+	struct work_item* ret;
+
+	pthread_mutex_lock(&grep_lock);
+	while (todo_start == todo_end && !all_work_added) {
+		pthread_cond_wait(&cond_add, &grep_lock);
+	}
+
+	if (todo_start == todo_end && all_work_added) {
+		ret = NULL;
+	} else {
+		ret = &todo[todo_start];
+		todo_start = (todo_start + 1) % todo_alloc;
+	}
+	pthread_mutex_unlock(&grep_lock);
+	return ret;
+}
+
+/* This function takes ownership of 'name' and 'buf'. They will be
+   deallocated with free. */
+static int grep_buffer_async(struct grep_opt *opt, char *name, char *buf,
+			     size_t size)
+{
+	add_work(WORK_BUF, name, buf, size);
+	return 0;
+}
+
+static int grep_file_async(struct grep_opt *opt, char *name,
+			   const char *filename)
+{
+	add_work(WORK_FILE, name, (char*) filename, 0);
+	return 0;
+}
+
+static void work_done(struct work_item* w)
+{
+	int old_done;
+
+	pthread_mutex_lock(&grep_lock);
+	w->done = 1;
+	old_done = todo_done;
+	for(; todo[todo_done].done && todo_done != todo_start;
+	    todo_done = (todo_done+1) % todo_alloc) {
+		w = &todo[todo_done];
+		write_or_die(1, w->out.buf, w->out.len);
+		if (w->type == WORK_BUF)
+			free(w->buf);
+
+		free(w->name);
+	}
+
+	if (old_done != todo_done)
+		pthread_cond_signal(&cond_write);
+
+	if (all_work_added && todo_done == todo_end)
+		pthread_cond_signal(&cond_result);
+
+	pthread_mutex_unlock(&grep_lock);
+}
+
+static void* run(void *arg)
+{
+	int hit = 0;
+	struct grep_opt *opt = arg;
+
+	while (1) {
+		struct work_item *w = get_work();
+		if (!w)
+			break;
+
+		opt->output_priv = w;
+		if (w->type == WORK_BUF) {
+			hit |= grep_buffer(opt, w->name, w->buf, w->size);
+		} else {
+			size_t sz;
+			void* data = load_file(w->buf, &sz);
+			if (data) {
+				hit |= grep_buffer(opt, w->name, data, sz);
+				free(data);
+			}
+		}
+
+		work_done(w);
+	}
+
+	return (void*) (intptr_t) hit;
+}
+
+static void strbuf_out(struct grep_opt *opt, const char *fmt, ...)
+{
+	struct work_item *w = opt->output_priv;
+	va_list ap;
+
+	va_start(ap, fmt);
+	strbuf_vaddf(&w->out, fmt, ap);
+	va_end(ap);
+}
+
+static void start_threads(struct grep_opt *opt)
+{
+	int i;
+
+	if (getenv("THREADS"))
+		number_of_threads = atoi(getenv("THREADS"));
+	else
+		number_of_threads = 8;
+
+	todo_alloc = 100;
+	todo = xmalloc(sizeof(struct work_item)*todo_alloc);
+	for (i = 0; i < todo_alloc; i++) {
+		strbuf_init(&todo[i].out, 0);
+	}
+
+	threads = xmalloc(sizeof(pthread_t)*number_of_threads);
+	for (i = 0; i < number_of_threads; i++) {
+		struct grep_opt *o = grep_opt_dup(opt);
+		o->show_hunk_mark = 1;
+		o->output = strbuf_out;
+		compile_grep_patterns(o);
+		int err = pthread_create(&threads[i], NULL, run, o);
+
+		if (err)
+			die("grep: failed to create thread: %s", strerror(err));
+	}
+}
+#endif /* THREADED_DELTA_SEARCH */
+
+#ifdef THREADED_DELTA_SEARCH
+static int wait_all()
+{
+	int hit = 0;
+	int i;
+
+	pthread_mutex_lock(&grep_lock);
+	all_work_added = 1;
+
+	/* Wait until all work is done. */
+	while (todo_done != todo_end)
+		pthread_cond_wait(&cond_result, &grep_lock);
+
+	/* Wake up all the consumer threads so they can see that there
+	   is no more work to do. */
+	pthread_cond_broadcast(&cond_add);
+	pthread_mutex_unlock(&grep_lock);
+
+	for (i = 0; i < number_of_threads; i++) {
+		void *h;
+		pthread_join(threads[i], &h);
+		hit |= (int) (intptr_t) h;
+	}
+
+	return hit;
+}
+#else
+static int wait_all()
+{
+	return 0;
+}
+#endif
+
+static int grep_buffer_internal(struct grep_opt *opt, char *name, char *buf,
+				size_t size)
+{
+#ifdef THREADED_DELTA_SEARCH
+	if (use_threads)
+		return grep_buffer_async(opt, name, buf, size);
+#endif
+	{
+		int hit = grep_buffer(opt, name, buf, size);
+		free(name);
+		free(buf);
+		return hit;
+	}
+}
+
 static char const * const grep_usage[] = {
 	"git grep [options] [-e] <pattern> [<rev>...] [[--] path...]",
 	NULL
@@ -169,21 +432,21 @@ static int grep_sha1(struct grep_opt *opt, const unsigned char *sha1, const char
 	if (opt->relative && opt->prefix_length) {
 		quote_path_relative(name + tree_name_len, -1, &pathbuf, opt->prefix);
 		strbuf_insert(&pathbuf, 0, name, tree_name_len);
-		name = pathbuf.buf;
+	} else {
+		strbuf_addstr(&pathbuf, name);
 	}
-	hit = grep_buffer(opt, name, data, size);
-	strbuf_release(&pathbuf);
-	free(data);
+
+	hit = grep_buffer_internal(opt, strbuf_detach(&pathbuf, NULL),
+				   data, size);
+
 	return hit;
 }
 
-static int grep_file(struct grep_opt *opt, const char *filename)
+static void* load_file(const char *filename, size_t *sz)
 {
 	struct stat st;
+	char* data;
 	int i;
-	char *data;
-	size_t sz;
-	struct strbuf buf = STRBUF_INIT;
 
 	if (lstat(filename, &st) < 0) {
 	err_ret:
@@ -195,24 +458,47 @@ static int grep_file(struct grep_opt *opt, const char *filename)
 		return 0; /* empty file -- no grep hit */
 	if (!S_ISREG(st.st_mode))
 		return 0;
-	sz = xsize_t(st.st_size);
+	*sz = xsize_t(st.st_size);
 	i = open(filename, O_RDONLY);
 	if (i < 0)
 		goto err_ret;
-	data = xmalloc(sz + 1);
-	if (st.st_size != read_in_full(i, data, sz)) {
+	data = xmalloc(*sz + 1);
+	data[*sz] = 0;
+	if (st.st_size != read_in_full(i, data, *sz)) {
 		error("'%s': short read %s", filename, strerror(errno));
 		close(i);
 		free(data);
 		return 0;
 	}
 	close(i);
+	return data;
+}
+
+static int grep_file(struct grep_opt *opt, const char *filename)
+{
+	char *data;
+	size_t sz;
+	struct strbuf buf = STRBUF_INIT;
+	char *name;
+
 	if (opt->relative && opt->prefix_length)
-		filename = quote_path_relative(filename, -1, &buf, opt->prefix);
-	i = grep_buffer(opt, filename, data, sz);
-	strbuf_release(&buf);
-	free(data);
-	return i;
+		quote_path_relative(filename, -1, &buf, opt->prefix);
+	else
+		strbuf_addstr(&buf, filename);
+	name = strbuf_detach(&buf, NULL);
+
+#ifdef THREADED_DELTA_SEARCH
+	if (use_threads) {
+		return grep_file_async(opt, name, filename);
+	} else
+#endif
+	{
+		data = load_file(filename, &sz);
+		if (!data)
+			return 0;
+
+		return grep_buffer_internal(opt, name, data, sz);
+	}
 }
 
 #if !NO_EXTERNAL_GREP
@@ -809,6 +1095,8 @@ int cmd_grep(int argc, const char **argv, const char *prefix)
 		OPT_END()
 	};
 
+	use_threads = getenv("NO_THREADS") == NULL;
+
 	/*
 	 * 'git grep -h', unlike 'git grep -h <pattern>', is a request
 	 * to show usage information and exit.
@@ -862,6 +1150,17 @@ int cmd_grep(int argc, const char **argv, const char *prefix)
 		opt.regflags |= REG_ICASE;
 	if ((opt.regflags != REG_NEWLINE) && opt.fixed)
 		die("cannot mix --fixed-strings and regexp");
+
+#ifdef THREADED_DELTA_SEARCH
+	if (!grep_threads_ok(&opt))
+		use_threads = 0;
+
+	if (use_threads)
+		start_threads(&opt);
+#else
+	use_threads = 0;
+#endif
+
 	compile_grep_patterns(&opt);
 
 	/* Check revs and then paths */
@@ -899,9 +1198,13 @@ int cmd_grep(int argc, const char **argv, const char *prefix)
 	}
 
 	if (!list.nr) {
+		int hit;
 		if (!cached)
 			setup_work_tree();
-		return !grep_cache(&opt, paths, cached, external_grep_allowed);
+		hit = grep_cache(&opt, paths, cached, external_grep_allowed);
+		if (use_threads)
+			hit |= wait_all();
+		return !hit;
 	}
 
 	if (cached)
@@ -913,6 +1216,9 @@ int cmd_grep(int argc, const char **argv, const char *prefix)
 		if (grep_object(&opt, paths, real_obj, list.objects[i].name))
 			hit = 1;
 	}
+
+	if (use_threads)
+		hit |= wait_all();
 	free_grep_patterns(&opt);
 	return !hit;
 }
diff --git a/grep.c b/grep.c
index 62723da..3dd3918 100644
--- a/grep.c
+++ b/grep.c
@@ -29,6 +29,28 @@ void append_grep_pattern(struct grep_opt *opt, const char *pat,
 	p->next = NULL;
 }
 
+struct grep_opt* grep_opt_dup(const struct grep_opt *opt)
+{
+	struct grep_pat *pat;
+	struct grep_opt *ret = xmalloc(sizeof(struct grep_opt));
+	*ret = *opt;
+
+	ret->pattern_list = NULL;
+	ret->pattern_tail = &ret->pattern_list;
+
+	for(pat = opt->pattern_list; pat != NULL; pat = pat->next)
+	{
+		if(pat->token == GREP_PATTERN_HEAD)
+			append_header_grep_pattern(ret, pat->field,
+						   pat->pattern);
+		else
+			append_grep_pattern(ret, pat->pattern, pat->origin,
+					    pat->no, pat->token);
+	}
+
+	return ret;
+}
+
 static int is_fixed(const char *s)
 {
 	while (*s && !is_regex_special(*s))
@@ -260,7 +282,7 @@ static int word_char(char ch)
 
 static void show_name(struct grep_opt *opt, const char *name)
 {
-	printf("%s%c", name, opt->null_following_name ? '\0' : '\n');
+	opt->output(opt, "%s%c", name, opt->null_following_name ? '\0' : '\n');
 }
 
 
@@ -501,20 +523,20 @@ static void show_line(struct grep_opt *opt, char *bol, char *eol,
 	if (opt->pre_context || opt->post_context) {
 		if (opt->last_shown == 0) {
 			if (opt->show_hunk_mark)
-				fputs("--\n", stdout);
+				opt->output(opt, "--\n");
 			else
 				opt->show_hunk_mark = 1;
 		} else if (lno > opt->last_shown + 1)
-			fputs("--\n", stdout);
+			opt->output(opt, "--\n");
 	}
 	opt->last_shown = lno;
 
 	if (opt->null_following_name)
 		sign = '\0';
 	if (opt->pathname)
-		printf("%s%c", name, sign);
+		opt->output(opt, "%s%c", name, sign);
 	if (opt->linenum)
-		printf("%d%c", lno, sign);
+		opt->output(opt, "%d%c", lno, sign);
 	if (opt->color) {
 		regmatch_t match;
 		enum grep_context ctx = GREP_CONTEXT_BODY;
@@ -525,18 +547,19 @@ static void show_line(struct grep_opt *opt, char *bol, char *eol,
 		while (next_match(opt, bol, eol, ctx, &match, eflags)) {
 			if (match.rm_so == match.rm_eo)
 				break;
-			printf("%.*s%s%.*s%s",
-			       (int)match.rm_so, bol,
-			       opt->color_match,
-			       (int)(match.rm_eo - match.rm_so), bol + match.rm_so,
-			       GIT_COLOR_RESET);
+			opt->output(opt, "%.*s%s%.*s%s",
+				    (int)match.rm_so, bol,
+				    opt->color_match,
+				    (int)(match.rm_eo - match.rm_so),
+				    bol + match.rm_so,
+				    GIT_COLOR_RESET);
 			bol += match.rm_eo;
 			rest -= match.rm_eo;
 			eflags = REG_NOTBOL;
 		}
 		*eol = ch;
 	}
-	printf("%.*s\n", rest, bol);
+	opt->output(opt, "%.*s\n", rest, bol);
 }
 
 static int match_funcname(struct grep_opt *opt, char *bol, char *eol)
@@ -674,6 +697,33 @@ static int look_ahead(struct grep_opt *opt,
 	return 0;
 }
 
+int grep_threads_ok(const struct grep_opt *opt)
+{
+	/* If this condition is true, then we may use the attribute
+	   machinery in grep_buffer_1. The attribute code is not
+	   thread safe, so we disable the use of threads. */
+	if (opt->funcname && !opt->unmatch_name_only && !opt->status_only &&
+	    !opt->name_only)
+		return 0;
+
+	/* If we are showing hunk marks, we should not do it for the
+	   first match. The synchronization problem we get for this
+	   constraint is not yet solved, so we disable threading in
+	   this case. */
+	if (opt->pre_context || opt->post_context)
+		return 0;
+	
+	return 1;
+}
+
+static void std_output(struct grep_opt *opt, const char* fmt, ...)
+{
+	va_list ap;
+	va_start(ap, fmt);
+	vprintf(fmt, ap);
+	va_end(ap);
+}
+
 static int grep_buffer_1(struct grep_opt *opt, const char *name,
 			 char *buf, unsigned long size, int collect_hits)
 {
@@ -689,6 +739,9 @@ static int grep_buffer_1(struct grep_opt *opt, const char *name,
 
 	opt->last_shown = 0;
 
+	if (!opt->output)
+		opt->output = std_output;
+
 	if (buffer_is_binary(buf, size)) {
 		switch (opt->binary) {
 		case GREP_BINARY_DEFAULT:
@@ -761,7 +814,8 @@ static int grep_buffer_1(struct grep_opt *opt, const char *name,
 			if (opt->status_only)
 				return 1;
 			if (binary_match_only) {
-				printf("Binary file %s matches\n", name);
+				opt->output(opt,
+					    "Binary file %s matches\n", name);
 				return 1;
 			}
 			if (opt->name_only) {
@@ -817,9 +871,10 @@ static int grep_buffer_1(struct grep_opt *opt, const char *name,
 	 * which feels mostly useless but sometimes useful.  Maybe
 	 * make it another option?  For now suppress them.
 	 */
-	if (opt->count && count)
-		printf("%s%c%u\n", name,
-		       opt->null_following_name ? '\0' : ':', count);
+	if (opt->count && count) {
+		opt->output(opt, "%s%c%u\n", name,
+			    opt->null_following_name ? '\0' : ':', count);
+	}
 	return !!last_hit;
 }
 
diff --git a/grep.h b/grep.h
index 75370f6..f5031c7 100644
--- a/grep.h
+++ b/grep.h
@@ -92,6 +92,9 @@ struct grep_opt {
 	unsigned last_shown;
 	int show_hunk_mark;
 	void *priv;
+
+	void (*output)(struct grep_opt*, const char*, ...);
+	void *output_priv;
 };
 
 extern void append_grep_pattern(struct grep_opt *opt, const char *pat, const char *origin, int no, enum grep_pat_token t);
@@ -100,4 +103,7 @@ extern void compile_grep_patterns(struct grep_opt *opt);
 extern void free_grep_patterns(struct grep_opt *opt);
 extern int grep_buffer(struct grep_opt *opt, const char *name, char *buf, unsigned long size);
 
+extern struct grep_opt* grep_opt_dup(const struct grep_opt *opt);
+extern int grep_threads_ok(const struct grep_opt *opt);
+
 #endif
diff --git a/strbuf.c b/strbuf.c
index 4017476..a41f0ea 100644
--- a/strbuf.c
+++ b/strbuf.c
@@ -194,21 +194,28 @@ void strbuf_adddup(struct strbuf *sb, size_t pos, size_t len)
 
 void strbuf_addf(struct strbuf *sb, const char *fmt, ...)
 {
-	int len;
 	va_list ap;
 
+	va_start(ap, fmt);
+	strbuf_vaddf(sb, fmt, ap);
+	va_end(ap);
+}
+
+void strbuf_vaddf(struct strbuf *sb, const char *fmt, va_list ap)
+{
+	int len;
+	va_list ap2;
+
+	va_copy(ap2, ap);
 	if (!strbuf_avail(sb))
 		strbuf_grow(sb, 64);
-	va_start(ap, fmt);
 	len = vsnprintf(sb->buf + sb->len, sb->alloc - sb->len, fmt, ap);
-	va_end(ap);
 	if (len < 0)
 		die("your vsnprintf is broken");
 	if (len > strbuf_avail(sb)) {
 		strbuf_grow(sb, len);
-		va_start(ap, fmt);
-		len = vsnprintf(sb->buf + sb->len, sb->alloc - sb->len, fmt, ap);
-		va_end(ap);
+		len = vsnprintf(sb->buf + sb->len, sb->alloc - sb->len, fmt, ap2);
+		va_end(ap2);
 		if (len > strbuf_avail(sb)) {
 			die("this should not happen, your snprintf is broken");
 		}
diff --git a/strbuf.h b/strbuf.h
index b37f06a..20129ad 100644
--- a/strbuf.h
+++ b/strbuf.h
@@ -118,6 +118,7 @@ extern size_t strbuf_expand_dict_cb(struct strbuf *sb, const char *placeholder,
 
 __attribute__((format (printf,2,3)))
 extern void strbuf_addf(struct strbuf *sb, const char *fmt, ...);
+extern void strbuf_vaddf(struct strbuf *sb, const char *fmt, va_list ap);
 
 extern size_t strbuf_fread(struct strbuf *, size_t, FILE *);
 /* XXX: if read fails, any partial read is undone */

^ permalink raw reply related	[flat|nested] only message in thread

only message in thread, other threads:[~2010-01-12 23:44 UTC | newest]

Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2010-01-12 23:44 [PATCH v2] Threaded grep Fredrik Kuivinen

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).