WIP: WDL vol mount #39

Open
wants to merge 1 commit into base: main
7 changes: 7 additions & 0 deletions JobRunner/JobRunner.py
@@ -50,6 +50,8 @@ def __init__(self, config, ee2_url, job_id, token, admin_token, debug=False):
self.jr_queue = Queue()
self.callback_queue = Queue()
self.prov = None
self.module = None
self.method = None
self._init_callback_url()
self.debug = debug
self.mr = MethodRunner(
@@ -120,12 +122,16 @@ def _submit_special(self, config, job_id, job_params):
"""
(module, method) = job_params["method"].split(".")
self.logger.log("Submit %s as a %s:%s job" % (job_id, module, method))
# get the volume mounts using the parent module and method and
# the special method call (e.g. wdl, hpc)
vm = self.cc.get_volume_mounts(self.module, self.method, method)

self.sr.run(
config,
job_params,
job_id,
callback=self.callback_url,
volumes=vm,
fin_q=[self.jr_queue],
)

@@ -334,6 +340,7 @@ def run(self):
self.logger.error("Failed to config . Exiting.")
raise e

(self.module, self.method) = job_params["method"].split(".")
config["job_id"] = self.job_id
self.logger.log(
f"Server version of Execution Engine: {config.get('ee.server.version')}"
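For reference, a minimal sketch of the shape this hunk assumes self.cc.get_volume_mounts(self.module, self.method, method) returns; the exact catalog response format is an assumption here, but the keys mirror the fixture used in test_specialrunner.py below.

# Sketch (assumption): get_volume_mounts is expected to yield a list of dicts
# with host_dir, container_dir and read_only keys; the values are illustrative.
example_volume_mounts = [
    {
        "host_dir": "/tmp/data",    # path on the host running the job
        "container_dir": "/data",   # path mounted inside the container
        "read_only": 1,             # truthy -> the mount gets a ':ro' flag
    }
]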
18 changes: 14 additions & 4 deletions JobRunner/SpecialRunner.py
@@ -25,12 +25,13 @@ def __init__(self, config, job_id, logger=None):
self.containers = []
self.threads = []
self.allowed_types = ["slurm", "wdl"]
self.WDL_RUN = 'wdl_run'

_BATCH_POLL = 10
_FILE_POLL = 10
_MAX_RETRY = 5

def run(self, config, data, job_id, callback=None, fin_q=[]):
def run(self, config, data, job_id, callback=None, volumes=None, fin_q=[]):
# TODO:
# initialize working space
# check job type against an allow list
@@ -49,7 +50,7 @@ def run(self, config, data, job_id, callback=None, fin_q=[]):
if method == "slurm":
return self._batch_submit(method, config, data, job_id, fin_q)
elif method == "wdl":
return self._wdl_run(method, config, data, job_id, fin_q)
return self._wdl_run(method, config, data, job_id, volumes, fin_q)

def _check_batch_job(self, check, slurm_jobid):
cmd = [check, slurm_jobid]
@@ -180,13 +181,22 @@ def _readio(self, p, job_id, queues):
for q in queues:
q.put(["finished_special", job_id, result])

def _wdl_run(self, stype, config, data, job_id, queues):
def _wdl_run(self, stype, config, data, job_id, volumes, queues):
"""
This submits the job to the batch system and starts
a thread to monitor the progress.

"""
params = data["params"][0]
if volumes:
    for index, vol in enumerate(volumes):
        key = 'VOL_MOUNT_{}'.format(index)
        vol['flag'] = ':ro' if vol['read_only'] else ''
        value = '{host_dir}:{container_dir}{flag}'.format(**vol)
        os.environ[key] = value
if "workflow" not in params:
raise ValueError("Missing workflow script")
if "inputs" not in params:
@@ -199,7 +209,7 @@ def _wdl_run(self, stype, config, data, job_id, queues):
inputs = params["inputs"]
if not os.path.exists(inputs):
raise OSError("Inputs file not found at %s" % (inputs))
cmd = ["wdl_run", inputs, wdl]
cmd = [self.WDL_RUN, inputs, wdl]
proc = Popen(cmd, bufsize=0, stdout=PIPE, stderr=PIPE)
out = Thread(target=self._readio, args=[proc, job_id, queues])
self.threads.append(out)
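A minimal sketch of what the volume loop in _wdl_run above is meant to export, assuming one VOL_MOUNT_<n> environment variable per mount; the values mirror the expectation in test_specialrunner.py.

import os

# Sketch: enumerate the mounts and export one variable per entry so the WDL
# task can pick them up, e.g. VOL_MOUNT_0=/tmp/data:/data:ro for a read-only mount.
volumes = [{"host_dir": "/tmp/data", "container_dir": "/data", "read_only": 1}]
for index, vol in enumerate(volumes):
    flag = ":ro" if vol["read_only"] else ""
    os.environ["VOL_MOUNT_{}".format(index)] = "{}:{}{}".format(
        vol["host_dir"], vol["container_dir"], flag
    )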
1 change: 1 addition & 0 deletions test/test_jobrunner.py
@@ -376,6 +376,7 @@ def test_special(self, mock_ee2, mock_auth):
self.config, self.ee2_url, self.jobid, self.token, self.admin_token
)
jr._get_cgroup = MagicMock(return_value=None)
jr.cc.catalog.list_volume_mounts = MagicMock(return_value=[])
params = deepcopy(EE2_JOB_PARAMS)
submitscript = os.path.join(self.workdir, "workdir/tmp", "submit.sl")
with open(submitscript, "w") as f:
17 changes: 15 additions & 2 deletions test/test_specialrunner.py
@@ -2,8 +2,10 @@
import os
import unittest

from .mock_data import CATALOG_LIST_VOLUME_MOUNTS
from JobRunner.SpecialRunner import SpecialRunner
from queue import Queue
import json


class MockLogger(object):
@@ -40,10 +42,11 @@ def setUpClass(cls):
task hello_world {
String name = "World"
command {
echo $VOL_MOUNT_0
echo 'Hello, ${name}'
}
output {
File out = stdout()
Array[String] out = read_lines(stdout())
}
#runtime {
# docker: 'ubuntu:latest'
@@ -90,6 +93,12 @@ def test_run_wdl(self):
"params": [{"workflow": "workflow.wdl", "inputs": "inputs.json"}],
}
job_id = "1234"
vm = [{
'host_dir': '/tmp/data',
'container_dir': '/data',
'read_only': 1
}]

if not os.path.exists("/tmp/workdir/tmp"):
os.makedirs("/tmp/workdir/tmp/")
with open("/tmp/workdir/tmp/workflow.wdl", "w") as f:
@@ -99,7 +108,7 @@
q = Queue()
self.sr._FILE_POLL = 0.1
self.sr._BATCH_POLL = 0.3
self.sr.run(config, data, job_id, fin_q=[q])
self.sr.run(config, data, job_id, volumes=vm, fin_q=[q])
result = q.get(timeout=60)
self.assertEquals(result[0], "finished_special")
self.assertEquals(len(result), 3)
@@ -110,3 +119,7 @@
count += 1
self.assertTrue(count)
self.assertLess(self.logger.ct, 10)
with open('/tmp/workdir/tmp/meta.json') as f:
data = json.loads(f.read())['outputs']['hello.hello_world.out']
self.assertEquals(data[0], '/tmp/data:/data:ro')
self.assertEquals(data[1], 'Hello, World')
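For reference, a sketch of the meta.json structure these assertions expect; the assumption is that the wdl_run wrapper writes the workflow outputs under an "outputs" key, with the task's read_lines(stdout()) result as a two-element list.

# Sketch (assumption): structure read from /tmp/workdir/tmp/meta.json
expected_meta = {
    "outputs": {
        "hello.hello_world.out": [
            "/tmp/data:/data:ro",  # line 1: the echoed $VOL_MOUNT_0 value
            "Hello, World",        # line 2: output of echo 'Hello, ${name}'
        ]
    }
}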