diff --git a/src/main/java/us/kbase/sdk/callback/CallbackServerManager.java b/src/main/java/us/kbase/sdk/callback/CallbackServerManager.java new file mode 100644 index 0000000..fb5a926 --- /dev/null +++ b/src/main/java/us/kbase/sdk/callback/CallbackServerManager.java @@ -0,0 +1,284 @@ +package us.kbase.sdk.callback; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URL; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Scanner; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import us.kbase.auth.AuthToken; +import us.kbase.common.executionengine.ModuleMethod; +// TODO NOW once this works on mac / linux / GHA, create a simplified version for this module +import us.kbase.common.executionengine.ModuleRunVersion; +import us.kbase.common.utils.NetUtils; + +// TODO NOW JAVADOC - add notes about expects to be running outside a container +// TODO NOW TEST +// TODO NOW this won't work inside a container, need to make a call; container vs. native code +// or go to the trouble to make it work in both somehow...? +// TODO NOW python callback server leaves root owned files around after tests +// TODO NOW convert module runner +// TODO NOW delete old callback server code and related code + +/** + * A manager for starting and stopping the Callback server docker image. + * See https://github.com/kbase/JobRunner?tab=readme-ov-file#container-launch + */ +public class CallbackServerManager implements AutoCloseable { + + /* The python dockerized callback server does actually have the ability to perform arbitrary + * mounts on containers it runs; it's just not exposed in the container API. So, theoretically + * we could restore that feature for kb-sdk run. However: + * + * * we shouldn't do it until there's a very clear use case showing it's necessary + * * Why would you need more than the workdir? Just put everything there + * * we should think hard about allowing arbitrary mounts into containers from a security + * perspective, since the CBS pulls docker containers that the user may not expect + */ + + // may want to make this configurable? + public static final String CALLBACK_IMAGE = "ghcr.io/kbase/jobrunner:pr-97"; + + private static final String BUSYBOX_IMAGE = "busybox:1.37.0"; + + private static final Pattern IP_ROUTE_PATTERN = Pattern.compile("^default.*?src\\s+(\\S+)"); + + private final String containerName; + private final URL callbackUrl; + private final int port; + private final Process proc; + private final Path initProvFile; + + public CallbackServerManager( + Path workDir, + URL kbaseBaseURL, + final AuthToken token, + // TODO NOW make a provenance class w/ builder for these + final ModuleRunVersion mrv, + final List params, // TODO NOW note that these must be jsonable + final List inputWorkspaceRefs + ) throws IOException { + requireNonNull(token, "token"); + try { + kbaseBaseURL = requireNonNull(kbaseBaseURL, "kbaseBaseURL").toString().endsWith("/") ? + kbaseBaseURL : new URL(kbaseBaseURL.toString() + "/"); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + workDir = requireNonNull(workDir, "workDir").toAbsolutePath(); + this.containerName = mrv.getModuleMethod().getModuleDotMethod().replace(".", "_") + + "_test_catllback_server_" + UUID.randomUUID().toString(); + final String host = getHost(); + initProvFile = writeProvFile(workDir, mrv, params, inputWorkspaceRefs); + // may want to manually specify? Probably not + port = NetUtils.findFreePort(); + proc = startCBS( host, token, kbaseBaseURL, workDir); + waitForCBS(Duration.ofSeconds(120), Duration.ofSeconds(2)); + callbackUrl = new URL(String.format("http://%s:%s", host, port)); + } + + public URL getCallbackUrl() { + return callbackUrl; + } + + public int getCallbackPort() { + return port; + } + + private Path writeProvFile( + final Path workDir, + final ModuleRunVersion mrv, + final List params, + final List inputWorkspaceRefs + ) throws IOException { + // see https://github.com/kbase/JobRunner/blob/main/JobRunner/provenance.py#L4-L21 + final Map initProv = new HashMap<>(); + initProv.put("method", requireNonNull(mrv, "mrv").getModuleMethod().getModuleDotMethod()); + initProv.put("service_ver", mrv.getRelease()); + initProv.put("params", requireNonNull(params, "params")); + initProv.put("source_ws_objects", + requireNonNull(inputWorkspaceRefs, "inputWorkspaceRefs")); + final Path initProvFile = Files.createTempFile( + workDir, "callback_initial_provenance", ".json"); + Files.write(initProvFile, new ObjectMapper().writeValueAsBytes(initProv)); + return initProvFile; + } + + private Process startCBS( + final String host, + final AuthToken token, + final URL kbaseBaseURL, + final Path workDir + ) throws IOException { + final List command = new LinkedList<>(); + command.addAll(Arrays.asList( + "docker", "run", + "--name", containerName, + "--rm", // make configuratble? + // TODO SECURITY when CBS allows, use a file instead + // https://github.com/kbase/JobRunner/issues/90 + "-e", String.format("KB_AUTH_TOKEN=%s", token.getToken()), + "-e", String.format("KB_BASE_URL=%s", kbaseBaseURL), + "-e", String.format("PROV_FILE=%s", initProvFile), + "-e", String.format("JOB_DIR=%s", workDir), + "-e", String.format("CALLBACK_IP=%s", host), + "-e", String.format("CALLBACK_PORT=%s", port), + // Apparently this is consistent across platforms. Let's find out + // May need to make the socket file configurable + // Put it in sdk.cfg? + "-v", "/var/run/docker.sock:/run/docker.sock", + "-v", String.format("%s:%s", workDir, workDir), + "-p", String.format("%s:%s", port, port), + CALLBACK_IMAGE + )); + final ProcessBuilder pb = new ProcessBuilder(command); + // Let the user see the docker commands for debugging. + // Could make this configurable if it's annoying + // Another case where we may want to pass in IO streams and pipe + pb.inheritIO(); + return pb.start(); + } + + private void waitForCBS(final Duration timeout, final Duration interval) throws IOException { + System.out.println("Waiting for Callback Server to start"); + final HttpClient httpClient = HttpClient.newHttpClient(); + final URI uri = URI.create("http://localhost:" + port + "/"); + final HttpRequest request = HttpRequest.newBuilder() + .uri(uri) + .GET() + .timeout(interval) + .build(); + + final long deadline = System.currentTimeMillis() + timeout.toMillis(); + + while (System.currentTimeMillis() < deadline) { + try { + final HttpResponse response = httpClient.send( + request, HttpResponse.BodyHandlers.ofString() + ); + if (response.statusCode() == 200 && "[{}]".equals(response.body().trim())) { + System.out.println("Callback Server is up."); + return; + } + } catch (IOException | InterruptedException e) { + // Server not ready yet; ignore and retry + } + try { + Thread.sleep(interval.toMillis()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + throw new IOException("Callback Server did not start within the timeout period."); + } + + @Override + public void close() throws IOException { + try { + // proc.destroyForcibly() doesn't seem to work, container keeps running + runQuickCommand(15L, "docker", "stop", containerName); + proc.destroyForcibly(); // why not + } finally { + Files.delete(initProvFile); + } + } + + private String getHost() throws IOException { + // runs a container to see what the container thinks the host address might be. + // should work on linux (on bare machine) or mac (in docker VM) + final Process p = runQuickCommand( + 15L, + "docker", "run", + "--rm", + "--network", "host", + BUSYBOX_IMAGE, + "ip", "route" + ); + final String out = new String( + p.getInputStream().readAllBytes(), StandardCharsets.UTF_8 + ); + final String[] lines = out.split("\n"); + for (final String line: lines) { + final Matcher matcher = IP_ROUTE_PATTERN.matcher(line); + if (matcher.find()) { + return matcher.group(1); + } + } + throw new IOException( + "Unexpected output from busybox ip route command when attempting to determine " + + "docker host:\n" + out + ); + } + + private Process runQuickCommand( + final long timeout, + final String... command + ) throws IOException { + final Process proc = new ProcessBuilder(command).start(); + // these error condition are essentially impossible to test under normal conditions + final boolean finished; + try { + finished = proc.waitFor(timeout, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (!finished) { + proc.destroyForcibly(); + throw new IOException("Docker process unexpected timeout"); + } + if (proc.exitValue() != 0) { + throw new IOException(String.format( + "Unexpected exit code from docker: %s. Error stream:%s", + proc.exitValue(), + new String(proc.getErrorStream().readAllBytes(), StandardCharsets.UTF_8) + )); + } + return proc; + } + + public static void main(String[] args) throws Exception { + final CallbackServerManager csm = new CallbackServerManager( + Paths.get("."), + new URL("https://ci.kbase.us/services"), + new AuthToken(args[0], "user"), + new ModuleRunVersion( + new URL("https://github.com/kbaseapps/DatafileUtil.git"), + new ModuleMethod("foo.bar"), + "githashhere", + "0.1.0", + "dev" + ), + Arrays.asList("foo", 1), + Arrays.asList("3/4/5") + ); + try (csm) { + System.out.println(csm.getCallbackUrl()); + + System.out.println("Press Enter to exit..."); + try (final Scanner scanner = new Scanner(System.in)) { + scanner.nextLine(); // Waits for Enter + } + } + } +} diff --git a/src/main/java/us/kbase/sdk/tester/ModuleTester.java b/src/main/java/us/kbase/sdk/tester/ModuleTester.java index 1c1f516..1e1eace 100644 --- a/src/main/java/us/kbase/sdk/tester/ModuleTester.java +++ b/src/main/java/us/kbase/sdk/tester/ModuleTester.java @@ -12,33 +12,20 @@ import java.io.StringWriter; import java.net.URL; import java.nio.file.Files; -import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; import org.apache.commons.io.FileUtils; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.productivity.java.syslog4j.SyslogIF; -import us.kbase.common.executionengine.CallbackServer; -import us.kbase.common.executionengine.LineLogger; import us.kbase.common.executionengine.ModuleMethod; import us.kbase.common.executionengine.ModuleRunVersion; -import us.kbase.common.executionengine.CallbackServerConfigBuilder.CallbackServerConfig; -import us.kbase.common.service.JsonServerServlet; -import us.kbase.common.service.JsonServerSyslog; -import us.kbase.common.service.JsonServerSyslog.SyslogOutput; -import us.kbase.common.service.UObject; -import us.kbase.common.utils.NetUtils; +import us.kbase.sdk.callback.CallbackServerManager; import us.kbase.sdk.common.KBaseYmlConfig; import us.kbase.sdk.common.TestLocalManager; import us.kbase.sdk.common.TestLocalManager.TestLocalInfo; @@ -157,77 +144,30 @@ public int runTests(final boolean skipValidation) throws Exception { if (!buildNewDockerImageWithCleanup(moduleDir, tlDir, runDockerSh, imageName)) return 1; /////////////////////////////////////////////////////////////////////////////////////////// - int callbackPort = NetUtils.findFreePort(); - String[] callbackNetworks = null; - String callbackNetworksText = props.getProperty("callback_networks"); - if (callbackNetworksText != null) { - callbackNetworks = callbackNetworksText.trim().split("\\s*,\\s*"); - System.out.println("Custom network instarface list is defined: " + - Arrays.asList(callbackNetworks)); - } - URL callbackUrl = CallbackServer.getCallbackUrl(callbackPort, callbackNetworks); - Server jettyServer = null; - if (callbackUrl != null) { - JsonServerSyslog.setStaticUseSyslog(false); - CallbackServerConfig cfg = cfgLoader.buildCallbackServerConfig(callbackUrl, - tlDir.toPath(), new LineLogger() { - @Override - public void logNextLine(String line, boolean isError) { - if (isError) { - System.err.println(line); - } else { - System.out.println(line); - } - } - }); - ModuleRunVersion runver = new ModuleRunVersion( - new URL("https://localhost"), - new ModuleMethod(moduleName + ".run_local_tests"), - "local-docker-image", "local", "dev"); - final DockerMountPoints mounts = new DockerMountPoints( - Paths.get("/kb/module/work"), Paths.get("tmp")); - Map localModuleToImage = new LinkedHashMap<>(); - localModuleToImage.put(moduleName, imageName); - JsonServerServlet catalogSrv = new SDKCallbackServer( - cfgLoader.getToken(), cfg, runver, new ArrayList(), - new ArrayList(), mounts, localModuleToImage); - catalogSrv.changeSyslogOutput(new SyslogOutput() { - @Override - public void logToSystem( - final SyslogIF log, - final int level, - final String message) { - System.out.println(message); - } - }); - jettyServer = new Server(callbackPort); - ServletContextHandler context = new ServletContextHandler( - ServletContextHandler.SESSIONS); - context.setContextPath("/"); - jettyServer.setHandler(context); - context.addServlet(new ServletHolder(catalogSrv),"/*"); - jettyServer.start(); - } else { - if (callbackNetworks != null && callbackNetworks.length > 0) { - throw new IllegalStateException("No proper callback IP was found, " + - "please check callback_networks parameter in test.cfg"); - } - System.out.println("WARNING: No callback URL was received " + - "by the job runner. Local callbacks are disabled."); - } - /////////////////////////////////////////////////////////////////////////////////////////// - try { - System.out.println(); + final CallbackServerManager csm = new CallbackServerManager( + tlDir.toPath(), + new URL(cfgLoader.getEndPoint()), + cfgLoader.getToken(), + new ModuleRunVersion( + new URL("https://localhost"), + new ModuleMethod(moduleName + ".run_local_tests"), + "local-docker-image", + "local", + "dev" + ), + Arrays.asList(), + Arrays.asList() + ); + System.out.println(String.format("Local callback port: %s", csm.getCallbackPort())); + System.out.println(String.format("In container callback url: %s", csm.getCallbackUrl())); + try (csm) { ProcessHelper.cmd("chmod", "+x", runTestsSh.getCanonicalPath()).exec(tlDir); - int exitCode = ProcessHelper.cmd("bash", DirUtils.getFilePath(runTestsSh), - callbackUrl == null ? "http://fakecallbackurl" : - callbackUrl.toExternalForm()).exec(tlDir).getExitCode(); + int exitCode = ProcessHelper.cmd( + "bash", + DirUtils.getFilePath(runTestsSh), + csm.getCallbackUrl().toExternalForm() + ).exec(tlDir).getExitCode(); return exitCode; - } finally { - if (jettyServer != null) { - System.out.println("Shutting down callback server..."); - jettyServer.stop(); - } } } diff --git a/src/test/java/us/kbase/test/sdk/tester/ModuleTesterTest.java b/src/test/java/us/kbase/test/sdk/tester/ModuleTesterTest.java index 8f56498..da61f12 100644 --- a/src/test/java/us/kbase/test/sdk/tester/ModuleTesterTest.java +++ b/src/test/java/us/kbase/test/sdk/tester/ModuleTesterTest.java @@ -168,62 +168,4 @@ public void testJavaModuleError() throws Exception { assertEquals(2, exitCode); } - @Test - public void testSelfCalls() throws Exception { - System.out.println("Test [testSelfCalls]"); - String lang = "python"; - String moduleName = SIMPLE_MODULE_NAME + "Self"; - final Path workDir = Paths.get(TestConfigHelper.getTempTestDir(), moduleName); - TestUtils.deleteTestModule(workDir, true, true); - CREATED_MODULES.add(workDir); - String implInit = "" + - "#BEGIN_HEADER\n" + - "import os\n"+ - "from " + moduleName + "." + moduleName + "Client import " + moduleName + " as local_client\n" + - "#END_HEADER\n" + - "\n" + - " #BEGIN_CLASS_HEADER\n" + - // TODO TESTHACK PYTEST upgrade to pytest and remove this line assuming that works - " __test__ = False\n" + // nose is identifying the class as a test case - " #END_CLASS_HEADER\n" + - "\n" + - " #BEGIN_CONSTRUCTOR\n" + - " #END_CONSTRUCTOR\n" + - "\n" + - " #BEGIN run_local\n" + - " returnVal = local_client(os.environ['SDK_CALLBACK_URL']).calc_square(input)\n" + - " #END run_local\n" + - "\n" + - " #BEGIN calc_square\n" + - " returnVal = input * input\n" + - " #END calc_square\n"; - File moduleDir = workDir.toFile(); - File implFile = new File(moduleDir, "lib/" + moduleName + "/" + moduleName + "Impl.py"); - TestUtils.createSdkCfgFile(Paths.get(TestConfigHelper.getTempTestDir()), moduleName); - new ModuleInitializer( - moduleName, - token.getUserName(), - lang, - false, - new File(TestConfigHelper.getTempTestDir()), - true - ).initialize(false); - File specFile = new File(moduleDir, moduleName + ".spec"); - String specText = FileUtils.readFileToString(specFile).replace("};", - "funcdef run_local(int input) returns (int) authentication required;\n" + - "funcdef calc_square(int input) returns (int) authentication required;\n" + - "};"); - File testFile = new File(moduleDir, "test/" + moduleName + "_server_test.py"); - final String testCode = FileUtils.readFileToString(testFile); - final int index = testCode.indexOf(" def test_your_method(self)"); - final String newCode = testCode.substring(0, index) - + " def test_your_method(self):\n" - + " self.assertEqual(25, self.serviceImpl.run_local(self.ctx, 5)[0])\n"; - assertThat(testCode, is(not(newCode))); - FileUtils.writeStringToFile(specFile, specText); - FileUtils.writeStringToFile(implFile, implInit); - FileUtils.writeStringToFile(testFile, newCode); - int exitCode = runTestsInDocker(moduleDir, token, true); - assertEquals(0, exitCode); - } } \ No newline at end of file