
Using coroutines to execute multiple steps and subworkflows in a single worker #1218

Closed
BoPeng opened this issue Feb 21, 2019 · 18 comments


BoPeng commented Feb 21, 2019

#1056

Right now our worker is executed like this:

  1. the worker executes a step or nested workflow,
  2. the worker sends pending requests to the controller and waits for a response,
  3. the controller sends the requests to an idle worker or starts a new worker to work on them,
  4. the controller sends the result back to the worker,
  5. the worker continues to run.

The problem here is that a potentially large number of workers can sit idle, each waiting for its requests to be satisfied.

This ticket proposes the use of Python coroutines/generators to resolve this problem. The technique is demonstrated in the following example:

def step():
    # a step executor written as a generator: yield a request,
    # receive the resolved result, and return the final value
    requested = yield 'request 1'
    return 'step completed'

def run_step():
    try:
        runner = step()
        pending = next(runner)
        while True:
            # send the result back to the generator AND fetch the next request
            pending = runner.send('resolved')
    except StopIteration as e:
        # the generator's return value is carried by StopIteration
        return e.value
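
For completeness, driving these two functions end to end (nothing sos-specific here) simply returns the step's value:

print(run_step())  # prints: step completed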

That is to say,

  1. the step executor becomes a generator function,
  2. instead of sending requests directly to the controller, the step executor yields the request to the runner,
  3. the runner receives the request and sends it to the controller,
  4. the runner can at the same time request more work for the worker and start running it,
  5. the runner receives results from the controller and stores them (on a stack),
  6. once the additional work has been completed (stack popped), the returned result is sent back to the generator function so that the original step can continue (see the sketch after this list).
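
A minimal, self-contained sketch of such a runner loop, assuming an in-memory work queue and a resolve_request() stand-in for the controller round trip (make_step, resolve_request, run_worker and the deque-based queue are illustrative names, not the actual sos implementation): the worker keeps a stack of suspended step generators, and while a request is outstanding it can pick up and start additional work.

from collections import deque

def make_step(name):
    # a step executor written as a generator factory; purely illustrative
    def step():
        result = yield f'{name} needs a dependency'
        return f'{name} completed with {result!r}'
    return step

def resolve_request(request):
    # hypothetical stand-in for the controller round trip; in sos this
    # would go over a zmq socket to the master process
    return f'resolved({request})'

def run_worker(work_queue):
    # run all queued steps in a single worker: suspended generators are
    # kept on a stack, and while a request is outstanding the runner may
    # pick up and start additional work
    stack = []      # (generator, pending request) of suspended steps
    results = []
    while work_queue or stack:
        if work_queue:
            # pick up new work and run it until its first request
            gen = work_queue.popleft()()
            try:
                stack.append((gen, next(gen)))
            except StopIteration as e:
                results.append(e.value)   # finished without any request
        else:
            # no new work: resolve the most recent request and resume that step
            gen, request = stack.pop()
            try:
                stack.append((gen, gen.send(resolve_request(request))))
            except StopIteration as e:
                results.append(e.value)
    return results

print(run_worker(deque([make_step('step_10'), make_step('step_20')])))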

BoPeng commented Feb 21, 2019

In case the generator function is called as a subroutine of another generator, the calling generator has to wrap it as

waiter = subfunc()
try:
    requested = next(waiter)
    while True:
        # forward the subroutine's request to our own caller, then
        # pass the resolved result back to the subroutine
        res = yield requested
        requested = waiter.send(res)
except StopIteration as e:
    result = e.value
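
For what it's worth, Python 3.3+ provides yield from, which performs exactly this delegation (requests are forwarded outward, sent values are routed inward, and the sub-generator's return value becomes the value of the expression), so the wrapper above could be reduced to something like:

def caller():
    # delegate to the sub-generator; its return value is captured here
    result = yield from subfunc()
    return result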


gaow commented Feb 22, 2019

Looks like this will help with this situation?


BoPeng commented Feb 22, 2019

Yes, it will reduce the number of processes (strictly to -j) and therefore the extra "base memory" for each process.


BoPeng commented Feb 24, 2019

@gaow The worker branch is ready for testing. Basically, -j is enforced (except for blocking nested workflows, #1220), so you will have at most -j + 1 processes (1 being the master process), and the number of open files should be kept in check as well (#1208).


gaow commented Feb 24, 2019

Okay, I did not use -v3, but here is an immediate bug:

WARNING: Failed to respond controller ['worker_available', False]: '<' not supported between instances of 'int' and 'NoneType'

I'm waiting a bit longer to see the outcome. Currently the process seems to be hanging.


gaow commented Feb 24, 2019

So it appears to hang. I cancelled it:

^CTRACE: 17703 : closes socket with handler 42 (2 left)
TRACE: Stop controller from 17703
  File "/scratch/midway2/gaow/miniconda3/lib/python3.6/site-packages/sos/__main__.py", line 406, in cmd_run
    executor.run(args.__targets__, mode='dryrun' if args.dryrun else 'run')
Traceback (most recent call last):
  File "/scratch/midway2/gaow/miniconda3/lib/python3.6/site-packages/sos/workflow_executor.py", line 273, in run
    return self.run_as_master(targets=targets, mode=mode)
  File "/scratch/midway2/gaow/miniconda3/lib/python3.6/site-packages/sos/workflow_executor.py", line 1302, in run_as_master
    if not manager.send_to_worker():
  File "/scratch/midway2/gaow/miniconda3/lib/python3.6/site-packages/sos/workflow_executor.py", line 142, in send_to_worker
    master_port = request_answer_from_controller(['worker_available', self._is_next_job_blocking()])
  File "/scratch/midway2/gaow/miniconda3/lib/python3.6/site-packages/sos/controller.py", line 58, in request_answer_from_controller
    return env.master_request_socket.recv_pyobj()
  File "/scratch/midway2/gaow/miniconda3/lib/python3.6/site-packages/zmq/sugar/socket.py", line 623, in recv_pyobj
    msg = self.recv(flags)
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/scratch/midway2/gaow/miniconda3/lib/python3.6/site-packages/sos/__main__.py", line 406, in cmd_run
    executor.run(args.__targets__, mode='dryrun' if args.dryrun else 'run')
  File "/scratch/midway2/gaow/miniconda3/lib/python3.6/site-packages/sos/workflow_executor.py", line 285, in run
    request_answer_from_controller(['done', succ])
  File "/scratch/midway2/gaow/miniconda3/lib/python3.6/site-packages/sos/controller.py", line 57, in request_answer_from_controller
    env.master_request_socket.send_pyobj(msg)
  File "/scratch/midway2/gaow/miniconda3/lib/python3.6/site-packages/zmq/sugar/socket.py", line 603, in send_pyobj
    return self.send(msg, flags=flags, **kwargs)
  File "/scratch/midway2/gaow/miniconda3/lib/python3.6/site-packages/zmq/sugar/socket.py", line 392, in send
    return super(Socket, self).send(data, flags=flags, copy=copy, track=track)
zmq.error.ZMQError: Operation cannot be accomplished in current state

and ran it another time using -v 4; I get:

DEBUG: Set __step_output__ = Unspecified
TRACE: Set __step_output__ to Unspecified with labels []
TRACE: Set SOS_VERSION to '0.18.8' of type str
TRACE: Set step_name to 'a_oracle_generator' of type str
TRACE: Set __null_func__ to <function __null_func__ at 0x7f9d0a... of type function
TRACE: Analyzing a_oracle_generator
DEBUG: Adding step a_oracle_generator with output finemap_output/oracle_generator/oracle_generator_1.pkl to resolve target sos_step("a_oracle_generator")
DEBUG: Master execute 6a4b172f7f1f9dea from DAG
DEBUG: Master execute c5a58cb169500bd5 from DAG
TRACE: 17703 pair socket for step worker: new socket of type PAIR with handler 42 (6 total)
WARNING: Failed to respond controller ['worker_available', False]: '<' not supported between instances of 'int' and 'NoneType'
TRACE: WORKER SoS_Worker-3 (28037) quits after receiving None.
TRACE: 17703 : closes socket with handler 16 (5 left)
TRACE: 17703 : closes socket with handler 19 (4 left)
TRACE: 17703 : closes socket with handler 22 (3 left)
TRACE: controller stopped 17703
TRACE: 28037 worker master: closes socket with handler 38 (4 left)
TRACE: 28037 : closes socket with handler 35 (3 left)
TRACE: Disconnecting sockets from 28037
TRACE: terminate context at 28037


BoPeng commented Feb 24, 2019

Is this the latest? I think I fixed something after I sent the message.


gaow commented Feb 24, 2019

Yes, it is the latest version:

[MW] git pull
Already up-to-date.
[MW] git status
On branch worker


BoPeng commented Feb 24, 2019

Strange, that function is simple and the only < is this one. Could you add an env.logger.error call in that function to check, say, whether self._max_workers is None?


gaow commented Feb 24, 2019

Oh okay, it was my bad. I messed up a local copy a few days ago when I was trying to figure out if memory usage was related to the number of jobs. ... Now the jobs are running. I have not limited their memory usage yet, just to check whether it works or hangs. I will then submit to a compute node and monitor memory usage there if all works well for the current run.


BoPeng commented Feb 24, 2019

Great. The branch now passes Travis and your hang test, so any new failures should be recorded as tests.


gaow commented Feb 25, 2019

The workflow no longer hangs, which is good. I still get this though:

ERROR: [4388f477-84ba-4092-b04f-9c04822cdef9]: [mnm_mid_het]: Failed to get results for tasks 332724febe15918b, 7351cb2ffb22bed3, bcf4a8993890e2b7, 112ac60e197346ae, efce6191fcc055ce, e356dd7800cd610b, 87496de5926fc1ba, a6258964be70d4b5, 9688fd031ebe3af5, 2fd7a9812bb4d7df, dcfc4412a373eca3, 0498006aba0ffe1e, 892988926ed5b4da, fffe38cb4db0428a, bbe790a42ef7bf82, 35cd51f372453ee1, 583cc6f8953c9654, 0a49e09fc0d69a11, ddbf4985565f595e, bf0c914cf2260e0a, cc22e01dd55c28b7, ce3fad41b855c9b2, df52ea2f6614b62c, e6dbccf69baee4cc, a7a9470e555284e9, a21acd967423fcc8, 15b7152ec9e3b1f4, afeb80d3412438f3, 967ec630e195f775, 4fc79b97f9d280c1, c6e1a9114aedda16, 6791044fd8407b53, 627766a36d8b00a8, a7069c9f16039e62, f95d09873d3b5872, 892c5284fa0cd682, 7862883a7f39931a, 6fe05f63e0605f85, caaf2b98d771248d, 4ba1267e015495d1, 607e22128b1a4e82, 306593e3f4420c9e, c9415cd80411a877, 1bced3eb715f4ac7, c7e67a7558500f84, ce36b7c3a6632f75, 2974f39ca1e96a90, 2eba76f6185eb56d, 00437260580b3394, 66b2bfc7a4161b44

so the workflow did not finish. sos status reports the status as missing. But this might well be a random cluster error? Any insights? I'm submitting it again, hoping this time everything finishes.


gaow commented Feb 25, 2019

Got this at rerun:

 [Errno 2] No such file or directory: '/home/gaow/.sos/tasks/M50_b8970d9d427d0c9d.task'

It seems all the outputs are generated; this failed towards the end. Not sure if it can be reproduced. Doing it again now.


BoPeng commented Feb 25, 2019

Missing means the task file has disappeared, and the rerun confirms it. Rerun should perhaps recreate it; not sure why this happened.


gaow commented Feb 25, 2019

Unfortunately it still hangs, I think at the end of the pipeline; I've waited for it overnight. But previously it hung at every step of the pipeline, so the current behavior is better. Also, when I check the number of processes, it seems correct. I see you've pushed another commit; I'll check it out and try again. If it is still problematic I'll see if I can create an example for you to test on the cluster.


BoPeng commented Feb 25, 2019

Yes, something is going on here, because Travis sometimes hangs, but the offending test passes OK locally. Let me see if I can spot something.


gaow commented Feb 26, 2019

Using the current master on my example I still get all these hangs:

WARNING: PENDING WITH PORT [48779] AT NUM_PENDING 1: 2 workers (of which 0 is blocking), 156 requested, 156 processed
WARNING: PENDING WITH PORT [38269] AT NUM_PENDING 1: 2 workers (of which 0 is blocking), 156 requested, 156 processed
WARNING: PENDING WITH PORT [48779] AT NUM_PENDING 1: 2 workers (of which 0 is blocking), 156 requested, 156 processed

But according to your post on another ticket, this should not happen now (there should only be the DAG problem)?


BoPeng commented Feb 26, 2019

In theory the inter-locking problem should be resolved, but there might be bugs that prevent jobs from quitting (e.g., a socket is closed and no longer listens for results). I am still testing...
