Connection Issues for Larger Flows With Hundreds of Tasks #7636

Closed
4 tasks done
biancaines opened this issue Nov 22, 2022 · 1 comment
Labels
bug Something isn't working cloud Related to Prefect Cloud needs:research Blocked by investigation into feasibility and cause

Comments

@biancaines
Contributor

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn't find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

A user reported connectivity issues while running a flow with 700+ tasks. The flow's logs show several different exception types (listed in the Error section of this issue), which occur intermittently. Once one of these exceptions is raised, the running tasks crash and the flow ultimately fails.

Their description of the flow is as follows:

The flow submits a large number of independent tasks, each of which submits a compute job to an external service and polls periodically for its status. Since the tasks spend most of their time sleeping, we implemented this using the ConcurrentTaskRunner and increased the thread count to prevent deadlocks. Prefect Cloud concurrency limits cap the number of running tasks at 100, and individual tasks take anywhere from 1 to 15 minutes. As expected, this results in many propose_state calls that receive a "wait" response and are retried recursively. Eventually, however, after a large number of retries, an exception occurs during the set_state API call. The tracebacks below come from different flow runs with exactly the same configuration; with this same configuration, some flow runs did complete successfully. This is possibly related to #7442.
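To make the "wait and retry" behaviour concrete, here is a minimal, self-contained asyncio sketch. It does not use Prefect: `SlotServer` is a hypothetical stand-in for a server-side tag concurrency limit, and `propose_running` is a hypothetical stand-in for a propose_state call that either grants a slot or answers "wait". It uses a loop rather than recursion, and it counts how many proposals 50 short tasks make against a limit of 5. Every task that cannot get a slot keeps re-proposing, so the number of API-like calls grows well beyond the number of tasks.

```python
import asyncio


class SlotServer:
    """Hypothetical stand-in for a server-side tag concurrency limit."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.running = 0
        self.peak = 0
        self.calls = 0  # every propose_running() call, i.e. one "API request"

    def propose_running(self) -> bool:
        """Grant a slot (True) or answer 'wait' (False)."""
        self.calls += 1
        if self.running < self.limit:
            self.running += 1
            self.peak = max(self.peak, self.running)
            return True
        return False

    def release(self) -> None:
        self.running -= 1


async def run_task(server: SlotServer, work_s: float, retry_delay_s: float) -> None:
    # Re-propose until a slot is granted; each "wait" costs one more request.
    while not server.propose_running():
        await asyncio.sleep(retry_delay_s)
    try:
        await asyncio.sleep(work_s)  # stands in for polling the external compute job
    finally:
        server.release()


async def main(n_tasks: int = 50, limit: int = 5) -> SlotServer:
    server = SlotServer(limit)
    await asyncio.gather(*(run_task(server, 0.02, 0.005) for _ in range(n_tasks)))
    return server


if __name__ == "__main__":
    result = asyncio.run(main())
    print(f"{result.calls} state proposals for 50 tasks (peak running: {result.peak})")
```

With 700 tasks, a limit of 100, and tasks lasting minutes, the same pattern produces a steady stream of requests from every waiting task for the whole run.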

Reproduction

'''This minimal reproducible example will trigger the aforementioned exceptions after some time has passed. The concurrency limit for the tag 'test-limit' is set to 5.'''

import asyncio

from anyio import to_thread
from prefect import flow, get_run_logger, task
from prefect.utilities.asyncutils import run_async_from_worker_thread, sync_compatible


def wait_async_in_thread(t):
    """Simulating submission of compute job to external service and wait for completion."""
    run_async_from_worker_thread(asyncio.sleep, t)


@task
def start_task():
    """Some start task, fetching config data from a DB"""
    logger = get_run_logger()
    sleep_time = 1
    logger.info(f"sleeping {sleep_time}m")
    wait_async_in_thread(60*sleep_time)
    logger.info("Done sleeping")
    return


@task(tags=["test-limit"])
def task_1(input):
    """1st compute stage per item"""
    logger = get_run_logger()
    sleep_time = 5
    logger.info(f"sleeping {sleep_time}m")
    wait_async_in_thread(60*sleep_time)
    logger.info("Done sleeping")
    return


@task(tags=["test-limit"])
def task_2(input):
    """2nd compute stage per item"""
    logger = get_run_logger()
    sleep_time = 10
    logger.info(f"sleeping {sleep_time}m")
    wait_async_in_thread(60*sleep_time)
    logger.info("Done sleeping")
    return


@sync_compatible
async def deactivate_thread_limit() -> None:
    """Deactivate anyio thread limit by setting it to inf.

    This is a temporary workaround until Prefect's task concurrency is no longer limited by threads.
    """
    to_thread.current_default_thread_limiter().total_tokens = float("inf")


@flow(name="example_flow")
def example_flow():
    deactivate_thread_limit()
    start = start_task.submit()
    futures = []
    for _ in range(0, 700):
        futures.append(task_2.submit(task_1.submit(start)))


if __name__ == "__main__":
    example_flow()
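The `deactivate_thread_limit` workaround exists because each sleeping task occupies a worker thread, and anyio's default thread limiter allows 40 threads at a time, so tasks beyond that queue up. The stdlib-only sketch below illustrates the effect; it uses `concurrent.futures.ThreadPoolExecutor`, not anyio or Prefect, and `run_sleepers` is a hypothetical helper. With a pool of 4 threads, 12 blocking sleeps run in roughly three waves and at most 4 are ever in flight.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Tuple


def run_sleepers(n_tasks: int, max_threads: int, sleep_s: float) -> Tuple[int, float]:
    """Run n_tasks blocking sleeps on a bounded pool.

    Returns (peak number of simultaneously sleeping tasks, elapsed seconds).
    """
    lock = threading.Lock()
    active = 0
    peak = 0

    def sleeper() -> None:
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(sleep_s)  # stands in for a task waiting on an external job
        with lock:
            active -= 1

    start = time.perf_counter()
    # Leaving the with-block waits for every submitted sleeper to finish.
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        for _ in range(n_tasks):
            pool.submit(sleeper)
    return peak, time.perf_counter() - start


if __name__ == "__main__":
    for threads in (4, 12):
        peak, elapsed = run_sleepers(12, threads, 0.1)
        print(f"max_threads={threads}: peak={peak}, elapsed={elapsed:.2f}s")
```

Raising the thread cap lets more waiting tasks make progress at once, but it also means many more tasks are talking to the API concurrently, which is the situation in which the errors below were reported.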

Error

Crash detected! Execution was interrupted by an unexpected exception: Traceback (most recent call last):
  File "<...>/python3.8/site-packages/httpcore/_async/http2.py", line 108, in handle_async_request
    await self._send_request_headers(request=request, stream_id=stream_id)
  File "<...>/python3.8/site-packages/httpcore/_async/http2.py", line 205, in _send_request_headers
    self._h2_state.send_headers(stream_id, headers, end_stream=end_stream)
  File "<...>/python3.8/site-packages/h2/connection.py", line 761, in send_headers
    raise TooManyStreamsError(
h2.exceptions.TooManyStreamsError: Max outbound streams is 100, 100 open


Crash detected! Execution was interrupted by an unexpected exception: Traceback (most recent call last):
  File "<...>/python3.8/site-packages/h2/connection.py", line 224, in process_input
    func, target_state = self._transitions[(self.state, input_)]
KeyError: (<ConnectionState.CLOSED: 3>, <ConnectionInputs.SEND_HEADERS: 0>) 
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "<...>/python3.8/site-packages/httpcore/_async/http2.py", line 108, in handle_async_request
    await self._send_request_headers(request=request, stream_id=stream_id)
  File "<...>/python3.8/site-packages/httpcore/_async/http2.py", line 205, in _send_request_headers
    self._h2_state.send_headers(stream_id, headers, end_stream=end_stream)
  File "<...>/python3.8/site-packages/h2/connection.py", line 766, in send_headers
    self.state_machine.process_input(ConnectionInputs.SEND_HEADERS)
  File "<...>/python3.8/site-packages/h2/connection.py", line 228, in process_input
    raise ProtocolError(
h2.exceptions.ProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED

Crash detected! Execution was interrupted by an unexpected exception: Traceback (most recent call last):
  File "<...>/python3.8/site-packages/h2/connection.py", line 224, in process_input
    func, target_state = self._transitions[(self.state, input_)]
KeyError: (<ConnectionState.CLOSED: 3>, <ConnectionInputs.RECV_HEADERS: 9>)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "<...>/python3.8/site-packages/httpcore/_async/http2.py", line 114, in handle_async_request
    status, headers = await self._receive_response(
  File "<...>/python3.8/site-packages/httpcore/_async/http2.py", line 231, in _receive_response
    event = await self._receive_stream_event(request, stream_id)
  File "<...>/python3.8/site-packages/httpcore/_async/http2.py", line 262, in _receive_stream_event
    await self._receive_events(request, stream_id)
  File "<...>/python3.8/site-packages/httpcore/_async/http2.py", line 283, in _receive_events
    events = await self._read_incoming_data(request)
  File "<...>/python3.8/site-packages/httpcore/_async/http2.py", line 345, in _read_incoming_data
    events = self._h2_state.receive_data(data)
  File "<...>/python3.8/site-packages/h2/connection.py", line 1463, in receive_data
    events.extend(self._receive_frame(frame))
  File "<...>/python3.8/site-packages/h2/connection.py", line 1487, in _receive_frame
    frames, events = self._frame_dispatch_table[frame.__class__](frame)
  File "<...>/python3.8/site-packages/h2/connection.py", line 1555, in _receive_headers_frame
    events = self.state_machine.process_input(
  File "<...>/python3.8/site-packages/h2/connection.py", line 228, in process_input
    raise ProtocolError(
h2.exceptions.ProtocolError: Invalid input ConnectionInputs.RECV_HEADERS in state ConnectionState.CLOSED


Crash detected! Execution was interrupted by an unexpected exception: Traceback (most recent call last):
  File "<...>/python3.8/site-packages/h2/connection.py", line 224, in process_input
    func, target_state = self._transitions[(self.state, input_)]
KeyError: (<ConnectionState.CLOSED: 3>, <ConnectionInputs.RECV_PING: 14>)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "<...>/python3.8/site-packages/httpcore/_async/http2.py", line 114, in handle_async_request
    status, headers = await self._receive_response(
  File "<...>/python3.8/site-packages/httpcore/_async/http2.py", line 231, in _receive_response
    event = await self._receive_stream_event(request, stream_id)
  File "<...>/python3.8/site-packages/httpcore/_async/http2.py", line 262, in _receive_stream_event
    await self._receive_events(request, stream_id)
  File "<...>/python3.8/site-packages/httpcore/_async/http2.py", line 283, in _receive_events
    events = await self._read_incoming_data(request)
  File "<...>/python3.8/site-packages/httpcore/_async/http2.py", line 345, in _read_incoming_data
    events = self._h2_state.receive_data(data)
  File "<...>/python3.8/site-packages/h2/connection.py", line 1463, in receive_data
    events.extend(self._receive_frame(frame))
  File "<...>/python3.8/site-packages/h2/connection.py", line 1487, in _receive_frame
    frames, events = self._frame_dispatch_table[frame.__class__](frame)
  File "<...>/python3.8/site-packages/h2/connection.py", line 1760, in _receive_ping_frame
    events = self.state_machine.process_input(
  File "<...>/python3.8/site-packages/h2/connection.py", line 228, in process_input
    raise ProtocolError(
h2.exceptions.ProtocolError: Invalid input ConnectionInputs.RECV_PING in state ConnectionState.CLOSED
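The first traceback above reports `Max outbound streams is 100, 100 open`: HTTP/2 lets a server cap how many streams (in-flight requests) a single connection may have open, and here the client tried to open one more. A common client-side mitigation is to cap concurrent requests below that limit. The sketch below only illustrates that idea under stated assumptions: `fake_request` and `send_all` are hypothetical helpers that fail the way h2 does once more than `max_streams` requests are open. This is not httpx, h2, or Prefect code, and not necessarily the fix adopted for this issue.

```python
import asyncio
from typing import Optional, Tuple


async def fake_request(state: dict, max_streams: int) -> None:
    """Hypothetical request that fails when too many streams are open at once."""
    state["open"] += 1
    try:
        if state["open"] > max_streams:
            # Mimics the TooManyStreamsError message from the first traceback.
            raise RuntimeError(
                f"Max outbound streams is {max_streams}, {state['open'] - 1} open"
            )
        state["peak"] = max(state["peak"], state["open"])
        await asyncio.sleep(0.01)  # simulated round trip
    finally:
        state["open"] -= 1


async def send_all(n: int, max_streams: int, limit: Optional[int] = None) -> Tuple[int, int]:
    """Send n concurrent requests; return (peak open streams, number of failures)."""
    state = {"open": 0, "peak": 0}
    gate = asyncio.Semaphore(limit) if limit is not None else None

    async def one() -> None:
        if gate is None:
            await fake_request(state, max_streams)
        else:
            async with gate:  # at most `limit` requests in flight
                await fake_request(state, max_streams)

    results = await asyncio.gather(*(one() for _ in range(n)), return_exceptions=True)
    failures = sum(isinstance(r, Exception) for r in results)
    return state["peak"], failures


if __name__ == "__main__":
    print("unbounded:", asyncio.run(send_all(150, max_streams=100)))
    print("capped at 90:", asyncio.run(send_all(150, max_streams=100, limit=90)))
```

The semaphore is acquired before the stream is opened, so excess requests queue on the client instead of failing on the shared connection.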

Versions

Ubuntu 22.04.1

Python 3.8.15

Prefect
Version:             2.6.5
API version:         0.8.3
Python version:      3.8.15
Git commit:          9fc2658f
Built:               Thu, Oct 27, 2022 2:24 PM
OS/Arch:             linux/x86_64
Server type:         cloud

Additional context

No response

@biancaines biancaines added bug Something isn't working v2 cloud Related to Prefect Cloud labels Nov 22, 2022
@biancaines biancaines changed the title Connection Issues for Larger Flows Connection Issues for Larger Flows With Hundreds of Tasks Nov 22, 2022
@zanieb zanieb added status:accepted needs:research Blocked by investigation into feasibility and cause and removed status:triage labels Nov 23, 2022
@zanieb
Contributor

zanieb commented Mar 24, 2023

Duplicate of #7442

@zanieb zanieb marked this as a duplicate of #7442 Mar 24, 2023
@zanieb zanieb closed this as completed Mar 24, 2023