From b33eb6e8d3d467e0648ed0750b95fd413e41a12e Mon Sep 17 00:00:00 2001 From: Shane Canon Date: Sat, 8 Jul 2023 13:26:25 -0700 Subject: [PATCH 1/2] Switch to FastAPI from Sanic This also fixes up some of the tests so they work with Podman. --- JobRunner/callback_server.py | 87 +++++++++++++++++++++++++----------- test/test_callback_server.py | 66 ++++++++++++++------------- test/test_dockerrunner.py | 7 +-- test/test_jobrunner.py | 13 +++--- test/test_methodrunner.py | 43 ++++++------------ 5 files changed, 121 insertions(+), 95 deletions(-) diff --git a/JobRunner/callback_server.py b/JobRunner/callback_server.py index 7d880b9..11a50b8 100644 --- a/JobRunner/callback_server.py +++ b/JobRunner/callback_server.py @@ -2,18 +2,43 @@ import uuid import os from queue import Empty +import logging +from typing import Annotated, Union +from fastapi import FastAPI, Header +from pydantic import BaseModel +import uvicorn -from sanic import Sanic -from sanic.config import Config -from sanic.exceptions import abort -from sanic.log import logger -from sanic.response import json -Config.SANIC_REQUEST_TIMEOUT = 300 +app = FastAPI() + + +#from sanic import Sanic +#from sanic.exceptions import abort +#from sanic.log import logger +#from sanic.response import json + + +#Config.SANIC_REQUEST_TIMEOUT = 300 -app = Sanic() outputs = dict() prov = [] +token = None +bypass_token = False +in_q = None +out_q = None + + +def abort(): + print("TODO") + + +def config(conf): + global token + global out_q + global in_q + token = conf["token"] + out_q = conf["out_q"] + in_q = conf["in_q"] def start_callback_server(ip, port, out_queue, in_queue, token, bypass_token): @@ -29,27 +54,37 @@ def start_callback_server(ip, port, out_queue, in_queue, token, bypass_token): "KEEP_ALIVE_TIMEOUT": timeout, "REQUEST_MAX_SIZE": max_size_bytes, } - app.config.update(conf) + #app.config.update(conf) + config(conf) if os.environ.get("IN_CONTAINER"): ip = "0.0.0.0" - app.run(host=ip, port=port, debug=False, access_log=False) + #app.run(host=ip, port=port, debug=False, access_log=False) + uvconfig = uvicorn.Config("JobRunner.callback_server:app", host=ip, port=port, log_level="info") + server = uvicorn.Server(uvconfig) + server.run() + + + +class RPC(BaseModel): + method: str -@app.route("/", methods=["GET", "POST"]) -async def root(request): - data = request.json - if request.method == "POST" and data is not None and "method" in data: - token = request.headers.get("Authorization") +@app.post("/") +async def root(data: dict, Authorization: Annotated[Union[str, None], Header()] = None): +# data = request.json + if data is not None and "method" in data: + token = Authorization response = await _process_rpc(data, token) status = 500 if "error" in response else 200 - return json(response, status=status) - return json([{}]) + # return json(response, status=status) + return response + return {} def _check_finished(info=None): global prov - logger.debug(info) - in_q = app.config["in_q"] + global in_q + logging.debug(info) try: # Flush the queue while True: @@ -62,9 +97,10 @@ def _check_finished(info=None): pass -def _check_rpc_token(token): - if token != app.config.get("token"): - if app.config.get("bypass_token"): +def _check_rpc_token(req_token): + global token + if req_token != token: + if bypass_token: pass else: abort(401) @@ -76,10 +112,11 @@ def _handle_provenance(): def _handle_submit(module, method, data, token): + global out_q _check_rpc_token(token) job_id = str(uuid.uuid1()) data["method"] = "%s.%s" % (module, method[1:-7]) - 
app.config["out_q"].put(["submit", job_id, data]) + out_q.put(["submit", job_id, data]) return {"result": [job_id]} @@ -97,7 +134,7 @@ def _handle_checkjob(data): if "error" in resp: return {"result": [resp], "error": resp["error"]} except Exception as e: - logger.debug(e) + logging.debug(e) return {"result": [resp]} @@ -122,7 +159,7 @@ async def _process_rpc(data, token): _check_rpc_token(token) job_id = str(uuid.uuid1()) data["method"] = "%s.%s" % (module, method) - app.config["out_q"].put(["submit", job_id, data]) + out_q.put(["submit", job_id, data]) try: while True: _check_finished(f'synk check for {data["method"]} for {job_id}') @@ -134,7 +171,7 @@ async def _process_rpc(data, token): except Exception as e: # Attempt to log error, but this is not very effective.. exception_message = f"Timeout or exception: {e} {type(e)}" - logger.error(exception_message) + logging.error(exception_message) error_obj = { "error": exception_message, "code": "123", diff --git a/test/test_callback_server.py b/test/test_callback_server.py index 083f1e8..d3fe268 100644 --- a/test/test_callback_server.py +++ b/test/test_callback_server.py @@ -1,5 +1,5 @@ -# Import the Sanic app, usually created with Sanic(__name__) -from JobRunner.callback_server import app +from JobRunner.callback_server import app, config +from fastapi.testclient import TestClient import json from queue import Queue from unittest.mock import patch @@ -7,68 +7,70 @@ _TOKEN = "bogus" +client = TestClient(app) + def _post(data): # Returns -> httpx.Response: header = {"Authorization": _TOKEN} sa = {"access_log": False} - return app.test_client.post("/", server_kwargs=sa, headers=header, data=data)[1] + return client.post("/", headers=header, json=data) -def test_index_returns_200(): - response = app.test_client.get("/")[1] +def xtest_cb_returns_200(): + response = client.get("/")[1] assert response.status == 200 -def test_index_post_empty(): - response = _post(None) - print(response.json) - assert response.json == [{}] +def test_cb_post_empty(): + response = _post({}) + assert response.json() == {} -def test_index_post(): +def test_cb_post(): out_q = Queue() in_q = Queue() conf = {"token": _TOKEN, "out_q": out_q, "in_q": in_q} - app.config.update(conf) - data = json.dumps({"method": "bogus._test_submit"}) + config(conf) + data = {"method": "bogus._test_submit"} response = _post(data) - assert "result" in response.json - job_id = response.json["result"][0] + assert "result" in response.json() + job_id = response.json()["result"][0] mess = out_q.get() assert "submit" in mess - data = json.dumps({"method": "bogus._check_job", "params": [job_id]}) + data = {"method": "bogus._check_job", "params": [job_id]} response = _post(data) - pprint(response) + pprint(response.json()) - assert "result" in response.json - assert response.json["result"][0]["finished"] is 0 - data = json.dumps({"method": "bogus.get_provenance", "params": [job_id]}) + assert "result" in response.json() + assert response.json()["result"][0]["finished"]==0 + data = {"method": "bogus.get_provenance", "params": [job_id]} response = _post(data) - assert "result" in response.json - assert response.json["result"][0] in [None,[]] + assert "result" in response.json() + assert response.json()["result"][0] in [None,[]] in_q.put(["prov", job_id, "bogus"]) response = _post(data) - assert "result" in response.json - assert response.json["result"][0] == "bogus" + assert "result" in response.json() + assert response.json()["result"][0] == "bogus" in_q.put(["output", job_id, {"foo": "bar"}]) - 
data = json.dumps({"method": "bogus._check_job", "params": [job_id]}) + data = {"method": "bogus._check_job", "params": [job_id]} response = _post(data) - assert "result" in response.json - assert response.json["result"][0]["finished"] is 1 - assert "foo" in response.json["result"][0] + assert "result" in response.json() + assert response.json()["result"][0]["finished"]==1 + assert "foo" in response.json()["result"][0] @patch("JobRunner.callback_server.uuid", autospec=True) -def test_index_submit_sync(mock_uuid): +def test_cb_submit_sync(mock_uuid): out_q = Queue() in_q = Queue() conf = {"token": _TOKEN, "out_q": out_q, "in_q": in_q} - app.config.update(conf) + #app.config.update(conf) + config(conf) mock_uuid.uuid1.return_value = "bogus" - data = json.dumps({"method": "bogus.test"}) + data = {"method": "bogus.test"} in_q.put(["output", "bogus", {"foo": "bar"}]) response = _post(data) - assert "finished" in response.json - assert "foo" in response.json + assert "finished" in response.json() + assert "foo" in response.json() diff --git a/test/test_dockerrunner.py b/test/test_dockerrunner.py index 971e0e9..c5de5dc 100644 --- a/test/test_dockerrunner.py +++ b/test/test_dockerrunner.py @@ -51,10 +51,11 @@ def test_run(self): mlog = MockLogger() dr = DockerRunner(logger=mlog) inp = {"method": "mock_app.bogus"} - with open("/tmp/input.json", "w") as f: + workdir = os.environ.get("JOB_DIR", "/tmp") + with open(f"{workdir}/input.json", "w") as f: f.write(json.dumps(inp)) - vols = {"/tmp": {"bind": "/kb/module/work", "mode": "rw"}} - of = "/tmp/output.json" + vols = {workdir: {"bind": "/kb/module/work", "mode": "rw"}} + of = f"{workdir}/output.json" if os.path.exists(of): os.remove(of) c = dr.run("1234", "mock_app:latest", {}, vols, {}, []) diff --git a/test/test_jobrunner.py b/test/test_jobrunner.py index 594940f..c750348 100644 --- a/test/test_jobrunner.py +++ b/test/test_jobrunner.py @@ -58,17 +58,17 @@ def setUpClass(cls): base = "http://%s/services/" % (os.environ["TEST_URL"]) cls.ee2_url = base + "ee2" cls.jobid = "1234" - cls.workdir = "/tmp/jr/" + cls.workdir = os.environ.get("JOB_DIR", "/tmp/jr/") cls.cfg["token"] = cls.token cls.future = _time() + 3600 cls.config = { "catalog-service-url": base + "catalog", "auth-service-url": base + "auth/api/legacy/KBase/Sessions/Login", "auth2-url": base + "auth/api/V2/token", - "workdir": "/tmp/jr", + "workdir": cls.workdir } - if not os.path.exists("/tmp/jr"): - os.mkdir("/tmp/jr") + if not os.path.exists(cls.workdir): + os.mkdir(cls.workdir) if "KB_ADMIN_AUTH_TOKEN" not in os.environ: os.environ["KB_ADMIN_AUTH_TOKEN"] = "bogus" @@ -169,7 +169,7 @@ def test_run_volume(self, mock_ee2, mock_auth): jr.cc.catalog.get_secure_config_params = MagicMock(return_value=None) jr.logger.ee2.add_job_logs = MagicMock(return_value=rv) jr.ee2.get_job_params.return_value = params - jr.ee2.list_config.return_value = EE2_LIST_CONFIG + jr.ee2.list_config.return_value = deepcopy(EE2_LIST_CONFIG) jr.ee2.check_job_canceled.return_value = {"finished": False} jr.auth.get_user.return_value = "bogus" jr._get_token_lifetime = MagicMock(return_value=self.future) @@ -199,7 +199,7 @@ def test_cancel(self, mock_ee2, mock_auth): ) rv = deepcopy(CATALOG_GET_MODULE_VERSION) rv["docker_img_name"] = "test/runtester:latest" - jr._get_cgroup = MagicMock(return_value='cgroup') + jr._get_cgroup = MagicMock(return_value=None) jr.cc.catalog.get_module_version = MagicMock(return_value=rv) jr.cc.catalog.list_volume_mounts = MagicMock(return_value=[]) jr.cc.catalog.get_secure_config_params = 
MagicMock(return_value=None) @@ -261,6 +261,7 @@ def test_run_online(self, mock_ee2): jr.logger.ee2.add_job_logs = MagicMock(return_value=rv) jr.ee2.check_job_canceled.return_value = {"finished": False} jr.ee2.get_job_params.return_value = params + jr.ee2.list_config.return_value = EE2_LIST_CONFIG jr._get_cgroup = MagicMock(return_value=None) jr._get_token_lifetime = MagicMock(return_value=self.future) out = jr.run() diff --git a/test/test_methodrunner.py b/test/test_methodrunner.py index 1b30575..c5f3eae 100644 --- a/test/test_methodrunner.py +++ b/test/test_methodrunner.py @@ -52,11 +52,16 @@ def setUpClass(cls): cls.token = os.environ.get("KB_AUTH_TOKEN", "") # WARNING: don't call any logging metholsds on the context object, # it'll result in a NoneType error + cls.workdir = os.environ.get("JOB_DIR", "/tmp") + try: + os.makedirs("/tmp/mr/") + except: + pass cls.cfg = { "catalog-service-url": "http://localhost", "token": cls.token, "admin_token": os.environ.get("KB_ADMIN_AUTH_TOKEN"), - "workdir": "/tmp/mr", + "workdir": cls.workdir } cls.logger = MockLogger() cls.mr = MethodRunner(cls.cfg, "1234", logger=cls.logger) @@ -80,10 +85,6 @@ def test_run(self): mr = MethodRunner(self.cfg, "1234", logger=MockLogger()) module_info = deepcopy(CATALOG_GET_MODULE_VERSION) module_info["docker_img_name"] = "mock_app:latest" - try: - os.makedirs("/tmp/mr/") - except: - pass q = Queue() action = mr.run(self.conf, module_info, EE2_JOB_PARAMS, "1234", fin_q=q) self.assertIn("name", action) @@ -95,12 +96,8 @@ def test_no_output(self): mr = MethodRunner(self.cfg, "1234", logger=MockLogger()) module_info = deepcopy(CATALOG_GET_MODULE_VERSION) module_info["docker_img_name"] = "mock_app:latest" - try: - os.makedirs("/tmp/mr/") - except: - pass - if os.path.exists("/tmp/mr/workdir/output.json"): - os.remove("/tmp/mr/workdir/output.json") + if os.path.exists(f"{self.workdir}/workdir/output.json"): + os.remove(f"{self.workdir}/workdir/output.json") q = Queue() params = deepcopy(EE2_JOB_PARAMS) params["method"] = "echo_test.noout" @@ -117,12 +114,8 @@ def test_too_much_output(self): mr = MethodRunner(self.cfg, "1234", logger=MockLogger()) module_info = deepcopy(CATALOG_GET_MODULE_VERSION) module_info["docker_img_name"] = "mock_app:latest" - try: - os.makedirs("/tmp/mr/") - except: - pass - if os.path.exists("/tmp/mr/workdir/output.json"): - os.remove("/tmp/mr/workdir/output.json") + if os.path.exists(f"{self.workdir}/workdir/output.json"): + os.remove(f"{self.workdir}/workdir/output.json") q = Queue() params = deepcopy(EE2_JOB_PARAMS) params["method"] = "echo_test.bogus" @@ -141,12 +134,8 @@ def test_secure_params(self): module_info = deepcopy(CATALOG_GET_MODULE_VERSION) module_info["docker_img_name"] = "mock_app:latest" module_info["secure_config_params"] = CATALOG_GET_SECURE_CONFIG_PARAMS - try: - os.makedirs("/tmp/mr/") - except: - pass - if os.path.exists("/tmp/mr/workdir/output.json"): - os.remove("/tmp/mr/workdir/output.json") + if os.path.exists(f"{self.workdir}/workdir/output.json"): + os.remove(f"{self.workdir}/workdir/output.json") q = Queue() params = deepcopy(EE2_JOB_PARAMS) params["method"] = "echo_test.bogus" @@ -159,12 +148,8 @@ def test_bad_method(self): mr = MethodRunner(self.cfg, "1234", logger=MockLogger()) module_info = deepcopy(CATALOG_GET_MODULE_VERSION) module_info["docker_img_name"] = "mock_app:latest" - try: - os.makedirs("/tmp/mr/") - except: - pass - if os.path.exists("/tmp/mr/workdir/output.json"): - os.remove("/tmp/mr/workdir/output.json") + if 
os.path.exists(f"{self.workdir}/workdir/output.json"): + os.remove(f"{self.workdir}/workdir/output.json") q = Queue() params = deepcopy(EE2_JOB_PARAMS) params["method"] = "echo_test.badmethod" From 88dfd6c6d73e628179b766c3113342d0f915877a Mon Sep 17 00:00:00 2001 From: Shane Canon Date: Tue, 30 Jan 2024 13:04:25 -0800 Subject: [PATCH 2/2] Minor cleanup * add error handling * max timeouts parameters * add a wait for start --- JobRunner/JobRunner.py | 21 +++++++++++++++++++-- JobRunner/callback_server.py | 19 ++++++++++--------- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/JobRunner/JobRunner.py b/JobRunner/JobRunner.py index c5dae57..61507b4 100644 --- a/JobRunner/JobRunner.py +++ b/JobRunner/JobRunner.py @@ -30,6 +30,10 @@ class JobRunner(object): on a container runtime. It handles starting the callback service to support subjobs and provenenace calls. """ + max_retries = 10 + poll_sleep = 0.2 + canceled_sleep = 5 + retry_sleep = 30 def __init__(self, config: Config, port=None): """ @@ -222,7 +226,7 @@ def _watch(self, config: dict) -> dict: if not self._check_job_status(): self.logger.error("Job canceled or unexpected error") self._cancel() - _sleep(5) + _sleep(self.canceled_sleep) return {"error": "Canceled or unexpected error"} def _init_callback_url(self, port=None): @@ -287,7 +291,8 @@ def _retry_finish(self, finish_job_params: dict, success: bool): if self.ee2: self.ee2.finish_job(finish_job_params) except Exception: - _sleep(30) + logging.warn(f"Waiting for completion: {self.retry_sleep}s") + _sleep(self.retry_sleep) if self.ee2: self.ee2.finish_job(finish_job_params) @@ -368,6 +373,17 @@ def run(self): ] self.cbs = Process(target=start_callback_server, args=cb_args) self.cbs.start() + ct = 0 + while ct < self.max_retries: + try: + resp = requests.get(self.callback_url) + break + except requests.exceptions.ConnectionError: + logging.info("Waiting for Callback") + ct += 1 + _sleep(self.poll_sleep) + else: + logging.warn("Callback server not responding") # Submit the main job self.logger.log(f"Job is about to run {job_params.get('app_id')}") @@ -454,3 +470,4 @@ def callback(self, job_params=None): self.cbs = Process(target=start_callback_server, args=cb_args) self.cbs.start() self._watch(config) + self.cbs.terminate() diff --git a/JobRunner/callback_server.py b/JobRunner/callback_server.py index ca5c5e5..748db6e 100644 --- a/JobRunner/callback_server.py +++ b/JobRunner/callback_server.py @@ -4,7 +4,7 @@ from queue import Empty import logging from typing import Annotated, Union -from fastapi import FastAPI, Header +from fastapi import FastAPI, Header, HTTPException from pydantic import BaseModel import uvicorn @@ -19,10 +19,6 @@ out_q = None -def abort(): - print("TODO") - - def config(conf): global token global out_q @@ -50,7 +46,8 @@ def start_callback_server(ip, port, out_queue, in_queue, token, bypass_token): if os.environ.get("IN_CONTAINER"): ip = "0.0.0.0" #app.run(host=ip, port=port, debug=False, access_log=False) - uvconfig = uvicorn.Config("JobRunner.callback_server:app", host=ip, port=port, log_level="info") + uvconfig = uvicorn.Config("JobRunner.callback_server:app", host=ip, port=port, + access_log=False, log_level="warning") server = uvicorn.Server(uvconfig) server.run() @@ -60,6 +57,11 @@ class RPC(BaseModel): method: str +@app.get("/") +async def root(): + return {"ready": True} + + @app.post("/") async def root(data: dict, Authorization: Annotated[Union[str, None], Header()] = None): # data = request.json @@ -75,7 +77,6 @@ async def root(data: dict, 
Authorization: Annotated[Union[str, None], Header()] def _check_finished(info=None): global prov global in_q - logging.debug(info) try: # Flush the queue while True: @@ -94,7 +95,7 @@ def _check_rpc_token(req_token): if bypass_token: pass else: - abort(401) + raise HTTPException(status_code=401, detail="Bad token") def _handle_provenance(): @@ -113,7 +114,7 @@ def _handle_submit(module, method, data, token): def _handle_checkjob(data): if "params" not in data: - abort(404) + raise HTTPException(status_code=404, detail="Missing params") job_id = data["params"][0] _check_finished(f"Checkjob for {job_id}") resp = {"finished": 0}
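
A minimal sketch of the callback pattern the two patches converge on, for readers comparing it against the removed Sanic version: a FastAPI app with a GET "/" readiness probe, a POST "/" JSON-RPC handler that reads the Authorization header, module-level state set through config() instead of app.config, and a programmatic uvicorn startup. This is illustrative only; the names that also appear in the diff (config, the handler signature, HTTPException on a bad token) come from the patch, while the dispatch body is stubbed out rather than calling the real _process_rpc.

from typing import Annotated, Union

import uvicorn
from fastapi import FastAPI, Header, HTTPException

app = FastAPI()

_token = None            # set once by config(); stands in for the module-level globals
_bypass_token = False


def config(conf: dict):
    # Store the shared state the handlers need (token, queues, ...).
    global _token
    _token = conf.get("token")


@app.get("/")
async def ready():
    # Readiness probe polled by the "wait for start" loop added to JobRunner.run().
    return {"ready": True}


@app.post("/")
async def rpc(data: dict,
              authorization: Annotated[Union[str, None], Header()] = None):
    # FastAPI maps the "Authorization" request header onto this parameter.
    if authorization != _token and not _bypass_token:
        raise HTTPException(status_code=401, detail="Bad token")
    if data and "method" in data:
        return {"result": [data["method"]]}   # the real handler dispatches to _process_rpc
    return {}


if __name__ == "__main__":
    # Programmatic startup, as in start_callback_server().
    uvicorn.run(app, host="127.0.0.1", port=9999, access_log=False, log_level="warning")

The tests exercise the same app in-process through fastapi.testclient.TestClient, and the runner now polls the callback URL before submitting work. A sketch of both, continuing from the app above and assuming the requests package plus the retry/poll values shown in the JobRunner diff:

import time

import requests
from fastapi.testclient import TestClient


def wait_for_callback(url: str, max_retries: int = 10, poll_sleep: float = 0.2) -> bool:
    # Poll until the callback server accepts connections, as JobRunner.run() now does.
    for _ in range(max_retries):
        try:
            requests.get(url, timeout=1)
            return True
        except requests.exceptions.ConnectionError:
            time.sleep(poll_sleep)
    return False


config({"token": "bogus"})
client = TestClient(app)
resp = client.post("/", headers={"Authorization": "bogus"}, json={"method": "mock.method"})
assert resp.status_code == 200 and "result" in resp.json()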