From 394d6000041c55f71c3b9a5d21a7ff9e23017b85 Mon Sep 17 00:00:00 2001 From: Tobias Gesellchen Date: Sat, 10 Apr 2021 21:41:04 +0200 Subject: [PATCH 1/2] [wip] add logs to debug /attach on windows --- .../docker/engine/OkResponseCallback.java | 165 ++++++++++++++++++ .../OkDockerClientIntegrationSpec.groovy | 21 ++- 2 files changed, 180 insertions(+), 6 deletions(-) create mode 100644 engine/src/main/java/de/gesellix/docker/engine/OkResponseCallback.java diff --git a/engine/src/main/java/de/gesellix/docker/engine/OkResponseCallback.java b/engine/src/main/java/de/gesellix/docker/engine/OkResponseCallback.java new file mode 100644 index 00000000..ee508aa9 --- /dev/null +++ b/engine/src/main/java/de/gesellix/docker/engine/OkResponseCallback.java @@ -0,0 +1,165 @@ +package de.gesellix.docker.engine; + +import okhttp3.Call; +import okhttp3.Callback; +import okhttp3.Response; +import okio.BufferedSink; +import okio.Okio; +import okio.Source; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +public class OkResponseCallback implements Callback { + + private static final Logger log = LoggerFactory.getLogger(OkResponseCallback.class); + + private final ConnectionProvider connectionProvider; + private final AttachConfig attachConfig; + + public OkResponseCallback(ConnectionProvider connectionProvider, AttachConfig attachConfig) { + this.connectionProvider = connectionProvider; + this.attachConfig = attachConfig; + } + + @Override + public void onFailure(@NotNull Call call, @NotNull final IOException e) { + log.error("connection failed: " + e.getMessage(), e); + attachConfig.onFailure(e); + } + + public void onFailure(Exception e) { + log.error("error", e); + attachConfig.onFailure(e); + } + + @Override + public void onResponse(@NotNull final Call call, @NotNull final Response response) throws IOException { + TcpUpgradeVerificator.ensureTcpUpgrade(response); + + if (attachConfig.getStreams().getStdin() != null) { + // pass input from the client via stdin and pass it to the output stream + // running it in an own thread allows the client to gain back control + final Source stdinSource = Okio.source(attachConfig.getStreams().getStdin()); + Thread writer = new Thread(() -> { + try { + final BufferedSink bufferedSink = Okio.buffer(getConnectionProvider().getSink()); + long written = bufferedSink.writeAll(stdinSource); + log.warn("xxxxx - writer - written " + written); + bufferedSink.flush(); + log.warn("xxxxx - writer - flushed"); + attachConfig.onSinkWritten(response); + log.warn("xxxxx - writer - onSinkWritten"); + CountDownLatch done = new CountDownLatch(1); + delayed(100, "writer", () -> { + log.warn("xxxxx - writer - delayed"); + try { + bufferedSink.close(); + log.warn("xxxxx - writer - delayed closed"); + attachConfig.onSinkClosed(response); + log.warn("xxxxx - writer - delayed onSinkClosed"); + } + catch (Exception e) { + log.warn("error", e); + } + log.warn("xxxxx - writer - delayed return"); + return null; + }, done); + boolean inTime = done.await(5, TimeUnit.SECONDS); + if (!inTime) { + log.warn("xxxxx - writer - done timeout"); + } + } + catch (InterruptedException e) { + log.debug("stdin->sink interrupted", e); + Thread.currentThread().interrupt(); + } + catch (Exception e) { + onFailure(e); + } + finally { + log.trace("writer finished"); + } + }); + writer.setName("stdin-writer " + call.request().url().encodedPath()); + writer.start(); + } + else { + log.debug("no stdin."); + } + + if (attachConfig.getStreams().getStdout() != null) { + final BufferedSink bufferedStdout = Okio.buffer(Okio.sink(attachConfig.getStreams().getStdout())); + Thread reader = new Thread(() -> { + try { + log.warn("xxxxx - reader - writeAll -> " + getConnectionProvider().getSource()); + bufferedStdout.writeAll(getConnectionProvider().getSource()); + log.warn("xxxxx - reader - flush"); + bufferedStdout.flush(); + log.warn("xxxxx - reader - flushed"); + CountDownLatch done = new CountDownLatch(1); + delayed(100, "reader", () -> { + log.warn("xxxxx - reader - delay ..."); + attachConfig.onSourceConsumed(); + log.warn("xxxxx - reader - delay onSourceConsumed"); + return null; + }, done); + boolean inTime = done.await(5, TimeUnit.SECONDS); + if (!inTime) { + log.warn("xxxxx - reader - done timeout"); + } + } + catch (InterruptedException e) { + log.debug("source->stdout interrupted", e); + Thread.currentThread().interrupt(); + } + catch (Exception e) { + onFailure(e); + } + finally { + log.trace("reader finished"); + } + }); + reader.setName("stdout-reader " + call.request().url().encodedPath()); + reader.start(); + } + else { + log.debug("no stdout."); + } + + attachConfig.onResponse(response); + } + + public static void delayed(long delay, String name, final Supplier action, final CountDownLatch done) { + new Timer(true).schedule(new TimerTask() { + @Override + public void run() { + Thread.currentThread().setName("Delayed " + name + " action (" + Thread.currentThread().getName() + ")"); + try { + action.get(); + } + catch (Exception e) { + log.warn("xxxxx - delayed - error", e); + throw e; + } + finally { + log.warn("xxxxx - delayed - done"); + done.countDown(); + log.warn("xxxxx - delayed - cancel"); + cancel(); + } + } + }, delay); + } + + public ConnectionProvider getConnectionProvider() { + return connectionProvider; + } +} diff --git a/integrationtest/src/test/groovy/de/gesellix/docker/engine/OkDockerClientIntegrationSpec.groovy b/integrationtest/src/test/groovy/de/gesellix/docker/engine/OkDockerClientIntegrationSpec.groovy index 341757ba..48e719d5 100644 --- a/integrationtest/src/test/groovy/de/gesellix/docker/engine/OkDockerClientIntegrationSpec.groovy +++ b/integrationtest/src/test/groovy/de/gesellix/docker/engine/OkDockerClientIntegrationSpec.groovy @@ -126,10 +126,17 @@ class OkDockerClientIntegrationSpec extends Specification { query: [fromImage: CONSTANTS.imageName]]) // create container def containerConfig = [ - Tty : tty, - OpenStdin : openStdin, - Image : CONSTANTS.imageName, - Entrypoint: ["/cat"] + HostConfig : [ + AutoRemove: true + ], + AttachStdin : true, + AttachStdout: true, + AttachStderr: true, + Tty : tty, + OpenStdin : openStdin, + StdinOnce : true, + Image : CONSTANTS.imageName, + Entrypoint : ["/cat"] ] String containerId = client.post([path : "/containers/create".toString(), query : [name: ""], @@ -139,12 +146,13 @@ class OkDockerClientIntegrationSpec extends Specification { client.post([path : "/containers/${containerId}/start".toString(), requestContentType: "application/json"]) // resize container TTY -// client.post([path : "/containers/${containerId}/attach/resize".toString(), -// query: [h: 46, w: 158]]) + client.post([path : "/containers/${containerId}/attach/resize".toString(), + query: [h: 46, w: 158]]) // inspect container // boolean multiplexStreams = !client.get([path: "/containers/${containerId}/json".toString()]).content.Config.Tty String content = "attach ${UUID.randomUUID()}" + println "content (length ${content.length()}): $content" String expectedOutput = containerConfig.Tty ? "$content\r\n$content\r\n" : "$content\n" def stdout = new ByteArrayOutputStream(expectedOutput.length()) @@ -185,6 +193,7 @@ class OkDockerClientIntegrationSpec extends Specification { when: stdin.write("$content\n".bytes) + println "ttttt - written" stdin.flush() stdin.close() boolean sinkWritten = onSinkWritten.await(5, SECONDS) From 7d890184df6100b3a3fde5f05034282afcd750f3 Mon Sep 17 00:00:00 2001 From: Tobias Gesellchen Date: Sat, 7 Jun 2025 10:39:02 +0200 Subject: [PATCH 2/2] wip --- .../engine/OkDockerClientIntegrationSpec.groovy | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/integrationtest/src/test/groovy/de/gesellix/docker/engine/OkDockerClientIntegrationSpec.groovy b/integrationtest/src/test/groovy/de/gesellix/docker/engine/OkDockerClientIntegrationSpec.groovy index 48e719d5..18732175 100644 --- a/integrationtest/src/test/groovy/de/gesellix/docker/engine/OkDockerClientIntegrationSpec.groovy +++ b/integrationtest/src/test/groovy/de/gesellix/docker/engine/OkDockerClientIntegrationSpec.groovy @@ -46,9 +46,9 @@ class OkDockerClientIntegrationSpec extends Specification { when: def response = client.post(request) then: - response.content.last() in [ - [status: "Status: Image is up to date for ${CONSTANTS.imageName}".toString()], - [status: "Status: Downloaded newer image for ${CONSTANTS.imageName}".toString()] + response.content.last().status in [ + "Status: Image is up to date for ${CONSTANTS.imageName}".toString(), + "Status: Downloaded newer image for ${CONSTANTS.imageName}".toString(), ] } @@ -136,7 +136,9 @@ class OkDockerClientIntegrationSpec extends Specification { OpenStdin : openStdin, StdinOnce : true, Image : CONSTANTS.imageName, - Entrypoint : ["/cat"] + Cmd : LocalDocker.isNativeWindows() + ? ["cmd", "/V:ON", "/C", "set /p line= & echo #!line!#"] + : ["/bin/sh", "-c", "read line && echo \"#\$line#\""] ] String containerId = client.post([path : "/containers/create".toString(), query : [name: ""], @@ -179,10 +181,10 @@ class OkDockerClientIntegrationSpec extends Specification { } attachConfig.onSourceConsumed = { if (stdout.toString() == expectedOutput) { - log.info("[attach (interactive)] consumed (complete: ${stdout.toString() == expectedOutput})\n${stdout.toString()}") + log.info("[attach (interactive)] fully consumed (complete: ${stdout.toString() == expectedOutput})\n${stdout.toString()}") onSourceConsumed.countDown() } else { - log.info("[attach (interactive)] consumed (complete: ${stdout.toString() == expectedOutput})\n${stdout.toString()}") + log.info("[attach (interactive)] partially consumed (complete: ${stdout.toString() == expectedOutput})\n${stdout.toString()}") } } // new OkDockerClient().post([