Skip to content
This repository has been archived by the owner on Feb 10, 2021. It is now read-only.

Compatibility fixes with distributed 1.21.3 #63

Merged
merged 6 commits into from
Mar 24, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 29 additions & 7 deletions dask_drmaa/adaptive.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from distributed import Scheduler
from distributed.utils import log_errors
from distributed.deploy import adaptive
from six import string_types
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add six to requirements.txt.

Copy link
Collaborator Author

@azjps azjps Mar 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI six is in the requirements.txt for distributed, but I suppose there is no harm and it is more clear in duplicating here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I spend part of my days with package management issues. Have generally found explicit requirements makes things easier to manage.

from tornado import gen

from .core import get_session
Expand Down Expand Up @@ -34,7 +35,7 @@ class Adaptive(adaptive.Adaptive):
... """ Remove worker addresses from cluster """
'''
def __init__(self, cluster=None, scheduler=None, interval=1000,
startup_cost=1, scale_factor=2):
startup_cost=1, scale_factor=2, **kwargs):
if cluster is None:
raise TypeError("`Adaptive.__init__() missing required argument: "
"`cluster`")
Expand All @@ -50,7 +51,8 @@ def __init__(self, cluster=None, scheduler=None, interval=1000,

super(Adaptive, self).__init__(scheduler, cluster, interval,
startup_cost=startup_cost,
scale_factor=scale_factor)
scale_factor=scale_factor,
**kwargs)

def get_busy_workers(self):
s = self.scheduler
Expand All @@ -77,10 +79,20 @@ def get_scale_up_kwargs(self):
kwargs = {'n': max(instances, len(self.get_busy_workers()))}
memory = []
if self.scheduler.unrunnable:
for key in self.scheduler.unrunnable:
for task in self.scheduler.unrunnable:
if isinstance(task, string_types):
# Backwards compatibility for distributed pre-1.21.0
key = task
prefix = key
else:
# In distributed==1.21.0, the scheduler now stores TaskState objects
# instead of string keys in its task collections:
# https://github.com/dask/distributed/pull/1594
key = task.key
prefix = task.prefix
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any thoughts on bumping our distributed requirement?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure, I think eventually we will definitely want to so that we don't have to deal with backwards-compatibility, but as of right now distributed==1.21.0 has only been out for a month and a half so maybe the dust hasn't settled yet. cc @mrocklin if you have any quick recommendations.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm comfortable bumping up requirements. If people want to use old versions of distributed then they can use old versions of dask-drmaa as well. I think we should maintain a little bit of slack between the various dask-foo projects, but not too much.

duration = 0
memory = []
duration += self.scheduler.task_duration.get(key, 0.1)
duration += self.scheduler.task_duration.get(prefix, 0.1)

if key in self.scheduler.resource_restrictions:
m = self.scheduler.resource_restrictions[key].get('memory')
Expand All @@ -93,7 +105,17 @@ def get_scale_up_kwargs(self):
return kwargs

@gen.coroutine
def _retire_workers(self):
def _retire_workers(self, workers=None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add this option?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NVM. Comes from PR ( dask/distributed#1797 ).

We might want to revisit whether we should be carrying this function at all or just using the parent class' functionality.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jakirkham Currently dask_drmaa==0.1.2 raises a TypeError when run with distributed==1.21.3 on this line of code in Adaptive. I guess the unit tests don't cover this or the distributed version is not pinned high enough in Travis.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, sorry, missed where this was coming from on the first pass.

Would we be able to reuse that method or do you see issues with that approach?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, agreed that we should try to move back to the base implementation. I am not really sure why the implementation diverged from the base class to begin with, although it definitely seemed intentional: c51a15a#diff-d2ee7bfcb2312cc404b8b4953eaa2576L47. I haven't had a chance to step through the dask/distributed internals to see if there's still any behavioral change here. Perhaps @nevermindewe can shed some light? Anyway for now, like you suggested, it may be safer to move back to the base class in a separate PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added as issue ( #65 ).

if workers is None:
workers = self.workers_to_close()
if not workers:
raise gen.Return(workers)
with log_errors():
workers = yield self.scheduler.retire_workers(close_workers=True)
logger.info("Retiring workers {}".format(workers))
result = yield self.scheduler.retire_workers(workers,
remove=True,
close_workers=True)
if result:
logger.info("Retiring workers {}".format(result))
# Diverges from distributed.Adaptive here:
# ref c51a15a35a8a64c21c1182bfd9209cb6b7d95380
raise gen.Return(result)
1 change: 1 addition & 0 deletions dask_drmaa/sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def create_job_template(self, nativeSpecification='', cpus=1, memory=None,
if memory:
args = args + ['--memory-limit', str(memory * (1 - memory_fraction))]
args = args + ['--resources', 'memory=%f' % (memory * memory_fraction)]
# h_vmem is SGE-specific
ns += ' -l h_vmem=%dG' % int(memory / 1e9) # / cpus
if cpus:
args = args + ['--nprocs', '1', '--nthreads', str(cpus)]
Expand Down
2 changes: 0 additions & 2 deletions dask_drmaa/tests/test_adaptive.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
from distributed import Client
from distributed.utils_test import loop, inc, slowinc


@pytest.mark.skip(reason="currently times out for an unknown reason")
def test_adaptive_memory(loop):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this seem to be working reliably now or is it still a little flaky?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I've tried, it seems to work reliably now 😁

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it acts up again, we can always reopen issue ( #58 ). Guessing you fixed it though as the issue cropped up with Distributed 1.21. Thanks for working on it.

with SGECluster(scheduler_port=0, cleanup_interval=100) as cluster:
adapt = Adaptive(cluster, cluster.scheduler)
Expand Down