Skip to content

Commit

Permalink
Compatibility fixes with distributed 1.21.3
Browse files Browse the repository at this point in the history
- Support passing kwargs to distributed.Adaptive.__init__, which now
  takes keyword arguments like minimum and maximum [number of workers].
- Add an optional workers argument to _retire_workers() to match
  dask/distributed#1797 -- currently Adaptive
  raises a TypeError.
  • Loading branch information
azjps committed Mar 15, 2018
1 parent 6ac2d6d commit c189dee
Showing 1 changed file with 17 additions and 5 deletions.
22 changes: 17 additions & 5 deletions dask_drmaa/adaptive.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class Adaptive(adaptive.Adaptive):
... """ Remove worker addresses from cluster """
'''
def __init__(self, cluster=None, scheduler=None, interval=1000,
startup_cost=1, scale_factor=2):
startup_cost=1, scale_factor=2, **kwargs):
if cluster is None:
raise TypeError("`Adaptive.__init__() missing required argument: "
"`cluster`")
Expand All @@ -50,7 +50,8 @@ def __init__(self, cluster=None, scheduler=None, interval=1000,

super(Adaptive, self).__init__(scheduler, cluster, interval,
startup_cost=startup_cost,
scale_factor=scale_factor)
scale_factor=scale_factor,
**kwargs)

def get_busy_workers(self):
s = self.scheduler
Expand Down Expand Up @@ -93,7 +94,18 @@ def get_scale_up_kwargs(self):
return kwargs

@gen.coroutine
def _retire_workers(self):
def _retire_workers(self, workers=None):
if workers is None:
workers = self.workers_to_close()
if not workers:
raise gen.Return(workers)
with log_errors():
workers = yield self.scheduler.retire_workers(close_workers=True)
logger.info("Retiring workers {}".format(workers))
result = yield self.scheduler.retire_workers(workers,
remove=True,
close_workers=True)
if result:
logger.info("Retiring workers {}".format(result))
# Diverges from distributed.Adaptive here:
# ref c51a15a35a8a64c21c1182bfd9209cb6b7d95380
raise gen.Return(result)

0 comments on commit c189dee

Please sign in to comment.