Skip to content

[wip] add logs to debug /attach on windows #27

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 165 additions & 0 deletions engine/src/main/java/de/gesellix/docker/engine/OkResponseCallback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package de.gesellix.docker.engine;

import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Response;
import okio.BufferedSink;
import okio.Okio;
import okio.Source;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class OkResponseCallback implements Callback {

private static final Logger log = LoggerFactory.getLogger(OkResponseCallback.class);

private final ConnectionProvider connectionProvider;
private final AttachConfig attachConfig;

public OkResponseCallback(ConnectionProvider connectionProvider, AttachConfig attachConfig) {
this.connectionProvider = connectionProvider;
this.attachConfig = attachConfig;
}

@Override
public void onFailure(@NotNull Call call, @NotNull final IOException e) {
log.error("connection failed: " + e.getMessage(), e);
attachConfig.onFailure(e);
}

public void onFailure(Exception e) {
log.error("error", e);
attachConfig.onFailure(e);
}

@Override
public void onResponse(@NotNull final Call call, @NotNull final Response response) throws IOException {
TcpUpgradeVerificator.ensureTcpUpgrade(response);

if (attachConfig.getStreams().getStdin() != null) {
// pass input from the client via stdin and pass it to the output stream
// running it in an own thread allows the client to gain back control
final Source stdinSource = Okio.source(attachConfig.getStreams().getStdin());
Thread writer = new Thread(() -> {
try {
final BufferedSink bufferedSink = Okio.buffer(getConnectionProvider().getSink());
long written = bufferedSink.writeAll(stdinSource);
log.warn("xxxxx - writer - written " + written);
bufferedSink.flush();
log.warn("xxxxx - writer - flushed");
attachConfig.onSinkWritten(response);
log.warn("xxxxx - writer - onSinkWritten");
CountDownLatch done = new CountDownLatch(1);
delayed(100, "writer", () -> {
log.warn("xxxxx - writer - delayed");
try {
bufferedSink.close();
log.warn("xxxxx - writer - delayed closed");
attachConfig.onSinkClosed(response);
log.warn("xxxxx - writer - delayed onSinkClosed");
}
catch (Exception e) {
log.warn("error", e);
}
log.warn("xxxxx - writer - delayed return");
return null;
}, done);
boolean inTime = done.await(5, TimeUnit.SECONDS);
if (!inTime) {
log.warn("xxxxx - writer - done timeout");
}
}
catch (InterruptedException e) {
log.debug("stdin->sink interrupted", e);
Thread.currentThread().interrupt();
}
catch (Exception e) {
onFailure(e);
}
finally {
log.trace("writer finished");
}
});
writer.setName("stdin-writer " + call.request().url().encodedPath());
writer.start();
}
else {
log.debug("no stdin.");
}

if (attachConfig.getStreams().getStdout() != null) {
final BufferedSink bufferedStdout = Okio.buffer(Okio.sink(attachConfig.getStreams().getStdout()));
Thread reader = new Thread(() -> {
try {
log.warn("xxxxx - reader - writeAll -> " + getConnectionProvider().getSource());
bufferedStdout.writeAll(getConnectionProvider().getSource());
log.warn("xxxxx - reader - flush");
bufferedStdout.flush();
log.warn("xxxxx - reader - flushed");
CountDownLatch done = new CountDownLatch(1);
delayed(100, "reader", () -> {
log.warn("xxxxx - reader - delay ...");
attachConfig.onSourceConsumed();
log.warn("xxxxx - reader - delay onSourceConsumed");
return null;
}, done);
boolean inTime = done.await(5, TimeUnit.SECONDS);
if (!inTime) {
log.warn("xxxxx - reader - done timeout");
}
}
catch (InterruptedException e) {
log.debug("source->stdout interrupted", e);
Thread.currentThread().interrupt();
}
catch (Exception e) {
onFailure(e);
}
finally {
log.trace("reader finished");
}
});
reader.setName("stdout-reader " + call.request().url().encodedPath());
reader.start();
}
else {
log.debug("no stdout.");
}

attachConfig.onResponse(response);
}

public static void delayed(long delay, String name, final Supplier<?> action, final CountDownLatch done) {
new Timer(true).schedule(new TimerTask() {
@Override
public void run() {
Thread.currentThread().setName("Delayed " + name + " action (" + Thread.currentThread().getName() + ")");
try {
action.get();
}
catch (Exception e) {
log.warn("xxxxx - delayed - error", e);
throw e;
}
finally {
log.warn("xxxxx - delayed - done");
done.countDown();
log.warn("xxxxx - delayed - cancel");
cancel();
}
}
}, delay);
}

public ConnectionProvider getConnectionProvider() {
return connectionProvider;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ class OkDockerClientIntegrationSpec extends Specification {
when:
def response = client.post(request)
then:
response.content.last() in [
[status: "Status: Image is up to date for ${CONSTANTS.imageName}".toString()],
[status: "Status: Downloaded newer image for ${CONSTANTS.imageName}".toString()]
response.content.last().status in [
"Status: Image is up to date for ${CONSTANTS.imageName}".toString(),
"Status: Downloaded newer image for ${CONSTANTS.imageName}".toString(),
]
}

Expand Down Expand Up @@ -126,10 +126,19 @@ class OkDockerClientIntegrationSpec extends Specification {
query: [fromImage: CONSTANTS.imageName]])
// create container
def containerConfig = [
Tty : tty,
OpenStdin : openStdin,
Image : CONSTANTS.imageName,
Entrypoint: ["/cat"]
HostConfig : [
AutoRemove: true
],
AttachStdin : true,
AttachStdout: true,
AttachStderr: true,
Tty : tty,
OpenStdin : openStdin,
StdinOnce : true,
Image : CONSTANTS.imageName,
Cmd : LocalDocker.isNativeWindows()
? ["cmd", "/V:ON", "/C", "set /p line= & echo #!line!#"]
: ["/bin/sh", "-c", "read line && echo \"#\$line#\""]
]
String containerId = client.post([path : "/containers/create".toString(),
query : [name: ""],
Expand All @@ -139,12 +148,13 @@ class OkDockerClientIntegrationSpec extends Specification {
client.post([path : "/containers/${containerId}/start".toString(),
requestContentType: "application/json"])
// resize container TTY
// client.post([path : "/containers/${containerId}/attach/resize".toString(),
// query: [h: 46, w: 158]])
client.post([path : "/containers/${containerId}/attach/resize".toString(),
query: [h: 46, w: 158]])
// inspect container
// boolean multiplexStreams = !client.get([path: "/containers/${containerId}/json".toString()]).content.Config.Tty

String content = "attach ${UUID.randomUUID()}"
println "content (length ${content.length()}): $content"
String expectedOutput = containerConfig.Tty ? "$content\r\n$content\r\n" : "$content\n"

def stdout = new ByteArrayOutputStream(expectedOutput.length())
Expand All @@ -171,10 +181,10 @@ class OkDockerClientIntegrationSpec extends Specification {
}
attachConfig.onSourceConsumed = {
if (stdout.toString() == expectedOutput) {
log.info("[attach (interactive)] consumed (complete: ${stdout.toString() == expectedOutput})\n${stdout.toString()}")
log.info("[attach (interactive)] fully consumed (complete: ${stdout.toString() == expectedOutput})\n${stdout.toString()}")
onSourceConsumed.countDown()
} else {
log.info("[attach (interactive)] consumed (complete: ${stdout.toString() == expectedOutput})\n${stdout.toString()}")
log.info("[attach (interactive)] partially consumed (complete: ${stdout.toString() == expectedOutput})\n${stdout.toString()}")
}
}
// new OkDockerClient().post([
Expand All @@ -185,6 +195,7 @@ class OkDockerClientIntegrationSpec extends Specification {

when:
stdin.write("$content\n".bytes)
println "ttttt - written"
stdin.flush()
stdin.close()
boolean sinkWritten = onSinkWritten.await(5, SECONDS)
Expand Down
Loading