
Merge pull request #30 from pitrou/stdout_unbuffered
Set standard output to unbuffered / line-buffered
pitrou authored May 31, 2017
2 parents 5b28253 + 122484f commit fc24bbd
Showing 3 changed files with 52 additions and 5 deletions.
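
For context: when stdout is redirected to a file rather than a terminal, CPython block-buffers it, so a worker's log file can lag far behind what the worker has actually printed. Setting PYTHONUNBUFFERED=1 in the worker's environment, as this commit does, disables that buffering. A minimal sketch of the effect, not part of this repository (the child script and the file name child.out are illustrative):

import os
import subprocess
import sys
import time

# A child process that prints and then lingers, like a long-lived worker.
child = 'import time; print("hello from worker"); time.sleep(5)'

env = dict(os.environ)
env['PYTHONUNBUFFERED'] = '1'  # the same switch the job template sets below

with open('child.out', 'w') as out:
    proc = subprocess.Popen([sys.executable, '-c', child], stdout=out, env=env)
    time.sleep(1)  # give the child a moment to print

# With PYTHONUNBUFFERED=1 the line is already on disk; without it, stdout
# is block-buffered (not a tty) and the file would still be empty here.
with open('child.out') as f:
    print(f.read())
proc.wait()
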
2 changes: 2 additions & 0 deletions .travis.yml
@@ -12,6 +12,8 @@ matrix:
  include:
  - python: "2.7"
    env: OS=ubuntu-14.04
  - python: "3.6"
    env: OS=ubuntu-14.04

env:
  global:
22 changes: 18 additions & 4 deletions dask_drmaa/core.py
@@ -1,3 +1,4 @@
from collections import namedtuple
import logging
import os
import socket
@@ -24,18 +25,26 @@ def get_session():
    return _global_session[0]


WorkerSpec = namedtuple('WorkerSpec',
                        ('job_id', 'kwargs', 'stdout', 'stderr'))


worker_bin_path = os.path.join(sys.exec_prefix, 'bin', 'dask-worker')

# All JOB_ID and TASK_ID environment variables
JOB_ID = "$JOB_ID$SLURM_JOB_ID$LSB_JOBID"
TASK_ID = "$SGE_TASK_ID$SLURM_ARRAY_TASK_ID$LSB_JOBINDEX"

worker_out_path_template = os.path.join(os.getcwd(), 'worker.%(jid)s.%(kind)s')

default_template = {
    'jobName': 'dask-worker',
-   'outputPath': ':%s/worker.$JOB_ID.$drmaa_incr_ph$.out' % os.getcwd(),
-   'errorPath': ':%s/worker.$JOB_ID.$drmaa_incr_ph$.err' % os.getcwd(),
+   'outputPath': ':' + worker_out_path_template % dict(jid='$JOB_ID.$drmaa_incr_ph$', kind='out'),
+   'errorPath': ':' + worker_out_path_template % dict(jid='$JOB_ID.$drmaa_incr_ph$', kind='err'),
    'workingDirectory': os.getcwd(),
    'nativeSpecification': '',
+   # stdout/stderr are redirected to files, make sure their contents don't lag
+   'jobEnvironment': {'PYTHONUNBUFFERED': '1'},
    'args': []
}

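The same %-style template is filled twice: at job-template creation it is filled with DRMAA placeholder variables that the scheduler expands for each task, and in start_workers (below) it is filled with the concrete job id so the cluster can locate each worker's log files. A quick illustration (the concrete job id '42.1' is made up):

import os

worker_out_path_template = os.path.join(os.getcwd(), 'worker.%(jid)s.%(kind)s')

# With DRMAA placeholders, expanded by the scheduler for every task:
print(worker_out_path_template % dict(jid='$JOB_ID.$drmaa_incr_ph$', kind='out'))
# -> <cwd>/worker.$JOB_ID.$drmaa_incr_ph$.out

# With a concrete (made-up) job id once the job has been submitted:
print(worker_out_path_template % dict(jid='42.1', kind='err'))
# -> <cwd>/worker.42.1.err
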
@@ -123,7 +132,7 @@ def remove_script():
            io_loop=self.scheduler.loop)
        self._cleanup_callback.start()

-       self.workers = {}  # {job-id: {'resource': quantity}}
+       self.workers = {}  # {job-id: WorkerSpec}

@gen.coroutine
def _start(self):
@@ -158,7 +167,12 @@ def start_workers(self, n=1, **kwargs):
        with self.create_job_template(**kwargs) as jt:
            ids = get_session().runBulkJobs(jt, 1, n, 1)
            logger.info("Start %d workers. Job ID: %s", len(ids), ids[0].split('.')[0])
-           self.workers.update({jid: kwargs for jid in ids})
+           self.workers.update(
+               {jid: WorkerSpec(job_id=jid, kwargs=kwargs,
+                                stdout=worker_out_path_template % dict(jid=jid, kind='out'),
+                                stderr=worker_out_path_template % dict(jid=jid, kind='err'),
+                                )
+                for jid in ids})

    def stop_workers(self, worker_ids, sync=False):
        if isinstance(worker_ids, str):
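Since workers now maps each job id to a WorkerSpec rather than to the bare kwargs, callers can look up a worker's log files by field name. A self-contained sketch of the record shape (the job id '42.1' and the paths are made up for illustration):

from collections import namedtuple

WorkerSpec = namedtuple('WorkerSpec', ('job_id', 'kwargs', 'stdout', 'stderr'))

# Hypothetical job id; in the cluster it comes from runBulkJobs().
jid = '42.1'
workers = {
    jid: WorkerSpec(job_id=jid, kwargs={},
                    stdout='worker.%s.out' % jid,
                    stderr='worker.%s.err' % jid),
}

# Fields are accessed by name instead of by guessing at dict keys:
spec = workers[jid]
print(spec.stdout)   # worker.42.1.out
print(spec.stderr)   # worker.42.1.err
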
33 changes: 32 additions & 1 deletion dask_drmaa/tests/test_core.py
@@ -1,7 +1,10 @@
from __future__ import print_function

import os
-from time import sleep, time
+import shutil
+import sys
+import tempfile
+from time import sleep, time

import pytest

@@ -137,7 +140,35 @@ def test_logs(loop):
        assert "worker" in f.read()


def test_stdout_in_worker():
    """
    stdout and stderr should be redirected and line-buffered in workers.
    """
    def inc_and_print(x):
        print("stdout: inc_and_print(%s)" % (x,))
        print("stderr: inc_and_print(%s)" % (x,), file=sys.stderr)
        return x + 1

    def get_lines(fn):
        with open(fn) as f:
            return [line.strip() for line in f]

    with DRMAACluster(scheduler_port=0, diagnostics_port=None) as cluster:
        with Client(cluster) as client:
            cluster.start_workers(1)
            future = client.submit(inc_and_print, 1)
            assert future.result() == 2

            w, = cluster.workers.values()
            assert "stdout: inc_and_print(1)" in get_lines(w.stdout)
            assert "stderr: inc_and_print(1)" in get_lines(w.stderr)


def test_cleanup():
    """
    Not a test, just ensure that all worker logs are cleaned up at the
    end of the test run.
    """
    def cleanup_logs():
        from glob import glob
        import os
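The body of cleanup_logs is cut off in this view. Purely as a hypothetical sketch of what such a helper might do (the glob patterns are an assumption derived from worker_out_path_template in core.py, not the commit's actual code):

from glob import glob
import os

def cleanup_logs():
    # Assumed pattern; mirrors worker_out_path_template ('worker.<jid>.<kind>').
    for fn in glob('worker.*.out') + glob('worker.*.err'):
        os.remove(fn)
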
