Skip to content

Scanon/fastapi #78

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions JobRunner/JobRunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ class JobRunner(object):
on a container runtime. It handles starting the callback service
to support subjobs and provenenace calls.
"""
max_retries = 10
poll_sleep = 0.2
canceled_sleep = 5
retry_sleep = 30

def __init__(self, config: Config, port=None):
"""
Expand Down Expand Up @@ -222,7 +226,7 @@ def _watch(self, config: dict) -> dict:
if not self._check_job_status():
self.logger.error("Job canceled or unexpected error")
self._cancel()
_sleep(5)
_sleep(self.canceled_sleep)
return {"error": "Canceled or unexpected error"}

def _init_callback_url(self, port=None):
Expand Down Expand Up @@ -287,7 +291,8 @@ def _retry_finish(self, finish_job_params: dict, success: bool):
if self.ee2:
self.ee2.finish_job(finish_job_params)
except Exception:
_sleep(30)
logging.warn(f"Waiting for completion: {self.retry_sleep}s")
_sleep(self.retry_sleep)
if self.ee2:
self.ee2.finish_job(finish_job_params)

Expand Down Expand Up @@ -368,6 +373,17 @@ def run(self):
]
self.cbs = Process(target=start_callback_server, args=cb_args)
self.cbs.start()
ct = 0
while ct < self.max_retries:
try:
resp = requests.get(self.callback_url)
break
except requests.exceptions.ConnectionError:
logging.info("Waiting for Callback")
ct += 1
_sleep(self.poll_sleep)
else:
logging.warn("Callback server not responding")

# Submit the main job
self.logger.log(f"Job is about to run {job_params.get('app_id')}")
Expand Down Expand Up @@ -454,3 +470,4 @@ def callback(self, job_params=None):
self.cbs = Process(target=start_callback_server, args=cb_args)
self.cbs.start()
self._watch(config)
self.cbs.terminate()
83 changes: 56 additions & 27 deletions JobRunner/callback_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,30 @@
import uuid
import os
from queue import Empty
import logging
from typing import Annotated, Union
from fastapi import FastAPI, Header, HTTPException
from pydantic import BaseModel
import uvicorn

from sanic import Sanic
from sanic.config import Config
from sanic.exceptions import SanicException
from sanic.log import logger
from sanic.response import json

Config.SANIC_REQUEST_TIMEOUT = 300
app = FastAPI()

app = Sanic("jobrunner")
outputs = dict()
prov = []
token = None
bypass_token = False
in_q = None
out_q = None


def config(conf):
global token
global out_q
global in_q
token = conf["token"]
out_q = conf["out_q"]
in_q = conf["in_q"]


def start_callback_server(ip, port, out_queue, in_queue, token, bypass_token):
Expand All @@ -29,27 +41,42 @@ def start_callback_server(ip, port, out_queue, in_queue, token, bypass_token):
"KEEP_ALIVE_TIMEOUT": timeout,
"REQUEST_MAX_SIZE": max_size_bytes,
}
app.config.update(conf)
#app.config.update(conf)
config(conf)
if os.environ.get("IN_CONTAINER"):
ip = "0.0.0.0"
app.run(host=ip, port=port, debug=False, access_log=False)
#app.run(host=ip, port=port, debug=False, access_log=False)
uvconfig = uvicorn.Config("JobRunner.callback_server:app", host=ip, port=port,
access_log=False, log_level="warning")
server = uvicorn.Server(uvconfig)
server.run()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to see if the

  • monitor_jobrunner.py still finds these logs
  • any missing settings we need




class RPC(BaseModel):
method: str


@app.get("/")
async def root():
return {"ready": True}


@app.route("/", methods=["GET", "POST"])
async def root(request):
data = request.json
if request.method == "POST" and data is not None and "method" in data:
token = request.headers.get("Authorization")
@app.post("/")
async def root(data: dict, Authorization: Annotated[Union[str, None], Header()] = None):
# data = request.json
if data is not None and "method" in data:
token = Authorization
response = await _process_rpc(data, token)
status = 500 if "error" in response else 200
return json(response, status=status)
return json([{}])
# return json(response, status=status)
return response
return {}


