Skip to content

Commit

Permalink
Implement proactive step requester #1218
Browse files Browse the repository at this point in the history
  • Loading branch information
Bo Peng committed Feb 22, 2019
1 parent 1311609 commit 01571eb
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 13 deletions.
10 changes: 5 additions & 5 deletions src/sos/step_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1380,9 +1380,9 @@ def run(self):
#
if self._subworkflow_results:
wf_ids = sum([x['pending_workflows'] for x in self._subworkflow_results], [])
for _ in wf_ids:
for wf_id in wf_ids:
# here we did not check if workflow ids match
res = yield "env.__socket__.recv_pyobj()"
res = yield f"result for workflow {wf_id}"
if res is None:
sys.exit(0)
elif isinstance(res, Exception):
Expand Down Expand Up @@ -1525,7 +1525,7 @@ def wait_for_tasks(self, tasks, all_submitted):
results = {}
while True:
# yield an indicator of what is requested, for debugging purpose
res = yield "self.socket.recv_pyobj()"
res = yield "result of task"
if res is None:
sys.exit(0)
results.update(res)
Expand All @@ -1537,7 +1537,7 @@ def wait_for_tasks(self, tasks, all_submitted):

def handle_unknown_target(self, e):
self.socket.send_pyobj(['missing_target', e.target])
res = yield "self.socket.recv_pyobj()"
res = yield f"missing target {e.target}"
if not res:
raise e

Expand All @@ -1554,7 +1554,7 @@ def verify_dynamic_targets(self, targets):
return

self.socket.send_pyobj(['dependent_target'] + traced)
res = yield "self.socket.recv_pyobj()"
res = yield f"dependent target {traced}"
if res != 'target_resolved':
raise RuntimeError(f'Failed to veryify dependent target {traced}')

Expand Down
4 changes: 3 additions & 1 deletion src/sos/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,10 @@ def __init__(self):

def switch(self, idx):
# save old env
if idx == self._sub_idx:
return
self._sub_envs[self._sub_idx]['sos_dict'] = self.sos_dict
if len(self._sub_envs) > idx:
if len(self._sub_envs) <= idx:
self.sos_dict = WorkflowDict()
else:
self.sos_dict = self._sub_envs[idx]['sos_dict']
Expand Down
16 changes: 9 additions & 7 deletions src/sos/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,24 +115,23 @@ def run(self):


def _process_job(self):
env.switch(self._stack_idx)

if len(self._master_sockets) > self._stack_idx:
env.switch(self._stack_idx)
# if current stack is ok
env.master_socket = self._master_sockets[self._stack_idx]
else:
# use a new env
env.switch(self._stack_idx)
# a new socket is needed
env.master_socket = create_socket(env.zmq_context, zmq.PAIR)
port = socket.bind_to_random_port('tcp://127.0.0.1')
self.master_sockets.append(env.master_socket)
self.master_ports.append(port)
port = env.master_socket.bind_to_random_port('tcp://127.0.0.1')
self._master_sockets.append(env.master_socket)
self._master_ports.append(port)

# send the current socket number as a way to notify the availability of worker
env.ctrl_socket.send_pyobj(self._master_ports[self._stack_idx])
work = env.ctrl_socket.recv_pyobj()
env.logger.trace(
f'Worker {self.name} receives request {short_repr(work)}')
f'Worker {self.name} receives request {short_repr(work)} with master port {self._master_ports[self._stack_idx]}')

if work is None:
return False
Expand All @@ -151,9 +150,12 @@ def _process_job(self):

while True:
# wait 0.1s
env.logger.error(f'waiting for request {requested}')
if env.master_socket.poll(100):
# we get a response very quickly, so we continue
yres = env.master_socket.recv_pyobj()
env.logger.error(f'receive {yres} for request {requested}')

requested = runner.send(yres)
break
# now let us ask if the master has something else for us
Expand Down
22 changes: 22 additions & 0 deletions src/sos/workflow_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,22 @@ def add_placeholder_worker(self, runnable, socket):
def push_to_queue(self, runnable, spec):
self.step_queue[runnable] = spec

def send_to_proc(self, proc):
    """Hand the next queued step to a worker that proactively asked for work.

    The worker announces availability by sending the port of its master
    PAIR socket over the control socket.  Reply with a pickled ``None``
    when no step is queued; otherwise pop one step, send its
    (pre-pickled) spec, connect a new PAIR socket to the worker's master
    port, and register a ``ProcInfo`` so the main loop can track the step.

    Args:
        proc: a ProcInfo-like object whose ``ctrl_socket`` the worker is
            requesting work on.
    """
    # the worker sends the port of its PAIR socket when it asks for work
    master_port = proc.ctrl_socket.recv_pyobj()
    if not self.step_queue:
        # BUG FIX: raw Socket.send() only accepts bytes/frames and would
        # raise TypeError on None; the worker reads this reply with
        # recv_pyobj(), so it must be sent pickled via send_pyobj().
        proc.ctrl_socket.send_pyobj(None)
        return
    runnable, spec = self.step_queue.popitem()
    # spec is already pickled to "freeze" them
    proc.ctrl_socket.send(spec)

    master_socket = create_socket(env.zmq_context, zmq.PAIR,
                                  'pair socket for step worker')
    master_socket.connect(f'tcp://127.0.0.1:{master_port}')
    # we need to create a separate ProcInfo to keep track of the step
    self.procs.append(
        ProcInfo(worker=proc.worker, ctrl_socket=proc.ctrl_socket,
                 socket=master_socket, port=master_port, step=runnable))

def send_new(self):
if not self.step_queue:
return False
Expand Down Expand Up @@ -1089,6 +1105,12 @@ def i_am():
continue
else:
raise RuntimeError('A worker has been killed. Quitting.')

if proc.ctrl_socket is not None and proc.ctrl_socket.poll(0):
# if the ctrl_socket has a message, it means the worker is asking for more
# job proactively
manager.send_to_proc(proc)

# receive something from the worker
res = proc.socket.recv_pyobj(zmq.NOBLOCK)
runnable = proc.step
Expand Down

0 comments on commit 01571eb

Please sign in to comment.