Skip to content
This repository has been archived by the owner on Feb 10, 2021. It is now read-only.

Commit

Permalink
Merge pull request #29 from pitrou/preexec
Browse files Browse the repository at this point in the history
Add a preexec_commands argument to execute arbitrary commands before launching worker
  • Loading branch information
pitrou authored May 25, 2017
2 parents 0599040 + e104aff commit 5b28253
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 7 deletions.
2 changes: 1 addition & 1 deletion dask_drmaa/cli/dask_drmaa.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
@click.command()
@click.argument('nworkers', type=int)
def main(nworkers):
cluster = DRMAACluster(silence_logs=logging.INFO)
cluster = DRMAACluster(silence_logs=logging.INFO, scheduler_port=8786)
cluster.start_workers(nworkers)

def handle_signal(sig, frame):
Expand Down
1 change: 1 addition & 0 deletions dask_drmaa/cli/tests/test_dask_drmaa.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from distributed import Client
from distributed.utils_test import loop


def test_dask_drmaa(loop):
with popen(['dask-drmaa', '2']) as proc:
with Client('127.0.0.1:8786', loop=loop) as client:
Expand Down
23 changes: 17 additions & 6 deletions dask_drmaa/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,20 @@ def get_session():
}


script_template = ("""
#!/bin/bash
%s $1 --name %s.%s "${@:2}"
""" % (worker_bin_path, JOB_ID, TASK_ID)).strip()
def make_job_script(executable, name, preexec=()):
shebang = '#!/bin/bash'
execute = (
'%(executable)s $1 --name %(name)s "${@:2}"'
% dict(executable=executable, name=name)
)
preparation = list(preexec)
script_template = '\n'.join([shebang] + preparation + [execute, ''])
return script_template


class DRMAACluster(object):
def __init__(self, template=None, cleanup_interval=1000, hostname=None,
script=None, **kwargs):
script=None, preexec_commands=(), **kwargs):
"""
Dask workers launched by a DRMAA-compatible cluster
Expand Down Expand Up @@ -91,8 +96,11 @@ def __init__(self, template=None, cleanup_interval=1000, hostname=None,
dir=os.path.curdir)
self.script = fn

script_contents = make_job_script(executable=worker_bin_path,
name='%s.%s' % (JOB_ID, TASK_ID),
preexec=preexec_commands)
with open(fn, 'wt') as f:
f.write(script_template)
f.write(script_contents)

@atexit.register
def remove_script():
Expand All @@ -101,6 +109,9 @@ def remove_script():

os.chmod(self.script, 0o777)

else:
assert not preexec_commands, "Cannot specify both script and preexec_commands"

# TODO: check that user-provided script is executable

self.template = merge(default_template,
Expand Down
27 changes: 27 additions & 0 deletions dask_drmaa/tests/test_core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os
from time import sleep, time
import shutil
import tempfile

import pytest

Expand Down Expand Up @@ -37,6 +39,31 @@ def test_str(loop):
1 + 1


def test_pythonpath():
tmpdir = tempfile.mkdtemp(prefix='test_drmaa_pythonpath_', dir='.')
try:
with open(os.path.join(tmpdir, "bzzz_unlikely_module_name.py"), "w") as f:
f.write("""if 1:
def f():
return 5
""")

def func():
import bzzz_unlikely_module_name
return bzzz_unlikely_module_name.f()

with DRMAACluster(scheduler_port=0,
preexec_commands=['export PYTHONPATH=%s:PYTHONPATH' % tmpdir],
) as cluster:
with Client(cluster) as client:
cluster.start_workers(2)
x = client.submit(func)
assert x.result() == 5

finally:
shutil.rmtree(tmpdir)


def test_job_name_as_name(loop):
with DRMAACluster(scheduler_port=0) as cluster:
cluster.start_workers(2)
Expand Down

0 comments on commit 5b28253

Please sign in to comment.