git.vger.kernel.org archive mirror
* [JGIT PATCH 0/6] Add timeouts to network IO
@ 2009-06-19 21:27 Shawn O. Pearce
  2009-06-19 21:27 ` [JGIT PATCH 1/6] Create input and output streams that have timeouts Shawn O. Pearce
  0 siblings, 1 reply; 11+ messages in thread
From: Shawn O. Pearce @ 2009-06-19 21:27 UTC (permalink / raw)
  To: Robin Rosenberg; +Cc: git

If there's a bug in the remote peer software, or even just random
network hardware/cable failures between us and the remote peer, we
might block indefinitely waiting for more incoming data, or waiting
for TCP ACKs necessary to release space in our transmit buffer.

This series adds a timeout to everything, allowing the caller to
define some maximum waiting period before we abort and declare the
remote peer to be unresponsive.  The default timeout of 0 will use
the traditional "block indefinitely" behavior we have always had.
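
For reference, the JDK's own socket API follows the same convention as
described above: a read timeout of 0 means "block indefinitely" and a
positive value bounds each blocking read. The sketch below is not code
from this series. The names SocketTimeoutSketch and readTimesOut are
made up for illustration, and it assumes a loopback connection to a
peer that never sends anything.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Hypothetical illustration only; not part of the patch series.
final class SocketTimeoutSketch {
    /**
     * Connects to a local peer that stays silent and tries to read one byte.
     * Returns true if the read was aborted by the socket timeout.
     */
    static boolean readTimesOut(int timeoutMillis) throws IOException {
        try (ServerSocket server =
                     new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client =
                     new Socket(server.getInetAddress(), server.getLocalPort());
             Socket peer = server.accept()) {
            // 0 would mean "block indefinitely"; > 0 bounds each read call.
            client.setSoTimeout(timeoutMillis);
            InputStream in = client.getInputStream();
            try {
                in.read(); // the peer never writes, so this blocks
                return false;
            } catch (SocketTimeoutException e) {
                // SocketTimeoutException is an InterruptedIOException.
                return true;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("timed out: " + readTimesOut(250));
    }
}
```

This only covers streams backed directly by a socket. Streams that are
not (a pipe, for example) need a different mechanism, which is what the
first patch in the series provides.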


Shawn O. Pearce (6):
  Create input and output streams that have timeouts
  Add remote.name.timeout to configure an IO timeout
  Add timeouts to smart transport protocol clients
  Add timeouts to smart transport protocol servers
  Add timeouts to anonymous git:// daemon
  Add --timeout command line options

 .../src/org/spearce/jgit/pgm/Daemon.java           |    5 +
 .../src/org/spearce/jgit/pgm/Fetch.java            |    5 +
 .../src/org/spearce/jgit/pgm/LsRemote.java         |    6 +
 .../src/org/spearce/jgit/pgm/Push.java             |    5 +
 .../src/org/spearce/jgit/pgm/UploadPack.java       |    6 +
 .../spearce/jgit/transport/RemoteConfigTest.java   |   26 ++
 .../jgit/util/io/TimeoutInputStreamTest.java       |  187 +++++++++++++
 .../jgit/util/io/TimeoutOutputStreamTest.java      |  286 ++++++++++++++++++++
 .../spearce/jgit/transport/BasePackConnection.java |   36 +++-
 .../jgit/transport/BasePackPushConnection.java     |   29 ++-
 .../src/org/spearce/jgit/transport/Daemon.java     |   26 ++-
 .../org/spearce/jgit/transport/DaemonClient.java   |   12 +-
 .../org/spearce/jgit/transport/ReceivePack.java    |   57 ++++
 .../org/spearce/jgit/transport/RemoteConfig.java   |   31 +++
 .../org/spearce/jgit/transport/SshTransport.java   |    3 +-
 .../src/org/spearce/jgit/transport/Transport.java  |   21 ++
 .../spearce/jgit/transport/TransportGitAnon.java   |   13 +-
 .../spearce/jgit/transport/TransportGitSsh.java    |  103 +++++++-
 .../org/spearce/jgit/transport/TransportLocal.java |    9 +-
 .../org/spearce/jgit/transport/TransportSftp.java  |    3 +-
 .../src/org/spearce/jgit/transport/UploadPack.java |   59 ++++-
 .../org/spearce/jgit/util/io/InterruptTimer.java   |  216 +++++++++++++++
 .../spearce/jgit/util/io/TimeoutInputStream.java   |  133 +++++++++
 .../spearce/jgit/util/io/TimeoutOutputStream.java  |  146 ++++++++++
 24 files changed, 1400 insertions(+), 23 deletions(-)
 create mode 100644 org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutInputStreamTest.java
 create mode 100644 org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutOutputStreamTest.java
 create mode 100644 org.spearce.jgit/src/org/spearce/jgit/util/io/InterruptTimer.java
 create mode 100644 org.spearce.jgit/src/org/spearce/jgit/util/io/TimeoutInputStream.java
 create mode 100644 org.spearce.jgit/src/org/spearce/jgit/util/io/TimeoutOutputStream.java

* [JGIT PATCH 1/6] Create input and output streams that have timeouts
  2009-06-19 21:27 [JGIT PATCH 0/6] Add timeouts to network IO Shawn O. Pearce
@ 2009-06-19 21:27 ` Shawn O. Pearce
  2009-06-19 21:27   ` [JGIT PATCH 2/6] Add remote.name.timeout to configure an IO timeout Shawn O. Pearce
  2009-06-22 21:09   ` [JGIT PATCH 1/6] Create input and output streams that have timeouts Robin Rosenberg
  0 siblings, 2 replies; 11+ messages in thread
From: Shawn O. Pearce @ 2009-06-19 21:27 UTC (permalink / raw)
  To: Robin Rosenberg; +Cc: git

We can use these streams to wrap around a PipedInputStream, such
as the one given to us by JSch, to implement a timeout so we can
abort a transport operation if the remote peer doesn't communicate
with us in a timely fashion.

Only the input and output streams and their tests are defined here;
later I will make use of them in the transport layer, where we can't
rely upon raw socket timeouts to abort a long-running IO call.
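
The underlying idea can be sketched in isolation: a helper thread
interrupts the reader once a deadline passes, and a read blocked on a
PipedInputStream reacts to that interrupt by throwing
InterruptedIOException. The code below is a simplified illustration
under that assumption, not the InterruptTimer from this patch. The
names PipedReadTimeoutSketch and readWithTimeout are hypothetical, and
it starts one short-lived thread per call, whereas the patch reuses a
single background thread.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

// Hypothetical illustration only; not the InterruptTimer from the patch.
final class PipedReadTimeoutSketch {
    /** Reads one byte, or fails if none arrives within timeoutMillis. */
    static int readWithTimeout(PipedInputStream in, long timeoutMillis)
            throws IOException {
        final Thread reader = Thread.currentThread();
        Thread watchdog = new Thread(() -> {
            try {
                Thread.sleep(timeoutMillis);
                reader.interrupt(); // deadline passed: wake the blocked reader
            } catch (InterruptedException e) {
                // The read finished in time and cancelled the alarm.
            }
        }, "watchdog-sketch");
        watchdog.setDaemon(true);
        watchdog.start();
        try {
            return in.read();
        } catch (InterruptedIOException e) {
            throw new InterruptedIOException("Read timed out");
        } finally {
            watchdog.interrupt(); // cancel the alarm if it has not fired yet
            boolean joined = false;
            while (!joined) {
                try {
                    watchdog.join();
                    joined = true;
                } catch (InterruptedException e) {
                    // A late alarm hit us; keep waiting for the watchdog.
                }
            }
            // Clear a late interrupt so it cannot leak into the caller.
            // Note: this also swallows interrupts sent by other threads.
            Thread.interrupted();
        }
    }

    public static void main(String[] args) throws IOException {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(out);
        out.write('a');
        System.out.println((char) readWithTimeout(in, 250));
        try {
            readWithTimeout(in, 250); // nothing left to read
        } catch (InterruptedIOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The approach only works when the wrapped stream checks the interrupt
flag while it is blocked; a stream that ignores interrupts would still
hang.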

Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
---
 .../jgit/util/io/TimeoutInputStreamTest.java       |  187 +++++++++++++
 .../jgit/util/io/TimeoutOutputStreamTest.java      |  286 ++++++++++++++++++++
 .../org/spearce/jgit/util/io/InterruptTimer.java   |  216 +++++++++++++++
 .../spearce/jgit/util/io/TimeoutInputStream.java   |  133 +++++++++
 .../spearce/jgit/util/io/TimeoutOutputStream.java  |  146 ++++++++++
 5 files changed, 968 insertions(+), 0 deletions(-)
 create mode 100644 org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutInputStreamTest.java
 create mode 100644 org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutOutputStreamTest.java
 create mode 100644 org.spearce.jgit/src/org/spearce/jgit/util/io/InterruptTimer.java
 create mode 100644 org.spearce.jgit/src/org/spearce/jgit/util/io/TimeoutInputStream.java
 create mode 100644 org.spearce.jgit/src/org/spearce/jgit/util/io/TimeoutOutputStream.java

diff --git a/org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutInputStreamTest.java b/org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutInputStreamTest.java
new file mode 100644
index 0000000..25eff9a
--- /dev/null
+++ b/org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutInputStreamTest.java
@@ -0,0 +1,187 @@
+/*
+ * Copyright (C) 2009, Google Inc.
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ *   notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ *   copyright notice, this list of conditions and the following
+ *   disclaimer in the documentation and/or other materials provided
+ *   with the distribution.
+ *
+ * - Neither the name of the Git Development Community nor the
+ *   names of its contributors may be used to endorse or promote
+ *   products derived from this software without specific prior
+ *   written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.spearce.jgit.util.io;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Arrays;
+import java.util.List;
+
+import org.spearce.jgit.util.NB;
+import org.spearce.jgit.util.io.InterruptTimer;
+import org.spearce.jgit.util.io.TimeoutInputStream;
+
+import junit.framework.TestCase;
+
+public class TimeoutInputStreamTest extends TestCase {
+	private static final int timeout = 250;
+
+	private PipedOutputStream out;
+
+	private PipedInputStream in;
+
+	private InterruptTimer timer;
+
+	private TimeoutInputStream is;
+
+	private long start;
+
+	protected void setUp() throws Exception {
+		super.setUp();
+		out = new PipedOutputStream();
+		in = new PipedInputStream(out);
+		timer = new InterruptTimer();
+		is = new TimeoutInputStream(in, timer);
+		is.setTimeout(timeout);
+	}
+
+	protected void tearDown() throws Exception {
+		close();
+		super.tearDown();
+	}
+
+	public void testTimeout_readByte_Success1() throws IOException {
+		out.write('a');
+		assertEquals('a', is.read());
+		close();
+	}
+
+	public void testTimeout_readByte_Success2() throws IOException {
+		final byte[] exp = new byte[] { 'a', 'b', 'c' };
+		out.write(exp);
+		assertEquals(exp[0], is.read());
+		assertEquals(exp[1], is.read());
+		assertEquals(exp[2], is.read());
+		close();
+	}
+
+	public void testTimeout_readByte_Timeout() throws IOException {
+		beginRead();
+		try {
+			is.read();
+			fail("incorrectly read a byte");
+		} catch (InterruptedIOException e) {
+			// expected
+		}
+		assertTimeout();
+		close();
+	}
+
+	public void testTimeout_readBuffer_Success1() throws IOException {
+		final byte[] exp = new byte[] { 'a', 'b', 'c' };
+		final byte[] act = new byte[exp.length];
+		out.write(exp);
+		NB.readFully(is, act, 0, act.length);
+		assertTrue(Arrays.equals(exp, act));
+		close();
+	}
+
+	public void testTimeout_readBuffer_Success2() throws IOException {
+		final byte[] exp = new byte[] { 'a', 'b', 'c' };
+		final byte[] act = new byte[exp.length];
+		out.write(exp);
+		NB.readFully(is, act, 0, 1);
+		NB.readFully(is, act, 1, 1);
+		NB.readFully(is, act, 2, 1);
+		assertTrue(Arrays.equals(exp, act));
+		close();
+	}
+
+	public void testTimeout_readBuffer_Timeout() throws IOException {
+		beginRead();
+		try {
+			is.read(new byte[512]);
+			fail("incorrectly read bytes");
+		} catch (InterruptedIOException e) {
+			// expected
+		}
+		assertTimeout();
+		close();
+	}
+
+	public void testTimeout_skip_Success() throws IOException {
+		final byte[] exp = new byte[] { 'a', 'b', 'c' };
+		out.write(exp);
+		assertEquals(2, is.skip(2));
+		assertEquals('c', is.read());
+		close();
+	}
+
+	public void testTimeout_skip_Timeout() throws IOException {
+		beginRead();
+		try {
+			is.skip(1024);
+			fail("incorrectly skipped bytes");
+		} catch (InterruptedIOException e) {
+			// expected
+		}
+		assertTimeout();
+		close();
+	}
+
+	private void beginRead() {
+		start = now();
+	}
+
+	private void assertTimeout() {
+		final long wait = now() - start;
+		assertTrue(Math.abs(wait - timeout) < 50);
+	}
+
+	private void close() {
+		timer.terminate();
+		for (Thread t : active())
+			assertFalse(t instanceof InterruptTimer.AlarmThread);
+	}
+
+	private static List<Thread> active() {
+		Thread[] all = new Thread[16];
+		int n = Thread.currentThread().getThreadGroup().enumerate(all);
+		while (n == all.length) {
+			all = new Thread[all.length * 2];
+			n = Thread.currentThread().getThreadGroup().enumerate(all);
+		}
+		return Arrays.asList(all).subList(0, n);
+	}
+
+	private static long now() {
+		return System.currentTimeMillis();
+	}
+}
diff --git a/org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutOutputStreamTest.java b/org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutOutputStreamTest.java
new file mode 100644
index 0000000..4b0656d
--- /dev/null
+++ b/org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutOutputStreamTest.java
@@ -0,0 +1,286 @@
+/*
+ * Copyright (C) 2009, Google Inc.
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ *   notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ *   copyright notice, this list of conditions and the following
+ *   disclaimer in the documentation and/or other materials provided
+ *   with the distribution.
+ *
+ * - Neither the name of the Git Development Community nor the
+ *   names of its contributors may be used to endorse or promote
+ *   products derived from this software without specific prior
+ *   written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.spearce.jgit.util.io;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Arrays;
+import java.util.List;
+
+import org.spearce.jgit.util.NB;
+import org.spearce.jgit.util.io.InterruptTimer;
+import org.spearce.jgit.util.io.TimeoutOutputStream;
+
+import junit.framework.TestCase;
+
+public class TimeoutOutputStreamTest extends TestCase {
+	private static final int timeout = 250;
+
+	private PipedOutputStream out;
+
+	private FullPipeInputStream in;
+
+	private InterruptTimer timer;
+
+	private TimeoutOutputStream os;
+
+	private long start;
+
+	protected void setUp() throws Exception {
+		super.setUp();
+		out = new PipedOutputStream();
+		in = new FullPipeInputStream(out);
+		timer = new InterruptTimer();
+		os = new TimeoutOutputStream(out, timer);
+		os.setTimeout(timeout);
+	}
+
+	protected void tearDown() throws Exception {
+		close();
+		super.tearDown();
+	}
+
+	public void testTimeout_writeByte_Success1() throws IOException {
+		in.free(1);
+		os.write('a');
+		in.want(1);
+		assertEquals('a', in.read());
+		close();
+	}
+
+	public void testTimeout_writeByte_Success2() throws IOException {
+		final byte[] exp = new byte[] { 'a', 'b', 'c' };
+		final byte[] act = new byte[exp.length];
+		in.free(exp.length);
+		os.write(exp[0]);
+		os.write(exp[1]);
+		os.write(exp[2]);
+		in.want(exp.length);
+		in.read(act);
+		assertTrue(Arrays.equals(exp, act));
+		close();
+	}
+
+	public void testTimeout_writeByte_Timeout() throws IOException {
+		beginWrite();
+		try {
+			os.write('\n');
+			fail("incorrectly wrote a byte");
+		} catch (InterruptedIOException e) {
+			// expected
+		}
+		assertTimeout();
+		close();
+	}
+
+	public void testTimeout_writeBuffer_Success1() throws IOException {
+		final byte[] exp = new byte[] { 'a', 'b', 'c' };
+		final byte[] act = new byte[exp.length];
+		in.free(exp.length);
+		os.write(exp);
+		in.want(exp.length);
+		in.read(act);
+		assertTrue(Arrays.equals(exp, act));
+		close();
+	}
+
+	public void testTimeout_writeBuffer_Timeout() throws IOException {
+		beginWrite();
+		try {
+			os.write(new byte[512]);
+			fail("incorrectly wrote bytes");
+		} catch (InterruptedIOException e) {
+			// expected
+		}
+		assertTimeout();
+		close();
+	}
+
+	public void testTimeout_flush_Success() throws IOException {
+		final boolean[] called = new boolean[1];
+		os = new TimeoutOutputStream(new OutputStream() {
+			@Override
+			public void write(int b) throws IOException {
+				fail("should not have written");
+			}
+
+			@Override
+			public void flush() throws IOException {
+				called[0] = true;
+			}
+		}, timer);
+		os.setTimeout(timeout);
+		os.flush();
+		assertTrue(called[0]);
+		close();
+	}
+
+	public void testTimeout_flush_Timeout() throws IOException {
+		final boolean[] called = new boolean[1];
+		os = new TimeoutOutputStream(new OutputStream() {
+			@Override
+			public void write(int b) throws IOException {
+				fail("should not have written");
+			}
+
+			@Override
+			public void flush() throws IOException {
+				called[0] = true;
+				for (;;) {
+					try {
+						Thread.sleep(1000);
+					} catch (InterruptedException e) {
+						throw new InterruptedIOException();
+					}
+				}
+			}
+		}, timer);
+		os.setTimeout(timeout);
+
+		beginWrite();
+		try {
+			os.flush();
+			fail("incorrectly flushed");
+		} catch (InterruptedIOException e) {
+			// expected
+		}
+		assertTimeout();
+		assertTrue(called[0]);
+		close();
+	}
+
+	public void testTimeout_close_Success() throws IOException {
+		final boolean[] called = new boolean[1];
+		os = new TimeoutOutputStream(new OutputStream() {
+			@Override
+			public void write(int b) throws IOException {
+				fail("should not have written");
+			}
+
+			@Override
+			public void close() throws IOException {
+				called[0] = true;
+			}
+		}, timer);
+		os.setTimeout(timeout);
+		os.close();
+		assertTrue(called[0]);
+		close();
+	}
+
+	public void testTimeout_close_Timeout() throws IOException {
+		final boolean[] called = new boolean[1];
+		os = new TimeoutOutputStream(new OutputStream() {
+			@Override
+			public void write(int b) throws IOException {
+				fail("should not have written");
+			}
+
+			@Override
+			public void close() throws IOException {
+				called[0] = true;
+				for (;;) {
+					try {
+						Thread.sleep(1000);
+					} catch (InterruptedException e) {
+						throw new InterruptedIOException();
+					}
+				}
+			}
+		}, timer);
+		os.setTimeout(timeout);
+
+		beginWrite();
+		try {
+			os.close();
+			fail("incorrectly closed");
+		} catch (InterruptedIOException e) {
+			// expected
+		}
+		assertTimeout();
+		assertTrue(called[0]);
+		close();
+	}
+
+	private void beginWrite() {
+		start = now();
+	}
+
+	private void assertTimeout() {
+		final long wait = now() - start;
+		assertTrue(Math.abs(wait - timeout) < 50);
+	}
+
+	private void close() {
+		timer.terminate();
+		for (Thread t : active())
+			assertFalse(t instanceof InterruptTimer.AlarmThread);
+	}
+
+	private static List<Thread> active() {
+		Thread[] all = new Thread[16];
+		int n = Thread.currentThread().getThreadGroup().enumerate(all);
+		while (n == all.length) {
+			all = new Thread[all.length * 2];
+			n = Thread.currentThread().getThreadGroup().enumerate(all);
+		}
+		return Arrays.asList(all).subList(0, n);
+	}
+
+	private static long now() {
+		return System.currentTimeMillis();
+	}
+
+	private final class FullPipeInputStream extends PipedInputStream {
+		FullPipeInputStream(PipedOutputStream src) throws IOException {
+			super(src);
+			src.write(new byte[PIPE_SIZE]);
+		}
+
+		void want(int cnt) throws IOException {
+			NB.skipFully(this, PIPE_SIZE - cnt);
+		}
+
+		void free(int cnt) throws IOException {
+			NB.skipFully(this, cnt);
+		}
+	}
+}
diff --git a/org.spearce.jgit/src/org/spearce/jgit/util/io/InterruptTimer.java b/org.spearce.jgit/src/org/spearce/jgit/util/io/InterruptTimer.java
new file mode 100644
index 0000000..8f625e0
--- /dev/null
+++ b/org.spearce.jgit/src/org/spearce/jgit/util/io/InterruptTimer.java
@@ -0,0 +1,216 @@
+/*
+ * Copyright (C) 2009, Google Inc.
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ *   notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ *   copyright notice, this list of conditions and the following
+ *   disclaimer in the documentation and/or other materials provided
+ *   with the distribution.
+ *
+ * - Neither the name of the Git Development Community nor the
+ *   names of its contributors may be used to endorse or promote
+ *   products derived from this software without specific prior
+ *   written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.spearce.jgit.util.io;
+
+/**
+ * Triggers an interrupt on the calling thread if it doesn't complete a block.
+ * <p>
+ * Classes can use this to trip an alarm interrupting the calling thread if it
+ * doesn't complete a block within the specified timeout. Typical calling
+ * pattern is:
+ *
+ * <pre>
+ * private InterruptTimer myTimer = ...;
+ * void foo() {
+ *   try {
+ *     myTimer.begin(timeout);
+ *     // work
+ *   } finally {
+ *     myTimer.end();
+ *   }
+ * }
+ * </pre>
+ * <p>
+ * An InterruptTimer is not recursive. To implement recursive timers,
+ * independent InterruptTimer instances are required. A single InterruptTimer
+ * may be shared between objects which won't recursively call each other.
+ * <p>
+ * Each InterruptTimer spawns one background thread to sleep the specified time
+ * and interrupt the thread which called {@link #begin(int)}. It is up to the
+ * caller to ensure that the operations within the work block between the
+ * matched begin and end calls test the interrupt flag (most IO operations do).
+ * <p>
+ * To terminate the background thread, use {@link #terminate()}. If the
+ * application fails to terminate the thread, it will (eventually) terminate
+ * itself when the InterruptTimer instance is garbage collected.
+ *
+ * @see TimeoutInputStream
+ */
+public final class InterruptTimer {
+	private final AlarmState state;
+
+	private final AlarmThread thread;
+
+	final AutoKiller autoKiller;
+
+	/** Create a new timer with a default thread name. */
+	public InterruptTimer() {
+		this("JGit-InterruptTimer");
+	}
+
+	/**
+	 * Create a new timer to signal an interrupt on the caller.
+	 * <p>
+	 * The timer thread is created in the calling thread's ThreadGroup.
+	 *
+	 * @param threadName
+	 *            name of the timer thread.
+	 */
+	public InterruptTimer(final String threadName) {
+		state = new AlarmState();
+		autoKiller = new AutoKiller(state);
+		thread = new AlarmThread(threadName, state);
+		thread.start();
+	}
+
+	/**
+	 * Arm the interrupt timer before entering a blocking operation.
+	 *
+	 * @param timeout
+	 *            number of milliseconds before the interrupt should trigger.
+	 *            Must be > 0.
+	 */
+	public void begin(final int timeout) {
+		if (timeout <= 0)
+			throw new IllegalArgumentException("Invalid timeout: " + timeout);
+		Thread.interrupted();
+		state.begin(timeout);
+	}
+
+	/** Disable the interrupt timer, as the operation is complete. */
+	public void end() {
+		state.end();
+	}
+
+	/** Shutdown the timer thread, and wait for it to terminate. */
+	public void terminate() {
+		state.terminate();
+		try {
+			thread.join();
+		} catch (InterruptedException e) {
+			// Interrupted while joining; stop waiting for the daemon thread.
+		}
+	}
+
+	static final class AlarmThread extends Thread {
+		AlarmThread(final String name, final AlarmState q) {
+			super(q);
+			setName(name);
+			setDaemon(true);
+		}
+	}
+
+	// The trick here is, the AlarmThread does not have a reference to the
+	// AutoKiller instance, only the InterruptTimer itself does. Thus when
+	// the InterruptTimer is GC'd, the AutoKiller is also unreachable and
+	// can be GC'd. When it gets finalized, it tells the AlarmThread to
+	// terminate, triggering the thread to exit gracefully.
+	//
+	private static final class AutoKiller {
+		private final AlarmState state;
+
+		AutoKiller(final AlarmState s) {
+			state = s;
+		}
+
+		@Override
+		protected void finalize() throws Throwable {
+			state.terminate();
+		}
+	}
+
+	static final class AlarmState implements Runnable {
+		private Thread callingThread;
+
+		private long deadline;
+
+		private boolean terminated;
+
+		AlarmState() {
+			callingThread = Thread.currentThread();
+		}
+
+		public synchronized void run() {
+			while (!terminated && callingThread.isAlive()) {
+				try {
+					if (0 < deadline) {
+						final long delay = deadline - now();
+						if (delay <= 0) {
+							deadline = 0;
+							callingThread.interrupt();
+						} else {
+							wait(delay);
+						}
+					} else {
+						wait(1000);
+					}
+				} catch (InterruptedException e) {
+					// Treat an interrupt as notice to examine state.
+				}
+			}
+		}
+
+		synchronized void begin(final int timeout) {
+			if (terminated)
+				throw new IllegalStateException("Timer already terminated");
+			callingThread = Thread.currentThread();
+			deadline = now() + timeout;
+			notifyAll();
+		}
+
+		synchronized void end() {
+			if (0 == deadline)
+				Thread.interrupted();
+			else
+				deadline = 0;
+			notifyAll();
+		}
+
+		synchronized void terminate() {
+			if (!terminated) {
+				deadline = 0;
+				terminated = true;
+				notifyAll();
+			}
+		}
+
+		private static long now() {
+			return System.currentTimeMillis();
+		}
+	}
+}
diff --git a/org.spearce.jgit/src/org/spearce/jgit/util/io/TimeoutInputStream.java b/org.spearce.jgit/src/org/spearce/jgit/util/io/TimeoutInputStream.java
new file mode 100644
index 0000000..3a321aa
--- /dev/null
+++ b/org.spearce.jgit/src/org/spearce/jgit/util/io/TimeoutInputStream.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright (C) 2009, Google Inc.
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ *   notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ *   copyright notice, this list of conditions and the following
+ *   disclaimer in the documentation and/or other materials provided
+ *   with the distribution.
+ *
+ * - Neither the name of the Git Development Community nor the
+ *   names of its contributors may be used to endorse or promote
+ *   products derived from this software without specific prior
+ *   written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.spearce.jgit.util.io;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+
+/** InputStream with a configurable timeout. */
+public class TimeoutInputStream extends FilterInputStream {
+	private final InterruptTimer myTimer;
+
+	private int timeout;
+
+	/**
+	 * Wrap an input stream with a timeout on all read operations.
+	 *
+	 * @param src
+	 *            base input stream (to read from). The stream must be
+	 *            interruptible (most socket streams are).
+	 * @param timer
+	 *            timer to manage the timeouts during reads.
+	 */
+	public TimeoutInputStream(final InputStream src,
+			final InterruptTimer timer) {
+		super(src);
+		myTimer = timer;
+	}
+
+	/** @return number of milliseconds before aborting a read. */
+	public int getTimeout() {
+		return timeout;
+	}
+
+	/**
+	 * @param millis
+	 *            number of milliseconds before aborting a read. Must be > 0.
+	 */
+	public void setTimeout(final int millis) {
+		if (millis <= 0)
+			throw new IllegalArgumentException("Invalid timeout: " + millis);
+		timeout = millis;
+	}
+
+	@Override
+	public int read() throws IOException {
+		try {
+			beginRead();
+			return super.read();
+		} catch (InterruptedIOException e) {
+			throw readTimedOut();
+		} finally {
+			endRead();
+		}
+	}
+
+	@Override
+	public int read(byte[] buf) throws IOException {
+		return read(buf, 0, buf.length);
+	}
+
+	@Override
+	public int read(byte[] buf, int off, int cnt) throws IOException {
+		try {
+			beginRead();
+			return super.read(buf, off, cnt);
+		} catch (InterruptedIOException e) {
+			throw readTimedOut();
+		} finally {
+			endRead();
+		}
+	}
+
+	@Override
+	public long skip(long cnt) throws IOException {
+		try {
+			beginRead();
+			return super.skip(cnt);
+		} catch (InterruptedIOException e) {
+			throw readTimedOut();
+		} finally {
+			endRead();
+		}
+	}
+
+	private void beginRead() {
+		myTimer.begin(timeout);
+	}
+
+	private void endRead() {
+		myTimer.end();
+	}
+
+	private static InterruptedIOException readTimedOut() {
+		return new InterruptedIOException("Read timed out");
+	}
+}
diff --git a/org.spearce.jgit/src/org/spearce/jgit/util/io/TimeoutOutputStream.java b/org.spearce.jgit/src/org/spearce/jgit/util/io/TimeoutOutputStream.java
new file mode 100644
index 0000000..014cd92
--- /dev/null
+++ b/org.spearce.jgit/src/org/spearce/jgit/util/io/TimeoutOutputStream.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright (C) 2009, Google Inc.
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ *   notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ *   copyright notice, this list of conditions and the following
+ *   disclaimer in the documentation and/or other materials provided
+ *   with the distribution.
+ *
+ * - Neither the name of the Git Development Community nor the
+ *   names of its contributors may be used to endorse or promote
+ *   products derived from this software without specific prior
+ *   written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.spearce.jgit.util.io;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+
+/** OutputStream with a configurable timeout. */
+public class TimeoutOutputStream extends OutputStream {
+	private final OutputStream dst;
+
+	private final InterruptTimer myTimer;
+
+	private int timeout;
+
+	/**
+	 * Wrap an output stream with a timeout on all write operations.
+	 *
+	 * @param destination
+	 *            base output stream (to write to). The stream must be
+	 *            interruptible (most socket streams are).
+	 * @param timer
+	 *            timer to manage the timeouts during writes.
+	 */
+	public TimeoutOutputStream(final OutputStream destination,
+			final InterruptTimer timer) {
+		dst = destination;
+		myTimer = timer;
+	}
+
+	/** @return number of milliseconds before aborting a write. */
+	public int getTimeout() {
+		return timeout;
+	}
+
+	/**
+	 * @param millis
+	 *            number of milliseconds before aborting a write. Must be >= 0.
+	 */
+	public void setTimeout(final int millis) {
+		if (millis < 0)
+			throw new IllegalArgumentException("Invalid timeout: " + millis);
+		timeout = millis;
+	}
+
+	@Override
+	public void write(int b) throws IOException {
+		try {
+			beginWrite();
+			dst.write(b);
+		} catch (InterruptedIOException e) {
+			throw writeTimedOut();
+		} finally {
+			endWrite();
+		}
+	}
+
+	@Override
+	public void write(byte[] buf) throws IOException {
+		write(buf, 0, buf.length);
+	}
+
+	@Override
+	public void write(byte[] buf, int off, int len) throws IOException {
+		try {
+			beginWrite();
+			dst.write(buf, off, len);
+		} catch (InterruptedIOException e) {
+			throw writeTimedOut();
+		} finally {
+			endWrite();
+		}
+	}
+
+	@Override
+	public void flush() throws IOException {
+		try {
+			beginWrite();
+			dst.flush();
+		} catch (InterruptedIOException e) {
+			throw writeTimedOut();
+		} finally {
+			endWrite();
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		try {
+			beginWrite();
+			dst.close();
+		} catch (InterruptedIOException e) {
+			throw writeTimedOut();
+		} finally {
+			endWrite();
+		}
+	}
+
+	private void beginWrite() {
+		myTimer.begin(timeout);
+	}
+
+	private void endWrite() {
+		myTimer.end();
+	}
+
+	private static InterruptedIOException writeTimedOut() {
+		return new InterruptedIOException("Write timed out");
+	}
+}
-- 
1.6.3.2.416.g04d0


* [JGIT PATCH 2/6] Add remote.name.timeout to configure an IO timeout
  2009-06-19 21:27 ` [JGIT PATCH 1/6] Create input and output streams that have timeouts Shawn O. Pearce
@ 2009-06-19 21:27   ` Shawn O. Pearce
  2009-06-19 21:27     ` [JGIT PATCH 3/6] Add timeouts to smart transport protocol clients Shawn O. Pearce
  2009-06-20 22:28     ` [JGIT PATCH 2/6] Add remote.name.timeout to configure an IO timeout Robin Rosenberg
  2009-06-22 21:09   ` [JGIT PATCH 1/6] Create input and output streams that have timeouts Robin Rosenberg
  1 sibling, 2 replies; 11+ messages in thread
From: Shawn O. Pearce @ 2009-06-19 21:27 UTC (permalink / raw)
  To: Robin Rosenberg; +Cc: git

An IO timeout can be useful if the remote peer stops responding,
and the user wants the application to abort rather than block
indefinitely waiting for more input.

This is a JGit specific extension to the standard remote format.

Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
---
 .../spearce/jgit/transport/RemoteConfigTest.java   |   26 ++++++++++++++++
 .../org/spearce/jgit/transport/RemoteConfig.java   |   31 ++++++++++++++++++++
 2 files changed, 57 insertions(+), 0 deletions(-)

diff --git a/org.spearce.jgit.test/tst/org/spearce/jgit/transport/RemoteConfigTest.java b/org.spearce.jgit.test/tst/org/spearce/jgit/transport/RemoteConfigTest.java
index 6b72b64..3965bdb 100644
--- a/org.spearce.jgit.test/tst/org/spearce/jgit/transport/RemoteConfigTest.java
+++ b/org.spearce.jgit.test/tst/org/spearce/jgit/transport/RemoteConfigTest.java
@@ -78,6 +78,7 @@ writeConfig("[remote \"spearce\"]\n"
 		assertNotNull(rc.getFetchRefSpecs());
 		assertNotNull(rc.getPushRefSpecs());
 		assertNotNull(rc.getTagOpt());
+		assertEquals(0, rc.getTimeout());
 		assertSame(TagOpt.AUTO_FOLLOW, rc.getTagOpt());
 
 		assertEquals(1, allURIs.size());
@@ -423,4 +424,29 @@ checkFile(new File(db.getDirectory(), "config"), "[core]\n"
 				+ "\tfetch = +refs/heads/*:refs/remotes/origin/*\n"
 				+ "\ttagopt = --tags\n");
 	}
+
+	public void testSimpleTimeout() throws Exception {
+		writeConfig("[remote \"spearce\"]\n"
+				+ "url = http://www.spearce.org/egit.git\n"
+				+ "fetch = +refs/heads/*:refs/remotes/spearce/*\n"
+				+ "timeout = 12\n");
+		final RemoteConfig rc = new RemoteConfig(db.getConfig(), "spearce");
+		assertEquals(12, rc.getTimeout());
+	}
+
+	public void testSaveTimeout() throws Exception {
+		final RemoteConfig rc = new RemoteConfig(db.getConfig(), "origin");
+		rc.addURI(new URIish("/some/dir"));
+		rc.addFetchRefSpec(new RefSpec("+refs/heads/*:refs/remotes/"
+				+ rc.getName() + "/*"));
+		rc.setTimeout(60);
+		rc.update(db.getConfig());
+		db.getConfig().save();
+
+		checkFile(new File(db.getDirectory(), "config"), "[core]\n"
+				+ "\trepositoryformatversion = 0\n" + "\tfilemode = true\n"
+				+ "[remote \"origin\"]\n" + "\turl = /some/dir\n"
+				+ "\tfetch = +refs/heads/*:refs/remotes/origin/*\n"
+				+ "\ttimeout = 60\n");
+	}
 }
diff --git a/org.spearce.jgit/src/org/spearce/jgit/transport/RemoteConfig.java b/org.spearce.jgit/src/org/spearce/jgit/transport/RemoteConfig.java
index 519a8a5..a621dc4 100644
--- a/org.spearce.jgit/src/org/spearce/jgit/transport/RemoteConfig.java
+++ b/org.spearce.jgit/src/org/spearce/jgit/transport/RemoteConfig.java
@@ -70,6 +70,8 @@
 
 	private static final String KEY_MIRROR = "mirror";
 
+	private static final String KEY_TIMEOUT = "timeout";
+
 	private static final boolean DEFAULT_MIRROR = false;
 
 	/** Default value for {@link #getUploadPack()} if not specified. */
@@ -120,6 +122,8 @@
 
 	private boolean mirror;
 
+	private int timeout;
+
 	/**
 	 * Parse a remote block from an existing configuration file.
 	 * <p>
@@ -170,6 +174,7 @@ public RemoteConfig(final RepositoryConfig rc, final String remoteName)
 		val = rc.getString(SECTION, name, KEY_TAGOPT);
 		tagopt = TagOpt.fromOption(val);
 		mirror = rc.getBoolean(SECTION, name, KEY_MIRROR, DEFAULT_MIRROR);
+		timeout = rc.getInt(SECTION, name, KEY_TIMEOUT, 0);
 	}
 
 	/**
@@ -200,6 +205,7 @@ public void update(final RepositoryConfig rc) {
 		set(rc, KEY_RECEIVEPACK, getReceivePack(), DEFAULT_RECEIVE_PACK);
 		set(rc, KEY_TAGOPT, getTagOpt().option(), TagOpt.AUTO_FOLLOW.option());
 		set(rc, KEY_MIRROR, mirror, DEFAULT_MIRROR);
+		set(rc, KEY_TIMEOUT, timeout, 0);
 	}
 
 	private void set(final RepositoryConfig rc, final String key,
@@ -218,6 +224,14 @@ private void set(final RepositoryConfig rc, final String key,
 			rc.setBoolean(SECTION, getName(), key, currentValue);
 	}
 
+	private void set(final RepositoryConfig rc, final String key,
+			final int currentValue, final int defaultValue) {
+		if (defaultValue == currentValue)
+			unset(rc, key);
+		else
+			rc.setInt(SECTION, getName(), key, currentValue);
+	}
+
 	private void unset(final RepositoryConfig rc, final String key) {
 		rc.unsetString(SECTION, getName(), key);
 	}
@@ -420,4 +434,21 @@ public boolean isMirror() {
 	public void setMirror(final boolean m) {
 		mirror = m;
 	}
+
+	/** @return timeout (in seconds) before aborting an IO operation. */
+	public int getTimeout() {
+		return timeout;
+	}
+
+	/**
+	 * Set the timeout to wait before aborting an IO call.
+	 *
+	 * @param seconds
+	 *            number of seconds to wait (with no data transfer occurring)
+	 *            before aborting an IO read or write operation with this
+	 *            remote.  A timeout of 0 will block indefinitely.
+	 */
+	public void setTimeout(final int seconds) {
+		timeout = seconds;
+	}
 }
-- 
1.6.3.2.416.g04d0


* [JGIT PATCH 3/6] Add timeouts to smart transport protocol clients
  2009-06-19 21:27   ` [JGIT PATCH 2/6] Add remote.name.timeout to configure an IO timeout Shawn O. Pearce
@ 2009-06-19 21:27     ` Shawn O. Pearce
  2009-06-19 21:27       ` [JGIT PATCH 4/6] Add timeouts to smart transport protocol servers Shawn O. Pearce
  2009-06-20 22:28     ` [JGIT PATCH 2/6] Add remote.name.timeout to configure an IO timeout Robin Rosenberg
  1 sibling, 1 reply; 11+ messages in thread
From: Shawn O. Pearce @ 2009-06-19 21:27 UTC (permalink / raw)
  To: Robin Rosenberg; +Cc: git

For both directions (input and output) we start a background thread
to interrupt the main processing thread if a read or write event on
the low-level stream does not complete in the configured timeout.
On timeout, we abort the transaction entirely.  Any sort of short
read or write doesn't need to be handled.
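
As a rough illustration of the interrupt-on-timeout idea described above,
here is a minimal sketch built on a ScheduledExecutorService.  It is not the
InterruptTimer class from this series; the names WatchdogSketch, BlockingCall
and runWithTimeout are hypothetical and exist only for this example.

```java
import java.io.InterruptedIOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a background timer interrupts the calling thread
// if a blocking call does not finish within the given number of millis.
class WatchdogSketch {
	interface BlockingCall {
		void run() throws InterruptedException;
	}

	static void runWithTimeout(BlockingCall call, long millis,
			ScheduledExecutorService timer) throws InterruptedIOException {
		final Thread caller = Thread.currentThread();
		final boolean[] armed = { true };
		timer.schedule(() -> {
			synchronized (armed) {
				// Only interrupt while the call is still in progress.
				if (armed[0])
					caller.interrupt();
			}
		}, millis, TimeUnit.MILLISECONDS);
		try {
			call.run();
		} catch (InterruptedException e) {
			// Abort the whole operation; no partial result is kept.
			throw new InterruptedIOException("Timed out");
		} finally {
			synchronized (armed) {
				armed[0] = false;
			}
			// Discard an interrupt that arrived just as the call returned.
			// (Simplification: this also drops unrelated interrupts.)
			Thread.interrupted();
		}
	}

	public static void main(String[] args) throws Exception {
		ScheduledExecutorService timer = Executors
				.newSingleThreadScheduledExecutor();
		try {
			runWithTimeout(() -> Thread.sleep(10), 1000, timer);
			System.out.println("fast call finished");
			try {
				runWithTimeout(() -> Thread.sleep(5000), 100, timer);
			} catch (InterruptedIOException e) {
				System.out.println("slow call aborted: " + e.getMessage());
			}
		} finally {
			timer.shutdownNow();
		}
	}
}
```

The sketch only works for calls that respond to Thread.interrupt(), which is
the same restriction the commit message notes for the low-level streams.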

JSch made a timeout on write difficult because they explicitly do
a catch for InterruptedException inside of their OutputStream.  We
have to work around that by creating an additional thread that just
shuttles data between our own OutputStream and the real JSch stream.
Our OutputStream can be interrupted, which causes it to close, which
in turn closes the JSch Channel and Session.  That breaks our little
copy thread out of the JSch OutputStream, letting it terminate.  It's
not pretty, but it's the best we can do with current versions of JSch.

During a push, receive-pack stalls while it processes the deltas in
the received data and indexes the new pack on disk, or unpacks the
data into loose objects.  During this stall we receive no network
transfer from the remote side, so we cannot know if the peer has
deadlocked with us, or is progressing normally.  Since the time is
usually proportional to the data sent, we give the remote side up to
10x the larger of the time we spent transmitting data and our
configured timeout, hoping it will be able to complete and return a
pack success or fail status report before we time out.
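
The arithmetic behind that extended wait can be sketched in isolation.  The
class and method names below (PushStatusTimeout, statusReportTimeout) are
hypothetical; the formula and the 28800000 ms cap mirror
readStringLongTimeout() in the diff that follows.

```java
// Hypothetical standalone sketch of the extended status-report timeout.
class PushStatusTimeout {
	/** Cap on the measured send time: 8 hours, in milliseconds. */
	static final long MAX_SEND_MILLIS = 28800000L;

	/**
	 * @param packTransferMillis
	 *            time spent writing the pack to the remote, in milliseconds.
	 * @param configuredMillis
	 *            the normally configured read timeout, in milliseconds.
	 * @return timeout to use while waiting for the status report.
	 */
	static int statusReportTimeout(long packTransferMillis,
			int configuredMillis) {
		// Capping first keeps the cast to int safe.
		final int sendTime = (int) Math.min(packTransferMillis,
				MAX_SEND_MILLIS);
		return 10 * Math.max(sendTime, configuredMillis);
	}

	public static void main(String[] args) {
		// Short transfer: the configured 30s timeout dominates.
		System.out.println(statusReportTimeout(2000L, 30000)); // 300000
		// Long transfer: the 90s send time dominates.
		System.out.println(statusReportTimeout(90000L, 30000)); // 900000
	}
}
```

With the cap in place, a capped send time yields at most 288000000 ms (80
hours), which still fits in an int.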

Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
---
 .../spearce/jgit/transport/BasePackConnection.java |   36 +++++++-
 .../jgit/transport/BasePackPushConnection.java     |   29 ++++++-
 .../org/spearce/jgit/transport/SshTransport.java   |    3 +-
 .../src/org/spearce/jgit/transport/Transport.java  |   21 ++++
 .../spearce/jgit/transport/TransportGitAnon.java   |   13 +++-
 .../spearce/jgit/transport/TransportGitSsh.java    |  103 +++++++++++++++++++-
 .../org/spearce/jgit/transport/TransportLocal.java |    9 ++-
 .../org/spearce/jgit/transport/TransportSftp.java  |    3 +-
 8 files changed, 207 insertions(+), 10 deletions(-)

diff --git a/org.spearce.jgit/src/org/spearce/jgit/transport/BasePackConnection.java b/org.spearce.jgit/src/org/spearce/jgit/transport/BasePackConnection.java
index 0382d2b..d759fc8 100644
--- a/org.spearce.jgit/src/org/spearce/jgit/transport/BasePackConnection.java
+++ b/org.spearce.jgit/src/org/spearce/jgit/transport/BasePackConnection.java
@@ -55,6 +55,9 @@
 import org.spearce.jgit.lib.ObjectId;
 import org.spearce.jgit.lib.Ref;
 import org.spearce.jgit.lib.Repository;
+import org.spearce.jgit.util.io.InterruptTimer;
+import org.spearce.jgit.util.io.TimeoutInputStream;
+import org.spearce.jgit.util.io.TimeoutOutputStream;
 
 /**
  * Base helper class for pack-based operations implementations. Provides partial
@@ -75,6 +78,15 @@
 	/** A transport connected to {@link #uri}. */
 	protected final Transport transport;
 
+	/** Low-level input stream, if a timeout was configured. */
+	protected TimeoutInputStream timeoutIn;
+
+	/** Low-level output stream, if a timeout was configured. */
+	protected TimeoutOutputStream timeoutOut;
+
+	/** Timer to manage {@link #timeoutIn} and {@link #timeoutOut}. */
+	private InterruptTimer myTimer;
+
 	/** Buffered input stream reading from the remote. */
 	protected InputStream in;
 
@@ -102,7 +114,19 @@ BasePackConnection(final PackTransport packTransport) {
 		uri = transport.uri;
 	}
 
-	protected void init(final InputStream myIn, final OutputStream myOut) {
+	protected final void init(InputStream myIn, OutputStream myOut) {
+		final int timeout = transport.getTimeout();
+		if (timeout > 0) {
+			final Thread caller = Thread.currentThread();
+			myTimer = new InterruptTimer(caller.getName() + "-Timer");
+			timeoutIn = new TimeoutInputStream(myIn, myTimer);
+			timeoutOut = new TimeoutOutputStream(myOut, myTimer);
+			timeoutIn.setTimeout(timeout * 1000);
+			timeoutOut.setTimeout(timeout * 1000);
+			myIn = timeoutIn;
+			myOut = timeoutOut;
+		}
+
 		in = myIn instanceof BufferedInputStream ? myIn
 				: new BufferedInputStream(myIn, IndexPack.BUFFER_SIZE);
 		out = myOut instanceof BufferedOutputStream ? myOut
@@ -241,5 +265,15 @@ public void close() {
 				pckIn = null;
 			}
 		}
+
+		if (myTimer != null) {
+			try {
+				myTimer.terminate();
+			} finally {
+				myTimer = null;
+				timeoutIn = null;
+				timeoutOut = null;
+			}
+		}
 	}
 }
diff --git a/org.spearce.jgit/src/org/spearce/jgit/transport/BasePackPushConnection.java b/org.spearce.jgit/src/org/spearce/jgit/transport/BasePackPushConnection.java
index 712d3c0..2a94dfc 100644
--- a/org.spearce.jgit/src/org/spearce/jgit/transport/BasePackPushConnection.java
+++ b/org.spearce.jgit/src/org/spearce/jgit/transport/BasePackPushConnection.java
@@ -92,6 +92,9 @@
 
 	private boolean writePack;
 
+	/** Time in milliseconds spent transferring the pack data. */
+	private long packTransferTime;
+
 	BasePackPushConnection(final PackTransport packTransport) {
 		super(packTransport);
 		thinPack = transport.isPushThin();
@@ -209,12 +212,14 @@ private void writePack(final Map<String, RemoteRefUpdate> refUpdates,
 		writer.setThin(thinPack);
 		writer.setDeltaBaseAsOffset(capableOfsDelta);
 		writer.preparePack(newObjects, remoteObjects);
+		final long start = System.currentTimeMillis();
 		writer.writePack(out);
+		packTransferTime = System.currentTimeMillis() - start;
 	}
 
 	private void readStatusReport(final Map<String, RemoteRefUpdate> refUpdates)
 			throws IOException {
-		final String unpackLine = pckIn.readString();
+		final String unpackLine = readStringLongTimeout();
 		if (!unpackLine.startsWith("unpack "))
 			throw new PackProtocolException(uri, "unexpected report line: "
 					+ unpackLine);
@@ -260,4 +265,26 @@ private void readStatusReport(final Map<String, RemoteRefUpdate> refUpdates)
 						+ " not received");
 		}
 	}
+
+	private String readStringLongTimeout() throws IOException {
+		if (timeoutIn == null)
+			return pckIn.readString();
+
+		// The remote side may need a lot of time to choke down the pack
+		// we just sent them. There may be many deltas that need to be
+		// resolved by the remote. It's hard to say how long the other
+		// end is going to be silent. Taking 10x the configured timeout
+		// or the time spent transferring the pack, whichever is larger,
+		// gives the other side some reasonable window to process the data,
+		// but this is just a wild guess.
+		//
+		final int oldTimeout = timeoutIn.getTimeout();
+		final int sendTime = (int) Math.min(packTransferTime, 28800000L);
+		try {
+			timeoutIn.setTimeout(10 * Math.max(sendTime, oldTimeout));
+			return pckIn.readString();
+		} finally {
+			timeoutIn.setTimeout(oldTimeout);
+		}
+	}
 }
diff --git a/org.spearce.jgit/src/org/spearce/jgit/transport/SshTransport.java b/org.spearce.jgit/src/org/spearce/jgit/transport/SshTransport.java
index 127096c..d45a83f 100644
--- a/org.spearce.jgit/src/org/spearce/jgit/transport/SshTransport.java
+++ b/org.spearce.jgit/src/org/spearce/jgit/transport/SshTransport.java
@@ -111,6 +111,7 @@ protected void initSession() throws TransportException {
 		if (sock != null)
 			return;
 
+		final int tms = getTimeout() > 0 ? getTimeout() * 1000 : 0;
 		final String user = uri.getUser();
 		final String pass = uri.getPass();
 		final String host = uri.getHost();
@@ -118,7 +119,7 @@ protected void initSession() throws TransportException {
 		try {
 			sock = sch.getSession(user, pass, host, port);
 			if (!sock.isConnected())
-				sock.connect();
+				sock.connect(tms);
 		} catch (JSchException je) {
 			final Throwable c = je.getCause();
 			if (c instanceof UnknownHostException)
diff --git a/org.spearce.jgit/src/org/spearce/jgit/transport/Transport.java b/org.spearce.jgit/src/org/spearce/jgit/transport/Transport.java
index 1068f50..a6210a5 100644
--- a/org.spearce.jgit/src/org/spearce/jgit/transport/Transport.java
+++ b/org.spearce.jgit/src/org/spearce/jgit/transport/Transport.java
@@ -376,6 +376,9 @@ private static String findTrackingRefName(final String remoteName,
 	/** Should refs no longer on the source be pruned from the destination? */
 	private boolean removeDeletedRefs;
 
+	/** Timeout in seconds to wait before aborting an IO read or write. */
+	private int timeout;
+
 	/**
 	 * Create a new transport instance.
 	 * 
@@ -572,6 +575,7 @@ public void applyConfig(final RemoteConfig cfg) {
 		setTagOpt(cfg.getTagOpt());
 		optionReceivePack = cfg.getReceivePack();
 		push = cfg.getPushRefSpecs();
+		timeout = cfg.getTimeout();
 	}
 
 	/**
@@ -595,6 +599,23 @@ public void setDryRun(final boolean dryRun) {
 		this.dryRun = dryRun;
 	}
 
+	/** @return timeout (in seconds) before aborting an IO operation. */
+	public int getTimeout() {
+		return timeout;
+	}
+
+	/**
+	 * Set the timeout to wait before aborting an IO call.
+	 *
+	 * @param seconds
+	 *            number of seconds to wait (with no data transfer occurring)
+	 *            before aborting an IO read or write operation with this
+	 *            remote.
+	 */
+	public void setTimeout(final int seconds) {
+		timeout = seconds;
+	}
+
 	/**
 	 * Fetch objects and refs from the remote repository to the local one.
 	 * <p>
diff --git a/org.spearce.jgit/src/org/spearce/jgit/transport/TransportGitAnon.java b/org.spearce.jgit/src/org/spearce/jgit/transport/TransportGitAnon.java
index 0c80b9d..e23fe3b 100644
--- a/org.spearce.jgit/src/org/spearce/jgit/transport/TransportGitAnon.java
+++ b/org.spearce.jgit/src/org/spearce/jgit/transport/TransportGitAnon.java
@@ -42,6 +42,7 @@
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.UnknownHostException;
 
@@ -82,16 +83,26 @@ public void close() {
 	}
 
 	Socket openConnection() throws TransportException {
+		final int tms = getTimeout() > 0 ? getTimeout() * 1000 : 0;
 		final int port = uri.getPort() > 0 ? uri.getPort() : GIT_PORT;
+		final Socket s = new Socket();
 		try {
-			return new Socket(InetAddress.getByName(uri.getHost()), port);
+			final InetAddress host = InetAddress.getByName(uri.getHost());
+			s.bind(null);
+			s.connect(new InetSocketAddress(host, port), tms);
 		} catch (IOException c) {
+			try {
+				s.close();
+			} catch (IOException closeErr) {
+				// ignore a failure during close, we're already failing
+			}
 			if (c instanceof UnknownHostException)
 				throw new TransportException(uri, "unknown host");
 			if (c instanceof ConnectException)
 				throw new TransportException(uri, c.getMessage());
 			throw new TransportException(uri, c.getMessage(), c);
 		}
+		return s;
 	}
 
 	void service(final String name, final PacketLineOut pckOut)
diff --git a/org.spearce.jgit/src/org/spearce/jgit/transport/TransportGitSsh.java b/org.spearce.jgit/src/org/spearce/jgit/transport/TransportGitSsh.java
index de72d02..bde641f 100644
--- a/org.spearce.jgit/src/org/spearce/jgit/transport/TransportGitSsh.java
+++ b/org.spearce.jgit/src/org/spearce/jgit/transport/TransportGitSsh.java
@@ -40,7 +40,11 @@
 package org.spearce.jgit.transport;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
 import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
 
 import org.spearce.jgit.errors.NoRemoteRepositoryException;
 import org.spearce.jgit.errors.TransportException;
@@ -120,6 +124,7 @@ private static void sq(final StringBuilder cmd, final String val) {
 	ChannelExec exec(final String exe) throws TransportException {
 		initSession();
 
+		final int tms = getTimeout() > 0 ? getTimeout() * 1000 : 0;
 		try {
 			final ChannelExec channel = (ChannelExec) sock.openChannel("exec");
 			String path = uri.getPath();
@@ -139,7 +144,7 @@ ChannelExec exec(final String exe) throws TransportException {
 			channel.setCommand(cmd.toString());
 			errStream = createErrorStream();
 			channel.setErrStream(errStream, true);
-			channel.connect();
+			channel.connect(tms);
 			return channel;
 		} catch (JSchException je) {
 			throw new TransportException(uri, je.getMessage(), je);
@@ -198,6 +203,98 @@ NoRemoteRepositoryException cleanNotFound(NoRemoteRepositoryException nf) {
 		return new NoRemoteRepositoryException(uri, why);
 	}
 
+	// JSch won't let us interrupt writes when we use our InterruptTimer to
+	// break out of a long-running write operation. To work around that we
+	// spawn a background thread to shuttle data through a pipe, as we can
+	// issue an interrupted write out of that. It's slower, so we only use
+	// this route if there is a timeout.
+	//
+	private OutputStream outputStream(ChannelExec channel) throws IOException {
+		final OutputStream out = channel.getOutputStream();
+		if (getTimeout() <= 0)
+			return out;
+		final PipedInputStream pipeIn = new PipedInputStream();
+		final CopyThread copyThread = new CopyThread(pipeIn, out);
+		final PipedOutputStream pipeOut = new PipedOutputStream(pipeIn) {
+			@Override
+			public void flush() throws IOException {
+				super.flush();
+				copyThread.flush();
+			}
+
+			@Override
+			public void close() throws IOException {
+				super.close();
+				try {
+					copyThread.join(getTimeout() * 1000);
+				} catch (InterruptedException e) {
+					// Just wake early, the thread will terminate anyway.
+				}
+			}
+		};
+		copyThread.start();
+		return pipeOut;
+	}
+
+	private static class CopyThread extends Thread {
+		private final InputStream src;
+
+		private final OutputStream dst;
+
+		private volatile boolean doFlush;
+
+		CopyThread(final InputStream i, final OutputStream o) {
+			setName(Thread.currentThread().getName() + "-Output");
+			src = i;
+			dst = o;
+		}
+
+		void flush() {
+			if (!doFlush) {
+				doFlush = true;
+				interrupt();
+			}
+		}
+
+		@Override
+		public void run() {
+			try {
+				final byte[] buf = new byte[1024];
+				for (;;) {
+					try {
+						if (doFlush) {
+							doFlush = false;
+							dst.flush();
+						}
+
+						final int n;
+						try {
+							n = src.read(buf);
+						} catch (InterruptedIOException wakey) {
+							continue;
+						}
+						if (n < 0)
+							break;
+						dst.write(buf, 0, n);
+					} catch (IOException e) {
+						break;
+					}
+				}
+			} finally {
+				try {
+					src.close();
+				} catch (IOException e) {
+					// Ignore IO errors on close
+				}
+				try {
+					dst.close();
+				} catch (IOException e) {
+					// Ignore IO errors on close
+				}
+			}
+		}
+	}
+
 	class SshFetchConnection extends BasePackFetchConnection {
 		private ChannelExec channel;
 
@@ -207,7 +304,7 @@ SshFetchConnection() throws TransportException {
 				channel = exec(getOptionUploadPack());
 
 				if (channel.isConnected())
-					init(channel.getInputStream(), channel.getOutputStream());
+					init(channel.getInputStream(), outputStream(channel));
 				else
 					throw new TransportException(uri, errStream.toString());
 
@@ -251,7 +348,7 @@ SshPushConnection() throws TransportException {
 				channel = exec(getOptionReceivePack());
 
 				if (channel.isConnected())
-					init(channel.getInputStream(), channel.getOutputStream());
+					init(channel.getInputStream(), outputStream(channel));
 				else
 					throw new TransportException(uri, errStream.toString());
 
diff --git a/org.spearce.jgit/src/org/spearce/jgit/transport/TransportLocal.java b/org.spearce.jgit/src/org/spearce/jgit/transport/TransportLocal.java
index 428f73e..8e8e8d5 100644
--- a/org.spearce.jgit/src/org/spearce/jgit/transport/TransportLocal.java
+++ b/org.spearce.jgit/src/org/spearce/jgit/transport/TransportLocal.java
@@ -43,6 +43,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 
@@ -240,7 +241,9 @@ public void close() {
 		ForkLocalFetchConnection() throws TransportException {
 			super(TransportLocal.this);
 			uploadPack = startProcessWithErrStream(getOptionUploadPack());
-			init(uploadPack.getInputStream(), uploadPack.getOutputStream());
+			final InputStream upIn = uploadPack.getInputStream();
+			final OutputStream upOut = uploadPack.getOutputStream();
+			init(upIn, upOut);
 			readAdvertisedRefs();
 		}
 
@@ -343,7 +346,9 @@ public void close() {
 		ForkLocalPushConnection() throws TransportException {
 			super(TransportLocal.this);
 			receivePack = startProcessWithErrStream(getOptionReceivePack());
-			init(receivePack.getInputStream(), receivePack.getOutputStream());
+			final InputStream rpIn = receivePack.getInputStream();
+			final OutputStream rpOut = receivePack.getOutputStream();
+			init(rpIn, rpOut);
 			readAdvertisedRefs();
 		}
 
diff --git a/org.spearce.jgit/src/org/spearce/jgit/transport/TransportSftp.java b/org.spearce.jgit/src/org/spearce/jgit/transport/TransportSftp.java
index e18d128..57a6b09 100644
--- a/org.spearce.jgit/src/org/spearce/jgit/transport/TransportSftp.java
+++ b/org.spearce.jgit/src/org/spearce/jgit/transport/TransportSftp.java
@@ -112,9 +112,10 @@ public PushConnection openPush() throws TransportException {
 	ChannelSftp newSftp() throws TransportException {
 		initSession();
 
+		final int tms = getTimeout() > 0 ? getTimeout() * 1000 : 0;
 		try {
 			final Channel channel = sock.openChannel("sftp");
-			channel.connect();
+			channel.connect(tms);
 			return (ChannelSftp) channel;
 		} catch (JSchException je) {
 			throw new TransportException(uri, je.getMessage(), je);
-- 
1.6.3.2.416.g04d0


* [JGIT PATCH 4/6] Add timeouts to smart transport protocol servers
  2009-06-19 21:27     ` [JGIT PATCH 3/6] Add timeouts to smart transport protocol clients Shawn O. Pearce
@ 2009-06-19 21:27       ` Shawn O. Pearce
  2009-06-19 21:27         ` [JGIT PATCH 5/6] Add timeouts to anonymous git:// daemon Shawn O. Pearce
  0 siblings, 1 reply; 11+ messages in thread
From: Shawn O. Pearce @ 2009-06-19 21:27 UTC (permalink / raw)
  To: Robin Rosenberg; +Cc: git

As with the client-side support, we spawn a background thread and
use that to wake up the real service thread if it blocks too long in
a read or write operation.  Typically this sort of long running IO
indicates the client is not responding, and the server should abort
its transaction and disconnect the client.

As with the push client, the push server doesn't know when the
client will be done computing its pack file and start sending it
to the server.  Consequently we aren't entirely sure when we can
safely say the client is dead, vs. the client is just busy doing
its local compression work before transmitting.  By upping our
timeout to 10x the originally configured value we can give the
client a reasonable chance to finish packing data before we do
wind up aborting.

Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
---
 .../org/spearce/jgit/transport/ReceivePack.java    |   57 +++++++++++++++++++
 .../src/org/spearce/jgit/transport/UploadPack.java |   59 ++++++++++++++++++--
 2 files changed, 110 insertions(+), 6 deletions(-)

diff --git a/org.spearce.jgit/src/org/spearce/jgit/transport/ReceivePack.java b/org.spearce.jgit/src/org/spearce/jgit/transport/ReceivePack.java
index c92a903..16b0c57 100644
--- a/org.spearce.jgit/src/org/spearce/jgit/transport/ReceivePack.java
+++ b/org.spearce.jgit/src/org/spearce/jgit/transport/ReceivePack.java
@@ -69,6 +69,9 @@
 import org.spearce.jgit.revwalk.RevObject;
 import org.spearce.jgit.revwalk.RevWalk;
 import org.spearce.jgit.transport.ReceiveCommand.Result;
+import org.spearce.jgit.util.io.InterruptTimer;
+import org.spearce.jgit.util.io.TimeoutInputStream;
+import org.spearce.jgit.util.io.TimeoutOutputStream;
 
 /**
  * Implements the server side of a push connection, receiving objects.
@@ -109,6 +112,14 @@
 	/** Hook to report on the commands after execution. */
 	private PostReceiveHook postReceive;
 
+	/** Timeout in seconds to wait for client interaction. */
+	private int timeout;
+
+	/** Timer to manage {@link #timeout}. */
+	private InterruptTimer timer;
+
+	private TimeoutInputStream timeoutIn;
+
 	private InputStream rawIn;
 
 	private OutputStream rawOut;
@@ -297,6 +308,23 @@ public void setPostReceiveHook(final PostReceiveHook h) {
 		postReceive = h != null ? h : PostReceiveHook.NULL;
 	}
 
+	/** @return timeout (in seconds) before aborting an IO operation. */
+	public int getTimeout() {
+		return timeout;
+	}
+
+	/**
+	 * Set the timeout to wait before aborting an IO call.
+	 *
+	 * @param seconds
+	 *            number of seconds to wait (with no data transfer occurring)
+	 *            before aborting an IO read or write operation with the
+	 *            connected client.
+	 */
+	public void setTimeout(final int seconds) {
+		timeout = seconds;
+	}
+
 	/** @return all of the command received by the current request. */
 	public List<ReceiveCommand> getAllCommands() {
 		return Collections.unmodifiableList(commands);
@@ -365,6 +393,17 @@ public void receive(final InputStream input, final OutputStream output,
 			rawIn = input;
 			rawOut = output;
 
+			if (timeout > 0) {
+				final Thread caller = Thread.currentThread();
+				timer = new InterruptTimer(caller.getName() + "-Timer");
+				timeoutIn = new TimeoutInputStream(rawIn, timer);
+				TimeoutOutputStream o = new TimeoutOutputStream(rawOut, timer);
+				timeoutIn.setTimeout(timeout * 1000);
+				o.setTimeout(timeout * 1000);
+				rawIn = timeoutIn;
+				rawOut = o;
+			}
+
 			pckIn = new PacketLineIn(rawIn);
 			pckOut = new PacketLineOut(rawOut);
 			if (messages != null) {
@@ -389,6 +428,7 @@ public void println() {
 				}
 			} finally {
 				unlockPack();
+				timeoutIn = null;
 				rawIn = null;
 				rawOut = null;
 				pckIn = null;
@@ -397,6 +437,13 @@ public void println() {
 				refs = null;
 				enabledCapablities = null;
 				commands = null;
+				if (timer != null) {
+					try {
+						timer.terminate();
+					} finally {
+						timer = null;
+					}
+				}
 			}
 		}
 	}
@@ -557,6 +604,13 @@ private boolean needPack() {
 	}
 
 	private void receivePack() throws IOException {
+		// It might take the client a while to pack the objects it needs
+		// to send to us.  We should increase our timeout so we don't
+		// abort while the client is computing.
+		//
+		if (timeoutIn != null)
+			timeoutIn.setTimeout(10 * timeout * 1000);
+
 		final IndexPack ip = IndexPack.create(db, rawIn);
 		ip.setFixThin(true);
 		ip.setObjectChecking(isCheckReceivedObjects());
@@ -566,6 +620,9 @@ private void receivePack() throws IOException {
 		if (getRefLogIdent() != null)
 			lockMsg += " from " + getRefLogIdent().toExternalString();
 		packLock = ip.renameAndOpenPack(lockMsg);
+
+		if (timeoutIn != null)
+			timeoutIn.setTimeout(timeout * 1000);
 	}
 
 	private void checkConnectivity() throws IOException {
diff --git a/org.spearce.jgit/src/org/spearce/jgit/transport/UploadPack.java b/org.spearce.jgit/src/org/spearce/jgit/transport/UploadPack.java
index 7d17b2d..b0fa885 100644
--- a/org.spearce.jgit/src/org/spearce/jgit/transport/UploadPack.java
+++ b/org.spearce.jgit/src/org/spearce/jgit/transport/UploadPack.java
@@ -64,6 +64,9 @@
 import org.spearce.jgit.revwalk.RevObject;
 import org.spearce.jgit.revwalk.RevTag;
 import org.spearce.jgit.revwalk.RevWalk;
+import org.spearce.jgit.util.io.InterruptTimer;
+import org.spearce.jgit.util.io.TimeoutInputStream;
+import org.spearce.jgit.util.io.TimeoutOutputStream;
 
 /**
  * Implements the server side of a fetch connection, transmitting objects.
@@ -89,6 +92,12 @@
 	/** Revision traversal support over {@link #db}. */
 	private final RevWalk walk;
 
+	/** Timeout in seconds to wait for client interaction. */
+	private int timeout;
+
+	/** Timer to manage {@link #timeout}. */
+	private InterruptTimer timer;
+
 	private InputStream rawIn;
 
 	private OutputStream rawOut;
@@ -164,6 +173,23 @@ public final RevWalk getRevWalk() {
 		return walk;
 	}
 
+	/** @return timeout (in seconds) before aborting an IO operation. */
+	public int getTimeout() {
+		return timeout;
+	}
+
+	/**
+	 * Set the timeout before willing to abort an IO call.
+	 *
+	 * @param seconds
+	 *            number of seconds to wait (with no data transfer occurring)
+	 *            before aborting an IO read or write operation with the
+	 *            connected client.
+	 */
+	public void setTimeout(final int seconds) {
+		timeout = seconds;
+	}
+
 	/**
 	 * Execute the upload task on the socket.
 	 *
@@ -183,12 +209,33 @@ public final RevWalk getRevWalk() {
 	 */
 	public void upload(final InputStream input, final OutputStream output,
 			final OutputStream messages) throws IOException {
-		rawIn = input;
-		rawOut = output;
+		try {
+			rawIn = input;
+			rawOut = output;
+
+			if (timeout > 0) {
+				final Thread caller = Thread.currentThread();
+				timer = new InterruptTimer(caller.getName() + "-Timer");
+				TimeoutInputStream i = new TimeoutInputStream(rawIn, timer);
+				TimeoutOutputStream o = new TimeoutOutputStream(rawOut, timer);
+				i.setTimeout(timeout * 1000);
+				o.setTimeout(timeout * 1000);
+				rawIn = i;
+				rawOut = o;
+			}
 
-		pckIn = new PacketLineIn(rawIn);
-		pckOut = new PacketLineOut(rawOut);
-		service();
+			pckIn = new PacketLineIn(rawIn);
+			pckOut = new PacketLineOut(rawOut);
+			service();
+		} finally {
+			if (timer != null) {
+				try {
+					timer.terminate();
+				} finally {
+					timer = null;
+				}
+			}
+		}
 	}
 
 	private void service() throws IOException {
-- 
1.6.3.2.416.g04d0
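
Editorial note: the upload() change above wraps the raw streams so that a blocked read or write is aborted once the timeout expires, and always terminates the timer in a finally block. The sketch below illustrates that general pattern using only the JDK. It is not the InterruptTimer/TimeoutInputStream code from this series: the class and method names are hypothetical, and interrupting the calling thread is an assumption about the mechanism. It also relies on PipedInputStream, whose blocked read() throws InterruptedIOException when the reading thread is interrupted; a raw socket read would not necessarily react to an interrupt the same way.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Illustrative only; these names are hypothetical and not part of JGit. */
final class WatchdogReadSketch {
    /** Read one byte; interrupt the caller if the read blocks too long. */
    static int read(InputStream in, ScheduledExecutorService timer,
            int timeoutMillis) throws IOException {
        if (timeoutMillis <= 0)
            return in.read(); // 0 keeps the "block indefinitely" behavior
        final Thread caller = Thread.currentThread();
        final Runnable wake = caller::interrupt;
        final ScheduledFuture<?> alarm = timer.schedule(wake,
                timeoutMillis, TimeUnit.MILLISECONDS);
        try {
            return in.read();
        } finally {
            alarm.cancel(false);
            Thread.interrupted(); // best effort: drop an alarm that fired late
        }
    }

    /** True if a read from a silent pipe is aborted by the watchdog. */
    static boolean timesOut(int timeoutMillis) {
        final ScheduledExecutorService timer = Executors
                .newSingleThreadScheduledExecutor();
        try (PipedOutputStream out = new PipedOutputStream();
                PipedInputStream in = new PipedInputStream(out)) {
            read(in, timer, timeoutMillis);
            return false;
        } catch (InterruptedIOException e) {
            return true; // the alarm interrupted the blocked read
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            timer.shutdownNow(); // always release the timer thread
        }
    }

    /** Write one byte into a pipe and read it back under a 1 s timeout. */
    static int readsByte(int value) {
        final ScheduledExecutorService timer = Executors
                .newSingleThreadScheduledExecutor();
        try (PipedOutputStream out = new PipedOutputStream();
                PipedInputStream in = new PipedInputStream(out)) {
            out.write(value);
            return read(in, timer, 1000);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("silent peer timed out: " + timesOut(250));
        System.out.println("byte read: " + (char) readsByte('a'));
    }
}
```

Shutting the timer down in finally mirrors the timer.terminate() calls in the patch: the helper thread is released whether the transfer succeeds, fails, or times out.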


* [JGIT PATCH 5/6] Add timeouts to anonymous git:// daemon
  2009-06-19 21:27       ` [JGIT PATCH 4/6] Add timeouts to smart transport protocol servers Shawn O. Pearce
@ 2009-06-19 21:27         ` Shawn O. Pearce
  2009-06-19 21:27           ` [JGIT PATCH 6/6] Add --timeout command line options Shawn O. Pearce
  0 siblings, 1 reply; 11+ messages in thread
From: Shawn O. Pearce @ 2009-06-19 21:27 UTC (permalink / raw)
  To: Robin Rosenberg; +Cc: git

If the initial command line isn't received within the configured
timeout period, the connection is dropped, and the service thread
is able to terminate.

The timeout is also pushed down to the service implementations,
so they can abort if the client doesn't speak to them within the
configured time span.

Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
---
 .../src/org/spearce/jgit/transport/Daemon.java     |   26 ++++++++++++++++---
 .../org/spearce/jgit/transport/DaemonClient.java   |   12 +++++++--
 2 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/org.spearce.jgit/src/org/spearce/jgit/transport/Daemon.java b/org.spearce.jgit/src/org/spearce/jgit/transport/Daemon.java
index be27732..3101d6f 100644
--- a/org.spearce.jgit/src/org/spearce/jgit/transport/Daemon.java
+++ b/org.spearce.jgit/src/org/spearce/jgit/transport/Daemon.java
@@ -37,8 +37,6 @@
 
 package org.spearce.jgit.transport;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -79,6 +77,8 @@
 
 	private Thread acceptThread;
 
+	private int timeout;
+
 	/** Configure a daemon to listen on any available network port. */
 	public Daemon() {
 		this(null);
@@ -108,6 +108,7 @@ protected void execute(final DaemonClient dc,
 							final Repository db) throws IOException {
 						final UploadPack rp = new UploadPack(db);
 						final InputStream in = dc.getInputStream();
+						rp.setTimeout(Daemon.this.getTimeout());
 						rp.upload(in, dc.getOutputStream(), null);
 					}
 				}, new DaemonService("receive-pack", "receivepack") {
@@ -127,6 +128,7 @@ protected void execute(final DaemonClient dc,
 						final String name = "anonymous";
 						final String email = name + "@" + host;
 						rp.setRefLogIdent(new PersonIdent(name, email));
+						rp.setTimeout(Daemon.this.getTimeout());
 						rp.receive(in, dc.getOutputStream(), null);
 					}
 				} };
@@ -213,6 +215,23 @@ synchronized (exportBase) {
 		}
 	}
 
+	/** @return timeout (in seconds) before aborting an IO operation. */
+	public int getTimeout() {
+		return timeout;
+	}
+
+	/**
+	 * Set the timeout before willing to abort an IO call.
+	 *
+	 * @param seconds
+	 *            number of seconds to wait (with no data transfer occurring)
+	 *            before aborting an IO read or write operation with the
+	 *            connected client.
+	 */
+	public void setTimeout(final int seconds) {
+		timeout = seconds;
+	}
+
 	/**
 	 * Start this daemon on a background thread.
 	 *
@@ -280,8 +299,7 @@ private void startClient(final Socket s) {
 		new Thread(processors, "Git-Daemon-Client " + peer.toString()) {
 			public void run() {
 				try {
-					dc.execute(new BufferedInputStream(s.getInputStream()),
-							new BufferedOutputStream(s.getOutputStream()));
+					dc.execute(s);
 				} catch (IOException e) {
 					// Ignore unexpected IO exceptions from clients
 					e.printStackTrace();
diff --git a/org.spearce.jgit/src/org/spearce/jgit/transport/DaemonClient.java b/org.spearce.jgit/src/org/spearce/jgit/transport/DaemonClient.java
index e80d86b..52b2ae0 100644
--- a/org.spearce.jgit/src/org/spearce/jgit/transport/DaemonClient.java
+++ b/org.spearce.jgit/src/org/spearce/jgit/transport/DaemonClient.java
@@ -37,10 +37,13 @@
 
 package org.spearce.jgit.transport;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetAddress;
+import java.net.Socket;
 
 /** Active network client of {@link Daemon}. */
 public class DaemonClient {
@@ -80,11 +83,13 @@ public OutputStream getOutputStream() {
 		return rawOut;
 	}
 
-	void execute(final InputStream in, final OutputStream out)
+	void execute(final Socket sock)
 			throws IOException {
-		rawIn = in;
-		rawOut = out;
+		rawIn = new BufferedInputStream(sock.getInputStream());
+		rawOut = new BufferedOutputStream(sock.getOutputStream());
 
+		if (0 < daemon.getTimeout())
+			sock.setSoTimeout(daemon.getTimeout() * 1000);
 		String cmd = new PacketLineIn(rawIn).readStringRaw();
 		final int nul = cmd.indexOf('\0');
 		if (nul >= 0) {
@@ -98,6 +103,7 @@ void execute(final InputStream in, final OutputStream out)
 		final DaemonService srv = getDaemon().matchService(cmd);
 		if (srv == null)
 			return;
+		sock.setSoTimeout(0);
 		srv.execute(this, cmd);
 	}
 }
-- 
1.6.3.2.416.g04d0
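
Editorial note: DaemonClient.execute() above bounds the wait for the initial command line with Socket.setSoTimeout(), then resets it to 0 before handing the connection to the service, which applies its own timeout through setTimeout(). The following is a minimal, self-contained sketch of that sequence over a loopback connection. The class name, method name, and the TIMED_OUT constant are hypothetical; only the java.net calls are real.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

/** Illustrative only; class and method names are hypothetical. */
final class InitialCommandTimeoutSketch {
    /** Returned when the client sent nothing within the timeout. */
    static final int TIMED_OUT = -2;

    /**
     * Accept one loopback client and wait for its first byte.
     *
     * @return the first byte received, or TIMED_OUT if the client stayed
     *         silent for longer than timeoutSeconds.
     */
    static int firstByte(int timeoutSeconds, boolean clientSpeaks) {
        final InetAddress lo = InetAddress.getLoopbackAddress();
        try (ServerSocket listener = new ServerSocket(0, 1, lo);
                Socket client = new Socket(lo, listener.getLocalPort());
                Socket sock = listener.accept()) {
            if (clientSpeaks) {
                client.getOutputStream().write('g');
                client.getOutputStream().flush();
            }
            // Same guard as the patch: 0 means "block indefinitely".
            if (0 < timeoutSeconds)
                sock.setSoTimeout(timeoutSeconds * 1000);
            try {
                final int b = sock.getInputStream().read();
                // Request seen: clear the socket timeout before the
                // service takes over with its own timeout handling.
                sock.setSoTimeout(0);
                return b;
            } catch (SocketTimeoutException e) {
                return TIMED_OUT; // drop the connection, thread can exit
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("silent client: " + firstByte(1, false));
        System.out.println("talking client: " + (char) firstByte(1, true));
    }
}
```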


* [JGIT PATCH 6/6] Add --timeout command line options
  2009-06-19 21:27         ` [JGIT PATCH 5/6] Add timeouts to anonymous git:// daemon Shawn O. Pearce
@ 2009-06-19 21:27           ` Shawn O. Pearce
  0 siblings, 0 replies; 11+ messages in thread
From: Shawn O. Pearce @ 2009-06-19 21:27 UTC (permalink / raw)
  To: Robin Rosenberg; +Cc: git

Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
---
 .../src/org/spearce/jgit/pgm/Daemon.java           |    5 +++++
 .../src/org/spearce/jgit/pgm/Fetch.java            |    5 +++++
 .../src/org/spearce/jgit/pgm/LsRemote.java         |    6 ++++++
 .../src/org/spearce/jgit/pgm/Push.java             |    5 +++++
 .../src/org/spearce/jgit/pgm/UploadPack.java       |    6 ++++++
 5 files changed, 27 insertions(+), 0 deletions(-)

diff --git a/org.spearce.jgit.pgm/src/org/spearce/jgit/pgm/Daemon.java b/org.spearce.jgit.pgm/src/org/spearce/jgit/pgm/Daemon.java
index e064fd8..6508910 100644
--- a/org.spearce.jgit.pgm/src/org/spearce/jgit/pgm/Daemon.java
+++ b/org.spearce.jgit.pgm/src/org/spearce/jgit/pgm/Daemon.java
@@ -54,6 +54,9 @@
 	@Option(name = "--listen", metaVar = "HOSTNAME", usage = "hostname (or ip) to listen on")
 	String host;
 
+	@Option(name = "--timeout", metaVar = "SECONDS", usage = "abort connection if no activity")
+	int timeout = -1;
+
 	@Option(name = "--enable", metaVar = "SERVICE", usage = "enable the service in all repositories", multiValued = true)
 	final List<String> enable = new ArrayList<String>();
 
@@ -85,6 +88,8 @@ protected void run() throws Exception {
 				host != null ? new InetSocketAddress(host, port)
 						: new InetSocketAddress(port));
 		d.setExportAll(exportAll);
+		if (0 <= timeout)
+			d.setTimeout(timeout);
 
 		for (final String n : enable)
 			service(d, n).setEnabled(true);
diff --git a/org.spearce.jgit.pgm/src/org/spearce/jgit/pgm/Fetch.java b/org.spearce.jgit.pgm/src/org/spearce/jgit/pgm/Fetch.java
index 81d6893..d7be9fa 100644
--- a/org.spearce.jgit.pgm/src/org/spearce/jgit/pgm/Fetch.java
+++ b/org.spearce.jgit.pgm/src/org/spearce/jgit/pgm/Fetch.java
@@ -48,6 +48,9 @@
 
 @Command(common = true, usage = "Update remote refs from another repository")
 class Fetch extends AbstractFetchCommand {
+	@Option(name = "--timeout", metaVar = "SECONDS", usage = "abort connection if no activity")
+	int timeout = -1;
+
 	@Option(name = "--fsck", usage = "perform fsck style checks on receive")
 	private Boolean fsck;
 
@@ -86,6 +89,8 @@ protected void run() throws Exception {
 		tn.setDryRun(dryRun);
 		if (thin != null)
 			tn.setFetchThin(thin.booleanValue());
+		if (0 <= timeout)
+			tn.setTimeout(timeout);
 		final FetchResult r;
 		try {
 			r = tn.fetch(new TextProgressMonitor(), toget);
diff --git a/org.spearce.jgit.pgm/src/org/spearce/jgit/pgm/LsRemote.java b/org.spearce.jgit.pgm/src/org/spearce/jgit/pgm/LsRemote.java
index bfa38ec..da2de9a 100644
--- a/org.spearce.jgit.pgm/src/org/spearce/jgit/pgm/LsRemote.java
+++ b/org.spearce.jgit.pgm/src/org/spearce/jgit/pgm/LsRemote.java
@@ -38,18 +38,24 @@
 package org.spearce.jgit.pgm;
 
 import org.kohsuke.args4j.Argument;
+import org.kohsuke.args4j.Option;
 import org.spearce.jgit.lib.AnyObjectId;
 import org.spearce.jgit.lib.Ref;
 import org.spearce.jgit.transport.FetchConnection;
 import org.spearce.jgit.transport.Transport;
 
 class LsRemote extends TextBuiltin {
+	@Option(name = "--timeout", metaVar = "SECONDS", usage = "abort connection if no activity")
+	int timeout = -1;
+
 	@Argument(index = 0, metaVar = "uri-ish", required = true)
 	private String remote;
 
 	@Override
 	protected void run() throws Exception {
 		final Transport tn = Transport.open(db, remote);
+		if (0 <= timeout)
+			tn.setTimeout(timeout);
 		final FetchConnection c = tn.openFetch();
 		try {
 			for (final Ref r : c.getRefs()) {
diff --git a/org.spearce.jgit.pgm/src/org/spearce/jgit/pgm/Push.java b/org.spearce.jgit.pgm/src/org/spearce/jgit/pgm/Push.java
index 19d31a1..018a521 100644
--- a/org.spearce.jgit.pgm/src/org/spearce/jgit/pgm/Push.java
+++ b/org.spearce.jgit.pgm/src/org/spearce/jgit/pgm/Push.java
@@ -55,6 +55,9 @@
 
 @Command(common = true, usage = "Update remote repository from local refs")
 class Push extends TextBuiltin {
+	@Option(name = "--timeout", metaVar = "SECONDS", usage = "abort connection if no activity")
+	int timeout = -1;
+
 	@Argument(index = 0, metaVar = "uri-ish")
 	private String remote = "origin";
 
@@ -104,6 +107,8 @@ protected void run() throws Exception {
 
 		final List<Transport> transports = Transport.openAll(db, remote);
 		for (final Transport transport : transports) {
+			if (0 <= timeout)
+				transport.setTimeout(timeout);
 			transport.setPushThin(thin);
 			if (receivePack != null)
 				transport.setOptionReceivePack(receivePack);
diff --git a/org.spearce.jgit.pgm/src/org/spearce/jgit/pgm/UploadPack.java b/org.spearce.jgit.pgm/src/org/spearce/jgit/pgm/UploadPack.java
index d09d442..1d9af24 100644
--- a/org.spearce.jgit.pgm/src/org/spearce/jgit/pgm/UploadPack.java
+++ b/org.spearce.jgit.pgm/src/org/spearce/jgit/pgm/UploadPack.java
@@ -40,10 +40,14 @@
 import java.io.File;
 
 import org.kohsuke.args4j.Argument;
+import org.kohsuke.args4j.Option;
 import org.spearce.jgit.lib.Repository;
 
 @Command(common = false, usage = "Server side backend for 'jgit fetch'")
 class UploadPack extends TextBuiltin {
+	@Option(name = "--timeout", metaVar = "SECONDS", usage = "abort connection if no activity")
+	int timeout = -1;
+
 	@Argument(index = 0, required = true, metaVar = "DIRECTORY", usage = "Repository to read from")
 	File srcGitdir;
 
@@ -62,6 +66,8 @@ protected void run() throws Exception {
 		if (!db.getObjectsDirectory().isDirectory())
 			throw die("'" + srcGitdir.getPath() + "' not a git repository");
 		rp = new org.spearce.jgit.transport.UploadPack(db);
+		if (0 <= timeout)
+			rp.setTimeout(timeout);
 		rp.upload(System.in, System.out, System.err);
 	}
 }
-- 
1.6.3.2.416.g04d0
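
Editorial note: each command above defaults its timeout field to -1 and only calls setTimeout() when the value is non-negative. That lets an explicit "--timeout 0" (block indefinitely) be told apart from the option being absent. The sketch below spells out that rule. The names are hypothetical, and it assumes the target already carries a timeout from another source, such as the remote.name.timeout setting added earlier in this series.

```java
/** Illustrative only; not JGit code. */
final class TimeoutOptionSketch {
    /** Sentinel meaning "--timeout was not given on the command line". */
    static final int UNSET = -1;

    /**
     * Pick the timeout to use, in seconds.
     *
     * @param cliTimeout value parsed from --timeout, or UNSET
     * @param configuredTimeout value already configured elsewhere
     */
    static int effectiveTimeout(int cliTimeout, int configuredTimeout) {
        // Mirrors "if (0 <= timeout) setTimeout(timeout)": 0 is a real
        // request to disable the timeout, negative means "leave as is".
        return 0 <= cliTimeout ? cliTimeout : configuredTimeout;
    }

    public static void main(String[] args) {
        System.out.println(effectiveTimeout(UNSET, 30));
        System.out.println(effectiveTimeout(0, 30));
        System.out.println(effectiveTimeout(5, 30));
    }
}
```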


* Re: [JGIT PATCH 2/6] Add remote.name.timeout to configure an IO timeout
  2009-06-19 21:27   ` [JGIT PATCH 2/6] Add remote.name.timeout to configure an IO timeout Shawn O. Pearce
  2009-06-19 21:27     ` [JGIT PATCH 3/6] Add timeouts to smart transport protocol clients Shawn O. Pearce
@ 2009-06-20 22:28     ` Robin Rosenberg
  2009-06-20 22:54       ` Shawn O. Pearce
  1 sibling, 1 reply; 11+ messages in thread
From: Robin Rosenberg @ 2009-06-20 22:28 UTC (permalink / raw)
  To: Shawn O. Pearce; +Cc: git

On Friday 19 June 2009 23:27:51, "Shawn O. Pearce" <spearce@spearce.org> wrote:
> An IO timeout can be useful if the remote peer stops responding,
> and the user wants the application to abort rather than block
> indefinitely waiting for more input.
> 
> This is a JGit specific extension to the standard remote format.

Can we assume C Git won't implement the same thing with a different 
parameter name, or worse, the same name but a different unit?

-- robin


* Re: [JGIT PATCH 2/6] Add remote.name.timeout to configure an IO timeout
  2009-06-20 22:28     ` [JGIT PATCH 2/6] Add remote.name.timeout to configure an IO timeout Robin Rosenberg
@ 2009-06-20 22:54       ` Shawn O. Pearce
  0 siblings, 0 replies; 11+ messages in thread
From: Shawn O. Pearce @ 2009-06-20 22:54 UTC (permalink / raw)
  To: Robin Rosenberg; +Cc: git

Robin Rosenberg <robin.rosenberg.lists@dewire.com> wrote:
> On Friday 19 June 2009 23:27:51, "Shawn O. Pearce" <spearce@spearce.org> wrote:
> > An IO timeout can be useful if the remote peer stops responding,
> > and the user wants the application to abort rather than block
> > indefinitely waiting for more input.
> > 
> > This is a JGit specific extension to the standard remote format.
> 
> Can we assume C Git won't implement the same thing with a different 
> parameter name, or worse, the same name but a different unit?

No, we can't assume anything.

I probably should add this to C Git too.  I think it's the logical
name and units, e.g. `git daemon --timeout=` already exists and
takes seconds as the unit.

I wrote this series because I have a case where the remote server
is sometimes not sending packets out... and the client just blocks.
git fetch has the same issue.  Doing git fetch or jgit fetch from
a cron against this server causes the fetch processes to just pile
up indefinitely.  :-(

-- 
Shawn.


* Re: [JGIT PATCH 1/6] Create input and output streams that have timeouts
  2009-06-19 21:27 ` [JGIT PATCH 1/6] Create input and output streams that have timeouts Shawn O. Pearce
  2009-06-19 21:27   ` [JGIT PATCH 2/6] Add remote.name.timeout to configure an IO timeout Shawn O. Pearce
@ 2009-06-22 21:09   ` Robin Rosenberg
  2009-06-23 16:41     ` [JGIT PATCH 1/6 v2] " Shawn O. Pearce
  1 sibling, 1 reply; 11+ messages in thread
From: Robin Rosenberg @ 2009-06-22 21:09 UTC (permalink / raw)
  To: Shawn O. Pearce; +Cc: git

On Friday 19 June 2009 23:27:50, "Shawn O. Pearce" <spearce@spearce.org> wrote:
> We can use these streams to wrap around a PipedInputStream, such
> as the one given to us by JSch, to implement a timeout so we can
> abort a transport operation if the remote peer doesn't communicate
> with us in a timely fashion.
> 
> Only the input and output streams and tests are defined, later
> I will make use of them in the transport layer where we can't
> rely upon raw socket timeouts to abort a long running IO call.
> 
> Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
> ---
>  .../jgit/util/io/TimeoutInputStreamTest.java       |  187 +++++++++++++
>  .../jgit/util/io/TimeoutOutputStreamTest.java      |  286 ++++++++++++++++++++
>  .../org/spearce/jgit/util/io/InterruptTimer.java   |  216 +++++++++++++++
>  .../spearce/jgit/util/io/TimeoutInputStream.java   |  133 +++++++++
>  .../spearce/jgit/util/io/TimeoutOutputStream.java  |  146 ++++++++++
>  5 files changed, 968 insertions(+), 0 deletions(-)
>  create mode 100644 org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutInputStreamTest.java
>  create mode 100644 org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutOutputStreamTest.java
>  create mode 100644 org.spearce.jgit/src/org/spearce/jgit/util/io/InterruptTimer.java
>  create mode 100644 org.spearce.jgit/src/org/spearce/jgit/util/io/TimeoutInputStream.java
>  create mode 100644 org.spearce.jgit/src/org/spearce/jgit/util/io/TimeoutOutputStream.java
> 
> diff --git a/org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutInputStreamTest.java b/org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutInputStreamTest.java
> new file mode 100644
> index 0000000..25eff9a
> --- /dev/null
> +++ b/org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutInputStreamTest.java
> @@ -0,0 +1,187 @@
> +/*
> + * Copyright (C) 2009, Google Inc.
> + *
> + * All rights reserved.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * - Redistributions of source code must retain the above copyright
> + *   notice, this list of conditions and the following disclaimer.
> + *
> + * - Redistributions in binary form must reproduce the above
> + *   copyright notice, this list of conditions and the following
> + *   disclaimer in the documentation and/or other materials provided
> + *   with the distribution.
> + *
> + * - Neither the name of the Git Development Community nor the
> + *   names of its contributors may be used to endorse or promote
> + *   products derived from this software without specific prior
> + *   written permission.
> + *
> + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
> + * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
> + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
> + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
> + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
> + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
> + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
> + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
> + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
> + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
> + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
> + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> + */
> +
> +package org.spearce.jgit.util.io;
> +
> +import java.io.IOException;
> +import java.io.InterruptedIOException;
> +import java.io.PipedInputStream;
> +import java.io.PipedOutputStream;
> +import java.util.Arrays;
> +import java.util.List;
> +
> +import org.spearce.jgit.util.NB;
> +import org.spearce.jgit.util.io.InterruptTimer;
> +import org.spearce.jgit.util.io.TimeoutInputStream;
> +
> +import junit.framework.TestCase;
> +
> +public class TimeoutInputStreamTest extends TestCase {
> +	private static final int timeout = 250;
> +
> +	private PipedOutputStream out;
> +
> +	private PipedInputStream in;
> +
> +	private InterruptTimer timer;
> +
> +	private TimeoutInputStream is;
> +
> +	private long start;
> +
> +	protected void setUp() throws Exception {
> +		super.setUp();
> +		out = new PipedOutputStream();
> +		in = new PipedInputStream(out);
> +		timer = new InterruptTimer();
> +		is = new TimeoutInputStream(in, timer);
> +		is.setTimeout(timeout);
> +	}
> +
> +	protected void tearDown() throws Exception {
> +		close();
> +		super.tearDown();
> +	}
> +
> +	public void testTimeout_readByte_Success1() throws IOException {
> +		out.write('a');
> +		assertEquals('a', is.read());
> +		close();
Unnecessary close(). It is already in tearDown. Same goes for all other
tests in the suite.

> +	}
> +
> +	public void testTimeout_readByte_Success2() throws IOException {
> +		final byte[] exp = new byte[] { 'a', 'b', 'c' };
> +		out.write(exp);
> +		assertEquals(exp[0], is.read());
> +		assertEquals(exp[1], is.read());
> +		assertEquals(exp[2], is.read());
> +		close();
> +	}

Perhaps add a normal end-of-stream test too: call out.close() and verify
that read() returns -1.

> +	public void testTimeout_readByte_Timeout() throws IOException {
> +		beginRead();
> +		try {
> +			is.read();
> +			fail("incorrectly read a byte");
> +		} catch (InterruptedIOException e) {
> +			// expected
> +		}
> +		assertTimeout();
> +		close();
> +	}
> +
> +	public void testTimeout_readBuffer_Success1() throws IOException {
> +		final byte[] exp = new byte[] { 'a', 'b', 'c' };
> +		final byte[] act = new byte[exp.length];
> +		out.write(exp);
> +		NB.readFully(is, act, 0, act.length);
> +		assertTrue(Arrays.equals(exp, act));
> +		close();
> +	}
> +
> +	public void testTimeout_readBuffer_Success2() throws IOException {
> +		final byte[] exp = new byte[] { 'a', 'b', 'c' };
> +		final byte[] act = new byte[exp.length];
> +		out.write(exp);
> +		NB.readFully(is, act, 0, 1);
> +		NB.readFully(is, act, 1, 1);
> +		NB.readFully(is, act, 2, 1);
> +		assertTrue(Arrays.equals(exp, act));
> +		close();
> +	}
> +
> +	public void testTimeout_readBuffer_Timeout() throws IOException {
> +		beginRead();
> +		try {
> +			is.read(new byte[512]);
> +			fail("incorrectly read bytes");
> +		} catch (InterruptedIOException e) {
> +			// expected
> +		}
> +		assertTimeout();
> +		close();
> +	}

The successful read tests call NB.readFully; shouldn't the timeout ones
use the same API?

> +	public void testTimeout_skip_Success() throws IOException {
> +		final byte[] exp = new byte[] { 'a', 'b', 'c' };
> +		out.write(exp);
> +		assertEquals(2, is.skip(2));
> +		assertEquals('c', is.read());
> +		close();
> +	}
> +
> +	public void testTimeout_skip_Timeout() throws IOException {
> +		beginRead();
> +		try {
> +			is.skip(1024);
> +			fail("incorrectly skipped bytes");
> +		} catch (InterruptedIOException e) {
> +			// expected
> +		}
> +		assertTimeout();
> +		close();
> +	}
> +
> +	private void beginRead() {
> +		start = now();
> +	}
> +
> +	private void assertTimeout() {
> +		final long wait = now() - start;
> +		assertTrue(Math.abs(wait - timeout) < 50);

Maybe add a comment here for the uninitiated.

> +	}
...
> +public class TimeoutOutputStreamTest extends TestCase {
> +	private static final int timeout = 250;
...
> +	protected void tearDown() throws Exception {
> +		close();
> +		super.tearDown();
> +	}
> +
Same comment as for the input test and close().


And the rest is beautiful code.

-- robin


* [JGIT PATCH 1/6 v2] Create input and output streams that have timeouts
  2009-06-22 21:09   ` [JGIT PATCH 1/6] Create input and output streams that have timeouts Robin Rosenberg
@ 2009-06-23 16:41     ` Shawn O. Pearce
  0 siblings, 0 replies; 11+ messages in thread
From: Shawn O. Pearce @ 2009-06-23 16:41 UTC (permalink / raw)
  To: Robin Rosenberg; +Cc: git

We can use these streams to wrap around a PipedInputStream, such
as the one given to us by JSch, to implement a timeout so we can
abort a transport operation if the remote peer doesn't communicate
with us in a timely fashion.

Only the input and output streams and tests are defined, later
I will make use of them in the transport layer where we can't
rely upon raw socket timeouts to abort a long running IO call.

Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
---

 This version addresses all of your comments in the unit tests.

 .../jgit/util/io/TimeoutInputStreamTest.java       |  183 +++++++++++++
 .../jgit/util/io/TimeoutOutputStreamTest.java      |  279 ++++++++++++++++++++
 .../org/spearce/jgit/util/io/InterruptTimer.java   |  216 +++++++++++++++
 .../spearce/jgit/util/io/TimeoutInputStream.java   |  133 ++++++++++
 .../spearce/jgit/util/io/TimeoutOutputStream.java  |  146 ++++++++++
 5 files changed, 957 insertions(+), 0 deletions(-)
 create mode 100644 org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutInputStreamTest.java
 create mode 100644 org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutOutputStreamTest.java
 create mode 100644 org.spearce.jgit/src/org/spearce/jgit/util/io/InterruptTimer.java
 create mode 100644 org.spearce.jgit/src/org/spearce/jgit/util/io/TimeoutInputStream.java
 create mode 100644 org.spearce.jgit/src/org/spearce/jgit/util/io/TimeoutOutputStream.java

diff --git a/org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutInputStreamTest.java b/org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutInputStreamTest.java
new file mode 100644
index 0000000..272838c
--- /dev/null
+++ b/org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutInputStreamTest.java
@@ -0,0 +1,183 @@
+/*
+ * Copyright (C) 2009, Google Inc.
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ *   notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ *   copyright notice, this list of conditions and the following
+ *   disclaimer in the documentation and/or other materials provided
+ *   with the distribution.
+ *
+ * - Neither the name of the Git Development Community nor the
+ *   names of its contributors may be used to endorse or promote
+ *   products derived from this software without specific prior
+ *   written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.spearce.jgit.util.io;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Arrays;
+import java.util.List;
+
+import org.spearce.jgit.util.NB;
+import org.spearce.jgit.util.io.InterruptTimer;
+import org.spearce.jgit.util.io.TimeoutInputStream;
+
+import junit.framework.TestCase;
+
+public class TimeoutInputStreamTest extends TestCase {
+	private static final int timeout = 250;
+
+	private PipedOutputStream out;
+
+	private PipedInputStream in;
+
+	private InterruptTimer timer;
+
+	private TimeoutInputStream is;
+
+	private long start;
+
+	protected void setUp() throws Exception {
+		super.setUp();
+		out = new PipedOutputStream();
+		in = new PipedInputStream(out);
+		timer = new InterruptTimer();
+		is = new TimeoutInputStream(in, timer);
+		is.setTimeout(timeout);
+	}
+
+	protected void tearDown() throws Exception {
+		timer.terminate();
+		for (Thread t : active())
+			assertFalse(t instanceof InterruptTimer.AlarmThread);
+		super.tearDown();
+	}
+
+	public void testTimeout_readByte_Success1() throws IOException {
+		out.write('a');
+		assertEquals('a', is.read());
+	}
+
+	public void testTimeout_readByte_Success2() throws IOException {
+		final byte[] exp = new byte[] { 'a', 'b', 'c' };
+		out.write(exp);
+		assertEquals(exp[0], is.read());
+		assertEquals(exp[1], is.read());
+		assertEquals(exp[2], is.read());
+		out.close();
+		assertEquals(-1, is.read());
+	}
+
+	public void testTimeout_readByte_Timeout() throws IOException {
+		beginRead();
+		try {
+			is.read();
+			fail("incorrectly read a byte");
+		} catch (InterruptedIOException e) {
+			// expected
+		}
+		assertTimeout();
+	}
+
+	public void testTimeout_readBuffer_Success1() throws IOException {
+		final byte[] exp = new byte[] { 'a', 'b', 'c' };
+		final byte[] act = new byte[exp.length];
+		out.write(exp);
+		NB.readFully(is, act, 0, act.length);
+		assertTrue(Arrays.equals(exp, act));
+	}
+
+	public void testTimeout_readBuffer_Success2() throws IOException {
+		final byte[] exp = new byte[] { 'a', 'b', 'c' };
+		final byte[] act = new byte[exp.length];
+		out.write(exp);
+		NB.readFully(is, act, 0, 1);
+		NB.readFully(is, act, 1, 1);
+		NB.readFully(is, act, 2, 1);
+		assertTrue(Arrays.equals(exp, act));
+	}
+
+	public void testTimeout_readBuffer_Timeout() throws IOException {
+		beginRead();
+		try {
+			NB.readFully(is, new byte[512], 0, 512);
+			fail("incorrectly read bytes");
+		} catch (InterruptedIOException e) {
+			// expected
+		}
+		assertTimeout();
+	}
+
+	public void testTimeout_skip_Success() throws IOException {
+		final byte[] exp = new byte[] { 'a', 'b', 'c' };
+		out.write(exp);
+		assertEquals(2, is.skip(2));
+		assertEquals('c', is.read());
+	}
+
+	public void testTimeout_skip_Timeout() throws IOException {
+		beginRead();
+		try {
+			is.skip(1024);
+			fail("incorrectly skipped bytes");
+		} catch (InterruptedIOException e) {
+			// expected
+		}
+		assertTimeout();
+	}
+
+	private void beginRead() {
+		start = now();
+	}
+
+	private void assertTimeout() {
+		// Our timeout was supposed to be ~250 ms. Since this is a timing
+		// test we can't assume we spent *exactly* the timeout period, as
+		// there may be other activity going on in the system. Instead we
+		// look for the delta between the start and end times to be within
+		// 50 ms of the expected timeout.
+		//
+		final long wait = now() - start;
+		assertTrue(Math.abs(wait - timeout) < 50);
+	}
+
+	private static List<Thread> active() {
+		Thread[] all = new Thread[16];
+		int n = Thread.currentThread().getThreadGroup().enumerate(all);
+		while (n == all.length) {
+			all = new Thread[all.length * 2];
+			n = Thread.currentThread().getThreadGroup().enumerate(all);
+		}
+		return Arrays.asList(all).subList(0, n);
+	}
+
+	private static long now() {
+		return System.currentTimeMillis();
+	}
+}
diff --git a/org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutOutputStreamTest.java b/org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutOutputStreamTest.java
new file mode 100644
index 0000000..07e5c38
--- /dev/null
+++ b/org.spearce.jgit.test/tst/org/spearce/jgit/util/io/TimeoutOutputStreamTest.java
@@ -0,0 +1,279 @@
+/*
+ * Copyright (C) 2009, Google Inc.
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ *   notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ *   copyright notice, this list of conditions and the following
+ *   disclaimer in the documentation and/or other materials provided
+ *   with the distribution.
+ *
+ * - Neither the name of the Git Development Community nor the
+ *   names of its contributors may be used to endorse or promote
+ *   products derived from this software without specific prior
+ *   written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.spearce.jgit.util.io;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Arrays;
+import java.util.List;
+
+import org.spearce.jgit.util.NB;
+import org.spearce.jgit.util.io.InterruptTimer;
+import org.spearce.jgit.util.io.TimeoutOutputStream;
+
+import junit.framework.TestCase;
+
+public class TimeoutOutputStreamTest extends TestCase {
+	private static final int timeout = 250;
+
+	private PipedOutputStream out;
+
+	private FullPipeInputStream in;
+
+	private InterruptTimer timer;
+
+	private TimeoutOutputStream os;
+
+	private long start;
+
+	protected void setUp() throws Exception {
+		super.setUp();
+		out = new PipedOutputStream();
+		in = new FullPipeInputStream(out);
+		timer = new InterruptTimer();
+		os = new TimeoutOutputStream(out, timer);
+		os.setTimeout(timeout);
+	}
+
+	protected void tearDown() throws Exception {
+		timer.terminate();
+		for (Thread t : active())
+			assertFalse(t instanceof InterruptTimer.AlarmThread);
+		super.tearDown();
+	}
+
+	public void testTimeout_writeByte_Success1() throws IOException {
+		in.free(1);
+		os.write('a');
+		in.want(1);
+		assertEquals('a', in.read());
+	}
+
+	public void testTimeout_writeByte_Success2() throws IOException {
+		final byte[] exp = new byte[] { 'a', 'b', 'c' };
+		final byte[] act = new byte[exp.length];
+		in.free(exp.length);
+		os.write(exp[0]);
+		os.write(exp[1]);
+		os.write(exp[2]);
+		in.want(exp.length);
+		in.read(act);
+		assertTrue(Arrays.equals(exp, act));
+	}
+
+	public void testTimeout_writeByte_Timeout() throws IOException {
+		beginWrite();
+		try {
+			os.write('\n');
+			fail("incorrectly wrote a byte");
+		} catch (InterruptedIOException e) {
+			// expected
+		}
+		assertTimeout();
+	}
+
+	public void testTimeout_writeBuffer_Success1() throws IOException {
+		final byte[] exp = new byte[] { 'a', 'b', 'c' };
+		final byte[] act = new byte[exp.length];
+		in.free(exp.length);
+		os.write(exp);
+		in.want(exp.length);
+		in.read(act);
+		assertTrue(Arrays.equals(exp, act));
+	}
+
+	public void testTimeout_writeBuffer_Timeout() throws IOException {
+		beginWrite();
+		try {
+			os.write(new byte[512]);
+			fail("incorrectly wrote bytes");
+		} catch (InterruptedIOException e) {
+			// expected
+		}
+		assertTimeout();
+	}
+
+	public void testTimeout_flush_Success() throws IOException {
+		final boolean[] called = new boolean[1];
+		os = new TimeoutOutputStream(new OutputStream() {
+			@Override
+			public void write(int b) throws IOException {
+				fail("should not have written");
+			}
+
+			@Override
+			public void flush() throws IOException {
+				called[0] = true;
+			}
+		}, timer);
+		os.setTimeout(timeout);
+		os.flush();
+		assertTrue(called[0]);
+	}
+
+	public void testTimeout_flush_Timeout() throws IOException {
+		final boolean[] called = new boolean[1];
+		os = new TimeoutOutputStream(new OutputStream() {
+			@Override
+			public void write(int b) throws IOException {
+				fail("should not have written");
+			}
+
+			@Override
+			public void flush() throws IOException {
+				called[0] = true;
+				for (;;) {
+					try {
+						Thread.sleep(1000);
+					} catch (InterruptedException e) {
+						throw new InterruptedIOException();
+					}
+				}
+			}
+		}, timer);
+		os.setTimeout(timeout);
+
+		beginWrite();
+		try {
+			os.flush();
+			fail("incorrectly flushed");
+		} catch (InterruptedIOException e) {
+			// expected
+		}
+		assertTimeout();
+		assertTrue(called[0]);
+	}
+
+	public void testTimeout_close_Success() throws IOException {
+		final boolean[] called = new boolean[1];
+		os = new TimeoutOutputStream(new OutputStream() {
+			@Override
+			public void write(int b) throws IOException {
+				fail("should not have written");
+			}
+
+			@Override
+			public void close() throws IOException {
+				called[0] = true;
+			}
+		}, timer);
+		os.setTimeout(timeout);
+		os.close();
+		assertTrue(called[0]);
+	}
+
+	public void testTimeout_close_Timeout() throws IOException {
+		final boolean[] called = new boolean[1];
+		os = new TimeoutOutputStream(new OutputStream() {
+			@Override
+			public void write(int b) throws IOException {
+				fail("should not have written");
+			}
+
+			@Override
+			public void close() throws IOException {
+				called[0] = true;
+				for (;;) {
+					try {
+						Thread.sleep(1000);
+					} catch (InterruptedException e) {
+						throw new InterruptedIOException();
+					}
+				}
+			}
+		}, timer);
+		os.setTimeout(timeout);
+
+		beginWrite();
+		try {
+			os.close();
+			fail("incorrectly closed");
+		} catch (InterruptedIOException e) {
+			// expected
+		}
+		assertTimeout();
+		assertTrue(called[0]);
+	}
+
+	private void beginWrite() {
+		start = now();
+	}
+
+	private void assertTimeout() {
+		// Our timeout was supposed to be ~250 ms. Since this is a timing
+		// test we can't assume we spent *exactly* the timeout period, as
+		// there may be other activity going on in the system. Instead we
+		// look for the delta between the start and end times to be within
+		// 50 ms of the expected timeout.
+		//
+		final long wait = now() - start;
+		assertTrue(Math.abs(wait - timeout) < 50);
+	}
+
+	private static List<Thread> active() {
+		Thread[] all = new Thread[16];
+		int n = Thread.currentThread().getThreadGroup().enumerate(all);
+		while (n == all.length) {
+			all = new Thread[all.length * 2];
+			n = Thread.currentThread().getThreadGroup().enumerate(all);
+		}
+		return Arrays.asList(all).subList(0, n);
+	}
+
+	private static long now() {
+		return System.currentTimeMillis();
+	}
+
+	private final class FullPipeInputStream extends PipedInputStream {
+		FullPipeInputStream(PipedOutputStream src) throws IOException {
+			super(src);
+			src.write(new byte[PIPE_SIZE]);
+		}
+
+		void want(int cnt) throws IOException {
+			NB.skipFully(this, PIPE_SIZE - cnt);
+		}
+
+		void free(int cnt) throws IOException {
+			NB.skipFully(this, cnt);
+		}
+	}
+}
diff --git a/org.spearce.jgit/src/org/spearce/jgit/util/io/InterruptTimer.java b/org.spearce.jgit/src/org/spearce/jgit/util/io/InterruptTimer.java
new file mode 100644
index 0000000..8f625e0
--- /dev/null
+++ b/org.spearce.jgit/src/org/spearce/jgit/util/io/InterruptTimer.java
@@ -0,0 +1,216 @@
+/*
+ * Copyright (C) 2009, Google Inc.
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ *   notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ *   copyright notice, this list of conditions and the following
+ *   disclaimer in the documentation and/or other materials provided
+ *   with the distribution.
+ *
+ * - Neither the name of the Git Development Community nor the
+ *   names of its contributors may be used to endorse or promote
+ *   products derived from this software without specific prior
+ *   written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.spearce.jgit.util.io;
+
+/**
+ * Triggers an interrupt on the calling thread if it doesn't complete a block.
+ * <p>
+ * Classes can use this to trip an alarm interrupting the calling thread if it
+ * doesn't complete a block within the specified timeout. Typical calling
+ * pattern is:
+ *
+ * <pre>
+ * private InterruptTimer myTimer = ...;
+ * void foo() {
+ *   try {
+ *     myTimer.begin(timeout);
+ *     // work
+ *   } finally {
+ *     myTimer.end();
+ *   }
+ * }
+ * </pre>
+ * <p>
+ * An InterruptTimer is not recursive. To implement recursive timers,
+ * independent InterruptTimer instances are required. A single InterruptTimer
+ * may be shared between objects which won't recursively call each other.
+ * <p>
+ * Each InterruptTimer spawns one background thread to sleep the specified time
+ * and interrupt the thread which called {@link #begin(int)}. It is up to the
+ * caller to ensure that the operations within the work block between the
+ * matched begin and end calls test the interrupt flag (most IO operations do).
+ * <p>
+ * To terminate the background thread, use {@link #terminate()}. If the
+ * application fails to terminate the thread, it will (eventually) terminate
+ * itself when the InterruptTimer instance is garbage collected.
+ *
+ * @see TimeoutInputStream
+ */
+public final class InterruptTimer {
+	private final AlarmState state;
+
+	private final AlarmThread thread;
+
+	final AutoKiller autoKiller;
+
+	/** Create a new timer with a default thread name. */
+	public InterruptTimer() {
+		this("JGit-InterruptTimer");
+	}
+
+	/**
+	 * Create a new timer to signal an interrupt on the caller.
+	 * <p>
+	 * The timer thread is created in the calling thread's ThreadGroup.
+	 *
+	 * @param threadName
+	 *            name of the timer thread.
+	 */
+	public InterruptTimer(final String threadName) {
+		state = new AlarmState();
+		autoKiller = new AutoKiller(state);
+		thread = new AlarmThread(threadName, state);
+		thread.start();
+	}
+
+	/**
+	 * Arm the interrupt timer before entering a blocking operation.
+	 *
+	 * @param timeout
+	 *            number of milliseconds before the interrupt should trigger.
+	 *            Must be > 0.
+	 */
+	public void begin(final int timeout) {
+		if (timeout <= 0)
+			throw new IllegalArgumentException("Invalid timeout: " + timeout);
+		Thread.interrupted();
+		state.begin(timeout);
+	}
+
+	/** Disable the interrupt timer, as the operation is complete. */
+	public void end() {
+		state.end();
+	}
+
+	/** Shutdown the timer thread, and wait for it to terminate. */
+	public void terminate() {
+		state.terminate();
+		try {
+			thread.join();
+		} catch (InterruptedException e) {
+			// Interrupted while joining; stop waiting for the timer thread.
+		}
+	}
+
+	static final class AlarmThread extends Thread {
+		AlarmThread(final String name, final AlarmState q) {
+			super(q);
+			setName(name);
+			setDaemon(true);
+		}
+	}
+
+	// The trick here is, the AlarmThread does not have a reference to the
+	// AutoKiller instance, only the InterruptTimer itself does. Thus when
+	// the InterruptTimer is GC'd, the AutoKiller is also unreachable and
+	// can be GC'd. When it gets finalized, it tells the AlarmThread to
+	// terminate, triggering the thread to exit gracefully.
+	//
+	private static final class AutoKiller {
+		private final AlarmState state;
+
+		AutoKiller(final AlarmState s) {
+			state = s;
+		}
+
+		@Override
+		protected void finalize() throws Throwable {
+			state.terminate();
+		}
+	}
+
+	static final class AlarmState implements Runnable {
+		private Thread callingThread;
+
+		private long deadline;
+
+		private boolean terminated;
+
+		AlarmState() {
+			callingThread = Thread.currentThread();
+		}
+
+		public synchronized void run() {
+			while (!terminated && callingThread.isAlive()) {
+				try {
+					if (0 < deadline) {
+						final long delay = deadline - now();
+						if (delay <= 0) {
+							deadline = 0;
+							callingThread.interrupt();
+						} else {
+							wait(delay);
+						}
+					} else {
+						wait(1000);
+					}
+				} catch (InterruptedException e) {
+					// Treat an interrupt as notice to examine state.
+				}
+			}
+		}
+
+		synchronized void begin(final int timeout) {
+			if (terminated)
+				throw new IllegalStateException("Timer already terminated");
+			callingThread = Thread.currentThread();
+			deadline = now() + timeout;
+			notifyAll();
+		}
+
+		synchronized void end() {
+			if (0 == deadline)
+				Thread.interrupted();
+			else
+				deadline = 0;
+			notifyAll();
+		}
+
+		synchronized void terminate() {
+			if (!terminated) {
+				deadline = 0;
+				terminated = true;
+				notifyAll();
+			}
+		}
+
+		private static long now() {
+			return System.currentTimeMillis();
+		}
+	}
+}
diff --git a/org.spearce.jgit/src/org/spearce/jgit/util/io/TimeoutInputStream.java b/org.spearce.jgit/src/org/spearce/jgit/util/io/TimeoutInputStream.java
new file mode 100644
index 0000000..3a321aa
--- /dev/null
+++ b/org.spearce.jgit/src/org/spearce/jgit/util/io/TimeoutInputStream.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright (C) 2009, Google Inc.
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ *   notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ *   copyright notice, this list of conditions and the following
+ *   disclaimer in the documentation and/or other materials provided
+ *   with the distribution.
+ *
+ * - Neither the name of the Git Development Community nor the
+ *   names of its contributors may be used to endorse or promote
+ *   products derived from this software without specific prior
+ *   written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.spearce.jgit.util.io;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+
+/** InputStream with a configurable timeout. */
+public class TimeoutInputStream extends FilterInputStream {
+	private final InterruptTimer myTimer;
+
+	private int timeout;
+
+	/**
+	 * Wrap an input stream with a timeout on all read operations.
+	 *
+	 * @param src
+	 *            base input stream (to read from). The stream must be
+	 *            interruptible (most socket streams are).
+	 * @param timer
+	 *            timer to manage the timeouts during reads.
+	 */
+	public TimeoutInputStream(final InputStream src,
+			final InterruptTimer timer) {
+		super(src);
+		myTimer = timer;
+	}
+
+	/** @return number of milliseconds before aborting a read. */
+	public int getTimeout() {
+		return timeout;
+	}
+
+	/**
+	 * @param millis
+	 *            number of milliseconds before aborting a read. Must be > 0.
+	 */
+	public void setTimeout(final int millis) {
+		if (millis <= 0)
+			throw new IllegalArgumentException("Invalid timeout: " + millis);
+		timeout = millis;
+	}
+
+	@Override
+	public int read() throws IOException {
+		try {
+			beginRead();
+			return super.read();
+		} catch (InterruptedIOException e) {
+			throw readTimedOut();
+		} finally {
+			endRead();
+		}
+	}
+
+	@Override
+	public int read(byte[] buf) throws IOException {
+		return read(buf, 0, buf.length);
+	}
+
+	@Override
+	public int read(byte[] buf, int off, int cnt) throws IOException {
+		try {
+			beginRead();
+			return super.read(buf, off, cnt);
+		} catch (InterruptedIOException e) {
+			throw readTimedOut();
+		} finally {
+			endRead();
+		}
+	}
+
+	@Override
+	public long skip(long cnt) throws IOException {
+		try {
+			beginRead();
+			return super.skip(cnt);
+		} catch (InterruptedIOException e) {
+			throw readTimedOut();
+		} finally {
+			endRead();
+		}
+	}
+
+	private void beginRead() {
+		myTimer.begin(timeout);
+	}
+
+	private void endRead() {
+		myTimer.end();
+	}
+
+	private static InterruptedIOException readTimedOut() {
+		return new InterruptedIOException("Read timed out");
+	}
+}
diff --git a/org.spearce.jgit/src/org/spearce/jgit/util/io/TimeoutOutputStream.java b/org.spearce.jgit/src/org/spearce/jgit/util/io/TimeoutOutputStream.java
new file mode 100644
index 0000000..014cd92
--- /dev/null
+++ b/org.spearce.jgit/src/org/spearce/jgit/util/io/TimeoutOutputStream.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright (C) 2009, Google Inc.
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ *   notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ *   copyright notice, this list of conditions and the following
+ *   disclaimer in the documentation and/or other materials provided
+ *   with the distribution.
+ *
+ * - Neither the name of the Git Development Community nor the
+ *   names of its contributors may be used to endorse or promote
+ *   products derived from this software without specific prior
+ *   written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.spearce.jgit.util.io;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+
+/** OutputStream with a configurable timeout. */
+public class TimeoutOutputStream extends OutputStream {
+	private final OutputStream dst;
+
+	private final InterruptTimer myTimer;
+
+	private int timeout;
+
+	/**
+	 * Wrap an output stream with a timeout on all write operations.
+	 *
+	 * @param destination
+	 *            base output stream (to write to). The stream must be
+	 *            interruptible (most socket streams are).
+	 * @param timer
+	 *            timer to manage the timeouts during writes.
+	 */
+	public TimeoutOutputStream(final OutputStream destination,
+			final InterruptTimer timer) {
+		dst = destination;
+		myTimer = timer;
+	}
+
+	/** @return number of milliseconds before aborting a write. */
+	public int getTimeout() {
+		return timeout;
+	}
+
+	/**
+	 * @param millis
+	 *            number of milliseconds before aborting a write. Must be > 0.
+	 */
+	public void setTimeout(final int millis) {
+		if (millis <= 0)
+			throw new IllegalArgumentException("Invalid timeout: " + millis);
+		timeout = millis;
+	}
+
+	@Override
+	public void write(int b) throws IOException {
+		try {
+			beginWrite();
+			dst.write(b);
+		} catch (InterruptedIOException e) {
+			throw writeTimedOut();
+		} finally {
+			endWrite();
+		}
+	}
+
+	@Override
+	public void write(byte[] buf) throws IOException {
+		write(buf, 0, buf.length);
+	}
+
+	@Override
+	public void write(byte[] buf, int off, int len) throws IOException {
+		try {
+			beginWrite();
+			dst.write(buf, off, len);
+		} catch (InterruptedIOException e) {
+			throw writeTimedOut();
+		} finally {
+			endWrite();
+		}
+	}
+
+	@Override
+	public void flush() throws IOException {
+		try {
+			beginWrite();
+			dst.flush();
+		} catch (InterruptedIOException e) {
+			throw writeTimedOut();
+		} finally {
+			endWrite();
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		try {
+			beginWrite();
+			dst.close();
+		} catch (InterruptedIOException e) {
+			throw writeTimedOut();
+		} finally {
+			endWrite();
+		}
+	}
+
+	private void beginWrite() {
+		myTimer.begin(timeout);
+	}
+
+	private void endWrite() {
+		myTimer.end();
+	}
+
+	private static InterruptedIOException writeTimedOut() {
+		return new InterruptedIOException("Write timed out");
+	}
+}
-- 
1.6.3.2.416.g04d0
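
For readers who want to try the begin/end calling pattern from the
InterruptTimer Javadoc without building JGit, here is a minimal standalone
sketch. It is not the code in this patch: WatchdogSketch, timedOut() and the
generation counter are hypothetical names made up for illustration, and it
swaps the patch's dedicated alarm thread and wait/notify loop for a
java.util.concurrent.ScheduledExecutorService. Thread.sleep() stands in for
a blocking socket read or write.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Illustrative stand-in only; NOT the InterruptTimer from this patch. */
final class WatchdogSketch {
	// One daemon thread delivers all alarms, so a forgotten terminate()
	// does not keep the JVM alive.
	private final ScheduledExecutorService alarm = Executors
			.newSingleThreadScheduledExecutor(r -> {
				Thread t = new Thread(r, "watchdog-sketch");
				t.setDaemon(true);
				return t;
			});

	// Bumped by every begin() and end(); an alarm fires only if the
	// generation it was armed with is still current.
	private long generation;

	/** Arm the watchdog before entering a blocking operation. */
	synchronized void begin(int millis) {
		if (millis <= 0)
			throw new IllegalArgumentException("Invalid timeout: " + millis);
		Thread.interrupted(); // discard any stale interrupt
		final long mine = ++generation;
		final Thread caller = Thread.currentThread();
		alarm.schedule(() -> fire(mine, caller), millis, TimeUnit.MILLISECONDS);
	}

	private synchronized void fire(long armedGeneration, Thread caller) {
		if (armedGeneration == generation)
			caller.interrupt();
	}

	/** Disarm the watchdog; the operation completed or was aborted. */
	synchronized void end() {
		generation++; // any alarm still queued is now stale
		Thread.interrupted(); // clear an interrupt that already fired
	}

	/** Stop the alarm thread. */
	void terminate() {
		alarm.shutdownNow();
	}

	/** @return true if the simulated blocking work was interrupted. */
	boolean timedOut(int timeoutMillis, long workMillis) {
		try {
			begin(timeoutMillis);
			Thread.sleep(workMillis); // stands in for blocking IO
			return false;
		} catch (InterruptedException e) {
			return true;
		} finally {
			end();
		}
	}

	public static void main(String[] args) {
		WatchdogSketch w = new WatchdogSketch();
		System.out.println(w.timedOut(100, 2000)); // work outlasts the timeout
		System.out.println(w.timedOut(2000, 10)); // work finishes in time
		w.terminate();
	}
}
```

As in the patch, both fire() and end() run under the same lock, so an alarm
cannot interrupt the caller after end() has returned. Stale alarms stay
queued until their delay expires, which is acceptable for a sketch but is
one reason a real implementation might prefer a single reusable deadline.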

end of thread, other threads:[~2009-06-23 16:42 UTC | newest]

Thread overview: 11+ messages
2009-06-19 21:27 [JGIT PATCH 0/6] Add timeouts to network IO Shawn O. Pearce
2009-06-19 21:27 ` [JGIT PATCH 1/6] Create input and output streams that have timeouts Shawn O. Pearce
2009-06-19 21:27   ` [JGIT PATCH 2/6] Add remote.name.timeout to configure an IO timeout Shawn O. Pearce
2009-06-19 21:27     ` [JGIT PATCH 3/6] Add timeouts to smart transport protocol clients Shawn O. Pearce
2009-06-19 21:27       ` [JGIT PATCH 4/6] Add timeouts to smart transport protocol servers Shawn O. Pearce
2009-06-19 21:27         ` [JGIT PATCH 5/6] Add timeouts to anonymous git:// daemon Shawn O. Pearce
2009-06-19 21:27           ` [JGIT PATCH 6/6] Add --timeout command line options Shawn O. Pearce
2009-06-20 22:28     ` [JGIT PATCH 2/6] Add remote.name.timeout to configure an IO timeout Robin Rosenberg
2009-06-20 22:54       ` Shawn O. Pearce
2009-06-22 21:09   ` [JGIT PATCH 1/6] Create input and output streams that have timeouts Robin Rosenberg
2009-06-23 16:41     ` [JGIT PATCH 1/6 v2] " Shawn O. Pearce
