From c8820b02be33fc441815db68392f5bbb7ce2a6c2 Mon Sep 17 00:00:00 2001 From: Tobias Gesellchen Date: Mon, 12 Apr 2021 01:35:08 +0200 Subject: [PATCH 1/2] Use Okio's Pipe for stream piping --- .../gesellix/docker/engine/AttachConfig.java | 26 ++++- .../docker/engine/OkResponseCallback.java | 105 ++++++++---------- .../OkDockerClientIntegrationSpec.groovy | 10 +- 3 files changed, 74 insertions(+), 67 deletions(-) diff --git a/engine/src/main/java/de/gesellix/docker/engine/AttachConfig.java b/engine/src/main/java/de/gesellix/docker/engine/AttachConfig.java index 3d3ef2ad..1602836f 100644 --- a/engine/src/main/java/de/gesellix/docker/engine/AttachConfig.java +++ b/engine/src/main/java/de/gesellix/docker/engine/AttachConfig.java @@ -56,27 +56,39 @@ public void setOnResponse(Closure onResponse) { setOnResponse(onResponse::call); } + /** + * @deprecated Internal use only. Will eventually be removed. + */ + @Deprecated public Object onSinkClosed(Response r) { return callbacks.onSinkClosed.apply(r); } + /** + * @deprecated Internal use only. Will eventually be removed. + */ + @Deprecated public void setOnSinkClosed(Function onSinkClosed) { callbacks.onSinkClosed = onSinkClosed; } /** * @see #setOnSinkClosed(Function) - * @deprecated Will be removed after migration from Groovy to plain Java + * @deprecated Internal use only. Will eventually be removed. */ @Deprecated public void setOnSinkClosed(Closure onSinkClosed) { setOnSinkClosed(onSinkClosed::call); } - public Object onSinkWritten(Response r) { + public Object onStdInConsumed(Response r) { return callbacks.onSinkWritten.apply(r); } + public Object onSinkWritten(Response r) { + return onStdInConsumed(r); + } + public void setOnSinkWritten(Function onSinkWritten) { callbacks.onSinkWritten = onSinkWritten; } @@ -90,10 +102,14 @@ public void setOnSinkWritten(Closure onSinkWritten) { setOnSinkWritten(onSinkWritten::call); } - public Object onSourceConsumed() { + public Object onStdOutConsumed() { return callbacks.onSourceConsumed.get(); } + public Object onSourceConsumed() { + return onStdOutConsumed(); + } + public void setOnSourceConsumed(Supplier onSourceConsumed) { callbacks.onSourceConsumed = onSourceConsumed; } @@ -151,6 +167,10 @@ public static class Callbacks { private Function onFailure = (Exception e) -> null; private Function onResponse = (Response r) -> null; + /** + * @deprecated Internal use only. Will eventually be removed. + */ + @Deprecated private Function onSinkClosed = (Response r) -> null; private Function onSinkWritten = (Response r) -> null; private Supplier onSourceConsumed = () -> null; diff --git a/engine/src/main/java/de/gesellix/docker/engine/OkResponseCallback.java b/engine/src/main/java/de/gesellix/docker/engine/OkResponseCallback.java index 5c89a447..b513d9ec 100644 --- a/engine/src/main/java/de/gesellix/docker/engine/OkResponseCallback.java +++ b/engine/src/main/java/de/gesellix/docker/engine/OkResponseCallback.java @@ -3,19 +3,19 @@ import okhttp3.Call; import okhttp3.Callback; import okhttp3.Response; -import okio.BufferedSink; +import okio.Buffer; import okio.Okio; +import okio.Pipe; +import okio.Sink; import okio.Source; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; public class OkResponseCallback implements Callback { @@ -40,38 +40,47 @@ public void onFailure(Exception e) { attachConfig.onFailure(e); } + /** Reads all bytes from {@code source} and writes them to {@code sink}. */ + private Long readAll(Source source, Sink sink) throws IOException { + long result = 0L; +// Okio.buffer(sink).writeAll(source); + Buffer buffer = new Buffer(); + for (long count; (count = source.read(buffer, 8192)) != -1L; result += count) { + sink.write(buffer, count); + } + return result; + } + + /** Calls {@link #readAll} on a background thread. */ + private Future readAllAsync(final Source source, final Sink sink) { + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + return executor.submit(() -> readAll(source, sink)); + } + finally { + executor.shutdown(); + } + } + @Override public void onResponse(@NotNull final Call call, @NotNull final Response response) throws IOException { TcpUpgradeVerificator.ensureTcpUpgrade(response); if (attachConfig.getStreams().getStdin() != null) { - // pass input from the client via stdin and pass it to the output stream - // running it in an own thread allows the client to gain back control - final Source stdinSource = Okio.source(attachConfig.getStreams().getStdin()); + // client's stdin -> socket Thread writer = new Thread(() -> { + Pipe p = new Pipe(8192); try { - final BufferedSink bufferedSink = Okio.buffer(getConnectionProvider().getSink()); - bufferedSink.writeAll(stdinSource); - bufferedSink.flush(); - attachConfig.onSinkWritten(response); - CountDownLatch done = new CountDownLatch(1); - delayed(100, "writer", () -> { - try { - bufferedSink.close(); - attachConfig.onSinkClosed(response); - } - catch (Exception e) { - log.warn("error", e); - } - return null; - }, done); - done.await(5, TimeUnit.SECONDS); - } - catch (InterruptedException e) { - log.debug("stdin->sink interrupted", e); - Thread.currentThread().interrupt(); + Future futureSink = readAllAsync(p.source(), getConnectionProvider().getSink()); + Future futureSource = readAllAsync(Okio.source(attachConfig.getStreams().getStdin()), p.sink()); + Long read = futureSource.get(); + p.sink().close(); + attachConfig.onStdInConsumed(response); + Long written = futureSink.get(); + attachConfig.onSinkClosed(response); } catch (Exception e) { + log.warn("error", e); onFailure(e); } finally { @@ -86,23 +95,19 @@ public void onResponse(@NotNull final Call call, @NotNull final Response respons } if (attachConfig.getStreams().getStdout() != null) { - final BufferedSink bufferedStdout = Okio.buffer(Okio.sink(attachConfig.getStreams().getStdout())); + // client's stdout <- socket Thread reader = new Thread(() -> { + Pipe p = new Pipe(8192); try { - bufferedStdout.writeAll(getConnectionProvider().getSource()); - bufferedStdout.flush(); - CountDownLatch done = new CountDownLatch(1); - delayed(100, "reader", () -> { - attachConfig.onSourceConsumed(); - return null; - }, done); - done.await(5, TimeUnit.SECONDS); - } - catch (InterruptedException e) { - log.debug("source->stdout interrupted", e); - Thread.currentThread().interrupt(); + Future futureSink = readAllAsync(p.source(), Okio.sink(attachConfig.getStreams().getStdout())); + Future futureSource = readAllAsync(getConnectionProvider().getSource(), p.sink()); + Long read = futureSource.get(); + attachConfig.onStdOutConsumed(); + p.sink().close(); + Long written = futureSink.get(); } catch (Exception e) { + log.warn("error", e); onFailure(e); } finally { @@ -119,22 +124,6 @@ public void onResponse(@NotNull final Call call, @NotNull final Response respons attachConfig.onResponse(response); } - public static void delayed(long delay, String name, final Supplier action, final CountDownLatch done) { - new Timer(true).schedule(new TimerTask() { - @Override - public void run() { - Thread.currentThread().setName("Delayed " + name + " action (" + Thread.currentThread().getName() + ")"); - try { - action.get(); - } - finally { - done.countDown(); - cancel(); - } - } - }, delay); - } - public ConnectionProvider getConnectionProvider() { return connectionProvider; } diff --git a/integrationtest/src/test/groovy/de/gesellix/docker/engine/OkDockerClientIntegrationSpec.groovy b/integrationtest/src/test/groovy/de/gesellix/docker/engine/OkDockerClientIntegrationSpec.groovy index b4bb95e1..22a1ab3f 100644 --- a/integrationtest/src/test/groovy/de/gesellix/docker/engine/OkDockerClientIntegrationSpec.groovy +++ b/integrationtest/src/test/groovy/de/gesellix/docker/engine/OkDockerClientIntegrationSpec.groovy @@ -145,16 +145,15 @@ class OkDockerClientIntegrationSpec extends Specification { def stdout = new ByteArrayOutputStream(expectedOutput.length()) def stdin = new PipedOutputStream() - def onSinkClosed = new CountDownLatch(1) def onSinkWritten = new CountDownLatch(1) def onSourceConsumed = new CountDownLatch(1) + Response okResponse = null def attachConfig = new AttachConfig() attachConfig.streams.stdin = new PipedInputStream(stdin) attachConfig.streams.stdout = stdout - attachConfig.onSinkClosed = { Response response -> - log.info("[attach (interactive)] sink closed \n${stdout.toString()}") - onSinkClosed.countDown() + attachConfig.onResponse = { Response res -> + okResponse = res } attachConfig.onSinkWritten = { Response response -> log.info("[attach (interactive)] sink written \n${stdout.toString()}") @@ -178,12 +177,11 @@ class OkDockerClientIntegrationSpec extends Specification { stdin.write("$content\n".bytes) stdin.flush() stdin.close() +// okResponse?.close() def sourceConsumed = onSourceConsumed.await(5, SECONDS) def sinkWritten = onSinkWritten.await(5, SECONDS) - def sinkClosed = onSinkClosed.await(5, SECONDS) then: - sinkClosed sinkWritten sourceConsumed stdout.size() > 0 From a7de470b105259ecf25c13cb6fc961732716c52f Mon Sep 17 00:00:00 2001 From: Tobias Gesellchen Date: Sun, 18 Apr 2021 11:52:22 +0200 Subject: [PATCH 2/2] wip --- .../docker/engine/OkResponseCallback.java | 78 +++++++++---------- .../OkDockerClientIntegrationSpec.groovy | 7 ++ 2 files changed, 43 insertions(+), 42 deletions(-) diff --git a/engine/src/main/java/de/gesellix/docker/engine/OkResponseCallback.java b/engine/src/main/java/de/gesellix/docker/engine/OkResponseCallback.java index b513d9ec..753100e8 100644 --- a/engine/src/main/java/de/gesellix/docker/engine/OkResponseCallback.java +++ b/engine/src/main/java/de/gesellix/docker/engine/OkResponseCallback.java @@ -16,6 +16,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.function.BiConsumer; public class OkResponseCallback implements Callback { @@ -45,7 +46,7 @@ private Long readAll(Source source, Sink sink) throws IOException { long result = 0L; // Okio.buffer(sink).writeAll(source); Buffer buffer = new Buffer(); - for (long count; (count = source.read(buffer, 8192)) != -1L; result += count) { + for (long count; (count = source.read(buffer, 1024)) != -1L; result += count) { sink.write(buffer, count); } return result; @@ -62,31 +63,39 @@ private Future readAllAsync(final Source source, final Sink sink) { } } + private Thread transfer(Source source, Sink sink, BiConsumer onFinish) { + return new Thread(() -> { + Pipe p = new Pipe(1024); + try { + Future futureSink = readAllAsync(p.source(), sink); + Future futureSource = readAllAsync(source, p.sink()); + Long read = futureSource.get(); + p.sink().flush(); + p.sink().close(); + Long written = futureSink.get(); + onFinish.accept(read, written); + } + catch (Exception e) { + log.warn("error", e); + onFailure(e); + } + }); + } + @Override public void onResponse(@NotNull final Call call, @NotNull final Response response) throws IOException { TcpUpgradeVerificator.ensureTcpUpgrade(response); if (attachConfig.getStreams().getStdin() != null) { // client's stdin -> socket - Thread writer = new Thread(() -> { - Pipe p = new Pipe(8192); - try { - Future futureSink = readAllAsync(p.source(), getConnectionProvider().getSink()); - Future futureSource = readAllAsync(Okio.source(attachConfig.getStreams().getStdin()), p.sink()); - Long read = futureSource.get(); - p.sink().close(); - attachConfig.onStdInConsumed(response); - Long written = futureSink.get(); - attachConfig.onSinkClosed(response); - } - catch (Exception e) { - log.warn("error", e); - onFailure(e); - } - finally { - log.trace("writer finished"); - } - }); + Thread writer = transfer( + Okio.source(attachConfig.getStreams().getStdin()), + connectionProvider.getSink(), + (read, written) -> { + log.warn("read {}, written {}", read, written); + attachConfig.onStdInConsumed(response); + attachConfig.onSinkClosed(response); + }); writer.setName("stdin-writer " + call.request().url().encodedPath()); writer.start(); } @@ -96,24 +105,13 @@ public void onResponse(@NotNull final Call call, @NotNull final Response respons if (attachConfig.getStreams().getStdout() != null) { // client's stdout <- socket - Thread reader = new Thread(() -> { - Pipe p = new Pipe(8192); - try { - Future futureSink = readAllAsync(p.source(), Okio.sink(attachConfig.getStreams().getStdout())); - Future futureSource = readAllAsync(getConnectionProvider().getSource(), p.sink()); - Long read = futureSource.get(); - attachConfig.onStdOutConsumed(); - p.sink().close(); - Long written = futureSink.get(); - } - catch (Exception e) { - log.warn("error", e); - onFailure(e); - } - finally { - log.trace("reader finished"); - } - }); + Thread reader = transfer( + connectionProvider.getSource(), + Okio.sink(attachConfig.getStreams().getStdout()), + (read, written) -> { + log.warn("read {}, written {}", read, written); + attachConfig.onStdOutConsumed(); + }); reader.setName("stdout-reader " + call.request().url().encodedPath()); reader.start(); } @@ -123,8 +121,4 @@ public void onResponse(@NotNull final Call call, @NotNull final Response respons attachConfig.onResponse(response); } - - public ConnectionProvider getConnectionProvider() { - return connectionProvider; - } } diff --git a/integrationtest/src/test/groovy/de/gesellix/docker/engine/OkDockerClientIntegrationSpec.groovy b/integrationtest/src/test/groovy/de/gesellix/docker/engine/OkDockerClientIntegrationSpec.groovy index 22a1ab3f..1299e624 100644 --- a/integrationtest/src/test/groovy/de/gesellix/docker/engine/OkDockerClientIntegrationSpec.groovy +++ b/integrationtest/src/test/groovy/de/gesellix/docker/engine/OkDockerClientIntegrationSpec.groovy @@ -145,6 +145,7 @@ class OkDockerClientIntegrationSpec extends Specification { def stdout = new ByteArrayOutputStream(expectedOutput.length()) def stdin = new PipedOutputStream() + def onSinkClosed = new CountDownLatch(1) def onSinkWritten = new CountDownLatch(1) def onSourceConsumed = new CountDownLatch(1) @@ -155,6 +156,10 @@ class OkDockerClientIntegrationSpec extends Specification { attachConfig.onResponse = { Response res -> okResponse = res } + attachConfig.onSinkClosed = { Response response -> + log.info("[attach (interactive)] sink closed \n${stdout.toString()}") + onSinkClosed.countDown() + } attachConfig.onSinkWritten = { Response response -> log.info("[attach (interactive)] sink written \n${stdout.toString()}") onSinkWritten.countDown() @@ -180,8 +185,10 @@ class OkDockerClientIntegrationSpec extends Specification { // okResponse?.close() def sourceConsumed = onSourceConsumed.await(5, SECONDS) def sinkWritten = onSinkWritten.await(5, SECONDS) + def sinkClosed = onSinkClosed.await(5, SECONDS) then: + sinkClosed sinkWritten sourceConsumed stdout.size() > 0