diff --git a/README.rst b/README.rst
index a0450bc..025a712 100644
--- a/README.rst
+++ b/README.rst
@@ -51,8 +51,8 @@ Testing
 -------
 
 This repository contains a Docker-compose testing harness for a Son of Grid
-Engine cluster with a master and two slaves. You can initialize this system
-as follows
+Engine cluster with a master and two slaves. You can initialize this system
+as follows:
 
 .. code-block:: bash
diff --git a/dask_drmaa/adaptive.py b/dask_drmaa/adaptive.py
index b505a8f..15f61e6 100644
--- a/dask_drmaa/adaptive.py
+++ b/dask_drmaa/adaptive.py
@@ -34,7 +34,7 @@ class Adaptive(adaptive.Adaptive):
     ...         """ Remove worker addresses from cluster """
     '''
     def __init__(self, cluster=None, scheduler=None, interval=1000,
-                 startup_cost=1, scale_factor=2):
+                 startup_cost=1, scale_factor=2, **kwargs):
         if cluster is None:
             raise TypeError("`Adaptive.__init__() missing required argument: "
                             "`cluster`")
@@ -50,7 +50,8 @@ def __init__(self, cluster=None, scheduler=None, interval=1000,
 
         super(Adaptive, self).__init__(scheduler, cluster, interval,
                                        startup_cost=startup_cost,
-                                       scale_factor=scale_factor)
+                                       scale_factor=scale_factor,
+                                       **kwargs)
 
     def get_busy_workers(self):
         s = self.scheduler
@@ -77,10 +78,12 @@ def get_scale_up_kwargs(self):
         kwargs = {'n': max(instances, len(self.get_busy_workers()))}
         memory = []
         if self.scheduler.unrunnable:
-            for key in self.scheduler.unrunnable:
+            for task in self.scheduler.unrunnable:
+                key = task.key
+                prefix = task.prefix
                 duration = 0
                 memory = []
-                duration += self.scheduler.task_duration.get(key, 0.1)
+                duration += self.scheduler.task_duration.get(prefix, 0.1)
 
                 if key in self.scheduler.resource_restrictions:
                     m = self.scheduler.resource_restrictions[key].get('memory')
@@ -93,7 +96,18 @@ def get_scale_up_kwargs(self):
         return kwargs
 
     @gen.coroutine
-    def _retire_workers(self):
+    def _retire_workers(self, workers=None):
+        if workers is None:
+            workers = self.workers_to_close()
+        if not workers:
+            raise gen.Return(workers)
         with log_errors():
-            workers = yield self.scheduler.retire_workers(close_workers=True)
-            logger.info("Retiring workers {}".format(workers))
+            result = yield self.scheduler.retire_workers(workers,
+                                                         remove=True,
+                                                         close_workers=True)
+            if result:
+                logger.info("Retiring workers {}".format(result))
+            # Diverges from distributed.Adaptive here:
+            # ref c51a15a35a8a64c21c1182bfd9209cb6b7d95380
+            # TODO: can this be reconciled back to base class implementation?
+            raise gen.Return(result)
diff --git a/dask_drmaa/sge.py b/dask_drmaa/sge.py
index 483c2d7..f62273f 100644
--- a/dask_drmaa/sge.py
+++ b/dask_drmaa/sge.py
@@ -42,6 +42,7 @@ def create_job_template(self, nativeSpecification='', cpus=1, memory=None,
         if memory:
             args = args + ['--memory-limit', str(memory * (1 - memory_fraction))]
             args = args + ['--resources', 'memory=%f' % (memory * memory_fraction)]
+            # h_vmem is SGE-specific
             ns += ' -l h_vmem=%dG' % int(memory / 1e9) # / cpus
         if cpus:
             args = args + ['--nprocs', '1', '--nthreads', str(cpus)]
diff --git a/dask_drmaa/tests/test_adaptive.py b/dask_drmaa/tests/test_adaptive.py
index ca76c3e..aa1e32d 100644
--- a/dask_drmaa/tests/test_adaptive.py
+++ b/dask_drmaa/tests/test_adaptive.py
@@ -8,8 +8,6 @@
 from distributed import Client
 from distributed.utils_test import loop, inc, slowinc
 
-
-@pytest.mark.skip(reason="currently times out for an unknown reason")
 def test_adaptive_memory(loop):
     with SGECluster(scheduler_port=0, cleanup_interval=100) as cluster:
         adapt = Adaptive(cluster, cluster.scheduler)
diff --git a/requirements.txt b/requirements.txt
index 70a1a70..2a452ea 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
 dask
-distributed >= 1.20.0
+distributed >= 1.21.0
 drmaa
 click
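
For context, a minimal usage sketch based on the re-enabled test above (not part of the patch). It assumes an SGE environment reachable through DRMAA, for example the Docker-compose harness described in the README, and assumes that any extra keyword arguments given to Adaptive are simply forwarded to distributed's base class via the new **kwargs; names beyond those shown in the patch are illustrative only.

.. code-block:: python

    from distributed import Client
    from dask_drmaa import SGECluster
    from dask_drmaa.adaptive import Adaptive

    # Assumes an SGE/DRMAA environment is available (e.g. the docker-compose
    # testing harness from the README).
    with SGECluster(scheduler_port=0, cleanup_interval=100) as cluster:
        # Same construction as in test_adaptive_memory above.
        adapt = Adaptive(cluster, cluster.scheduler)
        with Client(cluster) as client:
            # Adaptive sizes scale-up requests from the unrunnable tasks'
            # durations and memory restrictions, and retires idle workers
            # through the patched _retire_workers().
            futures = client.map(lambda x: x + 1, range(10))
            print(sum(client.gather(futures)))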