Skip to content
This repository has been archived by the owner on Feb 10, 2021. It is now read-only.

Allow starting and stopping no workers #68

Merged
merged 4 commits into from
Apr 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions dask_drmaa/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ def create_job_template(self, **kwargs):
return jt

def start_workers(self, n=1, **kwargs):
if n == 0:
return
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: Might be worth adding a warning as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May revisit this later. For now will leave the warning out.


with log_errors():
with self.create_job_template(**kwargs) as jt:
ids = get_session().runBulkJobs(jt, 1, n, 1)
Expand All @@ -213,8 +216,10 @@ def start_workers(self, n=1, **kwargs):
def stop_workers(self, worker_ids, sync=False):
if isinstance(worker_ids, str):
worker_ids = [worker_ids]
else:
elif worker_ids:
worker_ids = list(worker_ids)
else:
return

# Let the scheduler gracefully retire workers first
ids_to_ips = {
Expand Down Expand Up @@ -254,8 +259,7 @@ def scale_down(self, workers):

def close(self):
logger.info("Closing DRMAA cluster")
if self.workers:
self.stop_workers(self.workers, sync=True)
self.stop_workers(self.workers, sync=True)

self.local_cluster.close()
if self._should_cleanup_script and os.path.exists(self.script):
Expand Down
10 changes: 10 additions & 0 deletions dask_drmaa/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@
from distributed.utils import tmpfile


def test_no_workers(loop):
with DRMAACluster(scheduler_port=0) as cluster:
with Client(cluster, loop=loop) as client:
cluster.start_workers(0)
assert not cluster.workers
cluster.stop_workers([])

assert not os.path.exists(cluster.script)


def test_simple(loop):
with DRMAACluster(scheduler_port=0) as cluster:
with Client(cluster, loop=loop) as client:
Expand Down