diff --git a/JobRunner/JobRunner.py b/JobRunner/JobRunner.py index c5dae57..61507b4 100644 --- a/JobRunner/JobRunner.py +++ b/JobRunner/JobRunner.py @@ -30,6 +30,10 @@ class JobRunner(object): on a container runtime. It handles starting the callback service to support subjobs and provenenace calls. """ + max_retries = 10 + poll_sleep = 0.2 + canceled_sleep = 5 + retry_sleep = 30 def __init__(self, config: Config, port=None): """ @@ -222,7 +226,7 @@ def _watch(self, config: dict) -> dict: if not self._check_job_status(): self.logger.error("Job canceled or unexpected error") self._cancel() - _sleep(5) + _sleep(self.canceled_sleep) return {"error": "Canceled or unexpected error"} def _init_callback_url(self, port=None): @@ -287,7 +291,8 @@ def _retry_finish(self, finish_job_params: dict, success: bool): if self.ee2: self.ee2.finish_job(finish_job_params) except Exception: - _sleep(30) + logging.warn(f"Waiting for completion: {self.retry_sleep}s") + _sleep(self.retry_sleep) if self.ee2: self.ee2.finish_job(finish_job_params) @@ -368,6 +373,17 @@ def run(self): ] self.cbs = Process(target=start_callback_server, args=cb_args) self.cbs.start() + ct = 0 + while ct < self.max_retries: + try: + resp = requests.get(self.callback_url) + break + except requests.exceptions.ConnectionError: + logging.info("Waiting for Callback") + ct += 1 + _sleep(self.poll_sleep) + else: + logging.warn("Callback server not responding") # Submit the main job self.logger.log(f"Job is about to run {job_params.get('app_id')}") @@ -454,3 +470,4 @@ def callback(self, job_params=None): self.cbs = Process(target=start_callback_server, args=cb_args) self.cbs.start() self._watch(config) + self.cbs.terminate() diff --git a/JobRunner/callback_server.py b/JobRunner/callback_server.py index 995b9d6..748db6e 100644 --- a/JobRunner/callback_server.py +++ b/JobRunner/callback_server.py @@ -2,18 +2,30 @@ import uuid import os from queue import Empty +import logging +from typing import Annotated, Union +from fastapi import FastAPI, Header, HTTPException +from pydantic import BaseModel +import uvicorn -from sanic import Sanic -from sanic.config import Config -from sanic.exceptions import SanicException -from sanic.log import logger -from sanic.response import json -Config.SANIC_REQUEST_TIMEOUT = 300 +app = FastAPI() -app = Sanic("jobrunner") outputs = dict() prov = [] +token = None +bypass_token = False +in_q = None +out_q = None + + +def config(conf): + global token + global out_q + global in_q + token = conf["token"] + out_q = conf["out_q"] + in_q = conf["in_q"] def start_callback_server(ip, port, out_queue, in_queue, token, bypass_token): @@ -29,27 +41,42 @@ def start_callback_server(ip, port, out_queue, in_queue, token, bypass_token): "KEEP_ALIVE_TIMEOUT": timeout, "REQUEST_MAX_SIZE": max_size_bytes, } - app.config.update(conf) + #app.config.update(conf) + config(conf) if os.environ.get("IN_CONTAINER"): ip = "0.0.0.0" - app.run(host=ip, port=port, debug=False, access_log=False) + #app.run(host=ip, port=port, debug=False, access_log=False) + uvconfig = uvicorn.Config("JobRunner.callback_server:app", host=ip, port=port, + access_log=False, log_level="warning") + server = uvicorn.Server(uvconfig) + server.run() + + + +class RPC(BaseModel): + method: str + + +@app.get("/") +async def root(): + return {"ready": True} -@app.route("/", methods=["GET", "POST"]) -async def root(request): - data = request.json - if request.method == "POST" and data is not None and "method" in data: - token = request.headers.get("Authorization") +@app.post("/") +async def root(data: dict, Authorization: Annotated[Union[str, None], Header()] = None): +# data = request.json + if data is not None and "method" in data: + token = Authorization response = await _process_rpc(data, token) status = 500 if "error" in response else 200 - return json(response, status=status) - return json([{}]) + # return json(response, status=status) + return response + return {} def _check_finished(info=None): global prov - logger.debug(info) - in_q = app.config["in_q"] + global in_q try: # Flush the queue while True: @@ -62,12 +89,13 @@ def _check_finished(info=None): pass -def _check_rpc_token(token): - if token != app.config.get("token"): - if app.config.get("bypass_token"): +def _check_rpc_token(req_token): + global token + if req_token != token: + if bypass_token: pass else: - raise SanicException(status_code=401) + raise HTTPException(status_code=401, detail="Bad token") def _handle_provenance(): @@ -76,16 +104,17 @@ def _handle_provenance(): def _handle_submit(module, method, data, token): + global out_q _check_rpc_token(token) job_id = str(uuid.uuid1()) data["method"] = "%s.%s" % (module, method[1:-7]) - app.config["out_q"].put(["submit", job_id, data]) + out_q.put(["submit", job_id, data]) return {"result": [job_id]} def _handle_checkjob(data): if "params" not in data: - raise SanicException(status_code=404) + raise HTTPException(status_code=404, detail="Missing params") job_id = data["params"][0] _check_finished(f"Checkjob for {job_id}") resp = {"finished": 0} @@ -97,7 +126,7 @@ def _handle_checkjob(data): if "error" in resp: return {"result": [resp], "error": resp["error"]} except Exception as e: - logger.debug(e) + logging.debug(e) return {"result": [resp]} @@ -122,7 +151,7 @@ async def _process_rpc(data, token): _check_rpc_token(token) job_id = str(uuid.uuid1()) data["method"] = "%s.%s" % (module, method) - app.config["out_q"].put(["submit", job_id, data]) + out_q.put(["submit", job_id, data]) try: while True: _check_finished(f'synk check for {data["method"]} for {job_id}') @@ -134,7 +163,7 @@ async def _process_rpc(data, token): except Exception as e: # Attempt to log error, but this is not very effective.. exception_message = f"Timeout or exception: {e} {type(e)}" - logger.error(exception_message) + logging.error(exception_message) error_obj = { "error": exception_message, "code": "123", diff --git a/test/test_callback_server.py b/test/test_callback_server.py index 083f1e8..d3fe268 100644 --- a/test/test_callback_server.py +++ b/test/test_callback_server.py @@ -1,5 +1,5 @@ -# Import the Sanic app, usually created with Sanic(__name__) -from JobRunner.callback_server import app +from JobRunner.callback_server import app, config +from fastapi.testclient import TestClient import json from queue import Queue from unittest.mock import patch @@ -7,68 +7,70 @@ _TOKEN = "bogus" +client = TestClient(app) + def _post(data): # Returns -> httpx.Response: header = {"Authorization": _TOKEN} sa = {"access_log": False} - return app.test_client.post("/", server_kwargs=sa, headers=header, data=data)[1] + return client.post("/", headers=header, json=data) -def test_index_returns_200(): - response = app.test_client.get("/")[1] +def xtest_cb_returns_200(): + response = client.get("/")[1] assert response.status == 200 -def test_index_post_empty(): - response = _post(None) - print(response.json) - assert response.json == [{}] +def test_cb_post_empty(): + response = _post({}) + assert response.json() == {} -def test_index_post(): +def test_cb_post(): out_q = Queue() in_q = Queue() conf = {"token": _TOKEN, "out_q": out_q, "in_q": in_q} - app.config.update(conf) - data = json.dumps({"method": "bogus._test_submit"}) + config(conf) + data = {"method": "bogus._test_submit"} response = _post(data) - assert "result" in response.json - job_id = response.json["result"][0] + assert "result" in response.json() + job_id = response.json()["result"][0] mess = out_q.get() assert "submit" in mess - data = json.dumps({"method": "bogus._check_job", "params": [job_id]}) + data = {"method": "bogus._check_job", "params": [job_id]} response = _post(data) - pprint(response) + pprint(response.json()) - assert "result" in response.json - assert response.json["result"][0]["finished"] is 0 - data = json.dumps({"method": "bogus.get_provenance", "params": [job_id]}) + assert "result" in response.json() + assert response.json()["result"][0]["finished"]==0 + data = {"method": "bogus.get_provenance", "params": [job_id]} response = _post(data) - assert "result" in response.json - assert response.json["result"][0] in [None,[]] + assert "result" in response.json() + assert response.json()["result"][0] in [None,[]] in_q.put(["prov", job_id, "bogus"]) response = _post(data) - assert "result" in response.json - assert response.json["result"][0] == "bogus" + assert "result" in response.json() + assert response.json()["result"][0] == "bogus" in_q.put(["output", job_id, {"foo": "bar"}]) - data = json.dumps({"method": "bogus._check_job", "params": [job_id]}) + data = {"method": "bogus._check_job", "params": [job_id]} response = _post(data) - assert "result" in response.json - assert response.json["result"][0]["finished"] is 1 - assert "foo" in response.json["result"][0] + assert "result" in response.json() + assert response.json()["result"][0]["finished"]==1 + assert "foo" in response.json()["result"][0] @patch("JobRunner.callback_server.uuid", autospec=True) -def test_index_submit_sync(mock_uuid): +def test_cb_submit_sync(mock_uuid): out_q = Queue() in_q = Queue() conf = {"token": _TOKEN, "out_q": out_q, "in_q": in_q} - app.config.update(conf) + #app.config.update(conf) + config(conf) mock_uuid.uuid1.return_value = "bogus" - data = json.dumps({"method": "bogus.test"}) + data = {"method": "bogus.test"} in_q.put(["output", "bogus", {"foo": "bar"}]) response = _post(data) - assert "finished" in response.json - assert "foo" in response.json + assert "finished" in response.json() + assert "foo" in response.json() diff --git a/test/test_dockerrunner.py b/test/test_dockerrunner.py index c0c5e92..c5de5dc 100644 --- a/test/test_dockerrunner.py +++ b/test/test_dockerrunner.py @@ -56,8 +56,6 @@ def test_run(self): f.write(json.dumps(inp)) vols = {workdir: {"bind": "/kb/module/work", "mode": "rw"}} of = f"{workdir}/output.json" - # vols = {"/tmp": {"bind": "/kb/module/work", "mode": "rw"}} - # of = "/tmp/output.json" if os.path.exists(of): os.remove(of) c = dr.run("1234", "mock_app:latest", {}, vols, {}, []) diff --git a/test/test_jobrunner.py b/test/test_jobrunner.py index 91244d2..419a2eb 100644 --- a/test/test_jobrunner.py +++ b/test/test_jobrunner.py @@ -62,12 +62,6 @@ def setUpClass(cls): cls.config = Config(job_id=cls.jobid, workdir=cls.workdir) cls.future = _time() + 3600 cls.auth2_url = f"{base}auth/api/V2/token" - # cls.config = { - # "catalog-service-url": base + "catalog", - # "auth-service-url": base + "auth/api/legacy/KBase/Sessions/Login", - # "auth2-url": base + "auth/api/V2/token", - # "workdir": cls.workdir - # } if not os.path.exists(cls.workdir): os.mkdir(cls.workdir) if "KB_ADMIN_AUTH_TOKEN" not in os.environ: diff --git a/test/test_methodrunner.py b/test/test_methodrunner.py index 9a4053b..078d229 100644 --- a/test/test_methodrunner.py +++ b/test/test_methodrunner.py @@ -56,12 +56,6 @@ def setUpClass(cls): os.makedirs("/tmp/mr/", exist_ok=True) cls.job_id = "1234" cls.cfg = Config(workdir=cls.workdir, job_id=cls.job_id) - # cls.cfg = { - # "catalog-service-url": "http://localhost", - # "token": cls.token, - # "admin_token": os.environ.get("KB_ADMIN_AUTH_TOKEN"), - # "workdir": cls.workdir - # } cls.logger = MockLogger() cls.mr = MethodRunner(cls.cfg, logger=cls.logger) base = "https://ci.kbase.us/services/"