def _check_finished(info=None):
global prov
logger.debug(info)
in_q = app.config["in_q"]
global in_q
try:
# Flush the queue
while True:
Expand All @@ -62,12 +89,13 @@ def _check_finished(info=None):
pass


def _check_rpc_token(token):
if token != app.config.get("token"):
if app.config.get("bypass_token"):
def _check_rpc_token(req_token):
global token
if req_token != token:
if bypass_token:
pass
else:
raise SanicException(status_code=401)
raise HTTPException(status_code=401, detail="Bad token")


def _handle_provenance():
Expand All @@ -76,16 +104,17 @@ def _handle_provenance():


def _handle_submit(module, method, data, token):
global out_q
_check_rpc_token(token)
job_id = str(uuid.uuid1())
data["method"] = "%s.%s" % (module, method[1:-7])
app.config["out_q"].put(["submit", job_id, data])
out_q.put(["submit", job_id, data])
return {"result": [job_id]}


def _handle_checkjob(data):
if "params" not in data:
raise SanicException(status_code=404)
raise HTTPException(status_code=404, detail="Missing params")
job_id = data["params"][0]
_check_finished(f"Checkjob for {job_id}")
resp = {"finished": 0}
Expand All @@ -97,7 +126,7 @@ def _handle_checkjob(data):
if "error" in resp:
return {"result": [resp], "error": resp["error"]}
except Exception as e:
logger.debug(e)
logging.debug(e)

return {"result": [resp]}

Expand All @@ -122,7 +151,7 @@ async def _process_rpc(data, token):
_check_rpc_token(token)
job_id = str(uuid.uuid1())
data["method"] = "%s.%s" % (module, method)
app.config["out_q"].put(["submit", job_id, data])
out_q.put(["submit", job_id, data])
try:
while True:
_check_finished(f'synk check for {data["method"]} for {job_id}')
Expand All @@ -134,7 +163,7 @@ async def _process_rpc(data, token):
except Exception as e:
# Attempt to log error, but this is not very effective..
exception_message = f"Timeout or exception: {e} {type(e)}"
logger.error(exception_message)
logging.error(exception_message)
error_obj = {
"error": exception_message,
"code": "123",
Expand Down
66 changes: 34 additions & 32 deletions test/test_callback_server.py
Original file line number Diff line number Diff line change
@@ -1,74 +1,76 @@
# Import the Sanic app, usually created with Sanic(__name__)
from JobRunner.callback_server import app
from JobRunner.callback_server import app, config
from fastapi.testclient import TestClient
import json
from queue import Queue
from unittest.mock import patch
from pprint import pprint

_TOKEN = "bogus"

client = TestClient(app)


def _post(data):
# Returns -> httpx.Response:
header = {"Authorization": _TOKEN}
sa = {"access_log": False}
return app.test_client.post("/", server_kwargs=sa, headers=header, data=data)[1]
return client.post("/", headers=header, json=data)


def test_index_returns_200():
response = app.test_client.get("/")[1]
def xtest_cb_returns_200():
response = client.get("/")[1]
assert response.status == 200


def test_index_post_empty():
response = _post(None)
print(response.json)
assert response.json == [{}]
def test_cb_post_empty():
response = _post({})
assert response.json() == {}


def test_index_post():
def test_cb_post():
out_q = Queue()
in_q = Queue()
conf = {"token": _TOKEN, "out_q": out_q, "in_q": in_q}
app.config.update(conf)
data = json.dumps({"method": "bogus._test_submit"})
config(conf)
data = {"method": "bogus._test_submit"}
response = _post(data)
assert "result" in response.json
job_id = response.json["result"][0]
assert "result" in response.json()
job_id = response.json()["result"][0]
mess = out_q.get()
assert "submit" in mess
data = json.dumps({"method": "bogus._check_job", "params": [job_id]})
data = {"method": "bogus._check_job", "params": [job_id]}
response = _post(data)
pprint(response)
pprint(response.json())

assert "result" in response.json
assert response.json["result"][0]["finished"] is 0
data = json.dumps({"method": "bogus.get_provenance", "params": [job_id]})
assert "result" in response.json()
assert response.json()["result"][0]["finished"]==0
data = {"method": "bogus.get_provenance", "params": [job_id]}
response = _post(data)
assert "result" in response.json
assert response.json["result"][0] in [None,[]]
assert "result" in response.json()
assert response.json()["result"][0] in [None,[]]
in_q.put(["prov", job_id, "bogus"])
response = _post(data)
assert "result" in response.json
assert response.json["result"][0] == "bogus"
assert "result" in response.json()
assert response.json()["result"][0] == "bogus"
in_q.put(["output", job_id, {"foo": "bar"}])
data = json.dumps({"method": "bogus._check_job", "params": [job_id]})
data = {"method": "bogus._check_job", "params": [job_id]}
response = _post(data)
assert "result" in response.json
assert response.json["result"][0]["finished"] is 1
assert "foo" in response.json["result"][0]
assert "result" in response.json()
assert response.json()["result"][0]["finished"]==1
assert "foo" in response.json()["result"][0]


@patch("JobRunner.callback_server.uuid", autospec=True)
def test_index_submit_sync(mock_uuid):
def test_cb_submit_sync(mock_uuid):
out_q = Queue()
in_q = Queue()
conf = {"token": _TOKEN, "out_q": out_q, "in_q": in_q}
app.config.update(conf)
#app.config.update(conf)
config(conf)
mock_uuid.uuid1.return_value = "bogus"
data = json.dumps({"method": "bogus.test"})
data = {"method": "bogus.test"}
in_q.put(["output", "bogus", {"foo": "bar"}])
response = _post(data)
assert "finished" in response.json
assert "foo" in response.json
assert "finished" in response.json()
assert "foo" in response.json()

2 changes: 0 additions & 2 deletions test/test_dockerrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ def test_run(self):
f.write(json.dumps(inp))
vols = {workdir: {"bind": "/kb/module/work", "mode": "rw"}}
of = f"{workdir}/output.json"
# vols = {"/tmp": {"bind": "/kb/module/work", "mode": "rw"}}
# of = "/tmp/output.json"
if os.path.exists(of):
os.remove(of)
c = dr.run("1234", "mock_app:latest", {}, vols, {}, [])
Expand Down
6 changes: 0 additions & 6 deletions test/test_jobrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,6 @@ def setUpClass(cls):
cls.config = Config(job_id=cls.jobid, workdir=cls.workdir)
cls.future = _time() + 3600
cls.auth2_url = f"{base}auth/api/V2/token"
# cls.config = {
# "catalog-service-url": base + "catalog",
# "auth-service-url": base + "auth/api/legacy/KBase/Sessions/Login",
# "auth2-url": base + "auth/api/V2/token",
# "workdir": cls.workdir
# }
if not os.path.exists(cls.workdir):
os.mkdir(cls.workdir)
if "KB_ADMIN_AUTH_TOKEN" not in os.environ:
Expand Down
6 changes: 0 additions & 6 deletions test/test_methodrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,6 @@ def setUpClass(cls):
os.makedirs("/tmp/mr/", exist_ok=True)
cls.job_id = "1234"
cls.cfg = Config(workdir=cls.workdir, job_id=cls.job_id)
# cls.cfg = {
# "catalog-service-url": "http://localhost",
# "token": cls.token,
# "admin_token": os.environ.get("KB_ADMIN_AUTH_TOKEN"),
# "workdir": cls.workdir
# }
cls.logger = MockLogger()
cls.mr = MethodRunner(cls.cfg, logger=cls.logger)
base = "https://ci.kbase.us/services/"
Expand Down