diff --git a/src/xdist/iso_scheduling_plugin.py b/src/xdist/iso_scheduling_plugin.py index 6babaf9d..2c4f5ad0 100644 --- a/src/xdist/iso_scheduling_plugin.py +++ b/src/xdist/iso_scheduling_plugin.py @@ -34,40 +34,39 @@ is not recommended: importing fixtures into a module will register them in pytest as defined in that module". """ + from __future__ import annotations import contextlib import functools -import logging import json +import logging import pathlib from typing import TYPE_CHECKING import filelock import pytest -from xdist.iso_scheduling_utils import ( - IsoSchedulingFixture, - DistributedSetupCoordinator, - DistributedSetupContext, - DistributedTeardownContext, - CoordinationTimeoutError -) +from xdist.iso_scheduling_utils import CoordinationTimeoutError +from xdist.iso_scheduling_utils import DistributedSetupContext +from xdist.iso_scheduling_utils import DistributedSetupCoordinator +from xdist.iso_scheduling_utils import DistributedTeardownContext +from xdist.iso_scheduling_utils import IsoSchedulingFixture + if TYPE_CHECKING: - from collections.abc import Callable, Generator + from collections.abc import Callable + from collections.abc import Generator from typing import Optional _LOGGER = logging.getLogger(__name__) -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def iso_scheduling( - tmp_path_factory: pytest.TempPathFactory, - testrun_uid: str, - worker_id: str - ) -> IsoSchedulingFixture: + tmp_path_factory: pytest.TempPathFactory, testrun_uid: str, worker_id: str +) -> IsoSchedulingFixture: """A session-scoped pytest fixture for coordinating setup/teardown of test scope/class which is executing under isoscope scheduling. @@ -154,9 +153,9 @@ def test_case3(self) yields an instance of `DistributedSetupCoordinator` for the current Pytest Session. """ - return _IsoSchedulingFixtureImpl(tmp_path_factory=tmp_path_factory, - testrun_uid=testrun_uid, - worker_id=worker_id) + return _IsoSchedulingFixtureImpl( + tmp_path_factory=tmp_path_factory, testrun_uid=testrun_uid, worker_id=worker_id + ) class _IsoSchedulingFixtureImpl(IsoSchedulingFixture): @@ -166,12 +165,12 @@ class _IsoSchedulingFixtureImpl(IsoSchedulingFixture): An instance of _IsoSchedulingFixtureImpl is returned by our pytest fixture `iso_scheduling`. """ + # pylint: disable=too-few-public-methods - def __init__(self, - tmp_path_factory: pytest.TempPathFactory, - testrun_uid: str, - worker_id: str): + def __init__( + self, tmp_path_factory: pytest.TempPathFactory, testrun_uid: str, worker_id: str + ): """ :param tmp_path_factory: pytest interface for temporary directories. :param testrun_uid: Unique id of the current test run. This value is @@ -186,9 +185,8 @@ def __init__(self, @contextlib.contextmanager def coordinate_setup_teardown( - self, - setup_request: pytest.FixtureRequest - ) -> Generator[DistributedSetupCoordinator, None, None]: + self, setup_request: pytest.FixtureRequest + ) -> Generator[DistributedSetupCoordinator, None, None]: """Context manager that yields an instance of `DistributedSetupCoordinator` for distributed coordination of Setup and Teardown. @@ -204,7 +202,8 @@ def coordinate_setup_teardown( setup_request=setup_request, tmp_path_factory=self._tmp_path_factory, testrun_uid=self._testrun_uid, - worker_id=self._worker_id) + worker_id=self._worker_id, + ) # Yield control to the managed code block yield coordinator @@ -221,13 +220,16 @@ class _DistributedSetupCoordinatorImpl(DistributedSetupCoordinator): `iso_scheduling` fixture instead! 
""" - _DISTRIBUTED_SETUP_ROOT_DIR_LINK_NAME = 'distributed_setup' - def __init__(self, - setup_request: pytest.FixtureRequest, - tmp_path_factory: pytest.TempPathFactory, - testrun_uid: str, - worker_id: str): + _DISTRIBUTED_SETUP_ROOT_DIR_LINK_NAME = "distributed_setup" + + def __init__( + self, + setup_request: pytest.FixtureRequest, + tmp_path_factory: pytest.TempPathFactory, + testrun_uid: str, + worker_id: str, + ): """ :param setup_request: Value of the pytest `request` fixture obtained directly by the calling setup-teardown fixture. @@ -246,9 +248,10 @@ def __init__(self, # directory. `tmp_path_factory.getbasetemp().parent` is common to all # workers in the current PyTest test run. self._root_context_base_dir: pathlib.Path = ( - tmp_path_factory.getbasetemp().parent - / self._DISTRIBUTED_SETUP_ROOT_DIR_LINK_NAME - / testrun_uid) + tmp_path_factory.getbasetemp().parent + / self._DISTRIBUTED_SETUP_ROOT_DIR_LINK_NAME + / testrun_uid + ) self._worker_id: str = worker_id @@ -256,10 +259,10 @@ def __init__(self, self._teardown_context: Optional[DistributedTeardownContext] = None def maybe_call_setup( - self, - setup_callback: Callable[[DistributedSetupContext], None], - timeout: float = DistributedSetupCoordinator.DEFAULT_TIMEOUT_SEC - ) -> None: + self, + setup_callback: Callable[[DistributedSetupContext], None], + timeout: float = DistributedSetupCoordinator.DEFAULT_TIMEOUT_SEC, + ) -> None: """Invoke the Setup callback only if distributed setup has not been performed yet from any other XDist worker for your test scope. Process-safe. @@ -284,8 +287,9 @@ def maybe_call_setup( """ # `maybe_call_setup()` may be called only once per instance of # `_SetupCoordinator` - assert self._setup_context is None, \ - f'maybe_call_setup()` already called {self._setup_context=}' + assert ( + self._setup_context is None + ), f"maybe_call_setup()` already called {self._setup_context=}" node_path = self._setup_request.node.path @@ -296,19 +300,20 @@ def maybe_call_setup( ) with _DistributedSetupCoordinationImpl.acquire_distributed_setup( - root_context_dir=root_context_dir, - worker_id=self._worker_id, - setup_request=self._setup_request, - timeout=timeout) as setup_context: + root_context_dir=root_context_dir, + worker_id=self._worker_id, + setup_request=self._setup_request, + timeout=timeout, + ) as setup_context: self._setup_context = setup_context if self._setup_context.distributed_setup_allowed: setup_callback(self._setup_context) def maybe_call_teardown( - self, - teardown_callback: Callable[[DistributedTeardownContext], None], - timeout: float = DistributedSetupCoordinator.DEFAULT_TIMEOUT_SEC - ) -> None: + self, + teardown_callback: Callable[[DistributedTeardownContext], None], + timeout: float = DistributedSetupCoordinator.DEFAULT_TIMEOUT_SEC, + ) -> None: """Invoke the Teardown callback only in when called in the context of the final XDist Worker process to have finished the execution of the tests for your test scope. Process-safe. 
@@ -331,34 +336,36 @@ def maybe_call_teardown( """ # Make sure `maybe_call_setup()` was already called on this instance # of `_SetupCoordinator` - assert self._setup_context is not None, \ - f'maybe_call_setup() not called yet {self._setup_context=}' + assert ( + self._setup_context is not None + ), f"maybe_call_setup() not called yet {self._setup_context=}" # Make sure `maybe_call_teardown()` hasn't been called on this instance # of `_SetupCoordinator` yet - assert self._teardown_context is None, \ - f'maybe_call_teardown() already called {self._teardown_context=}' + assert ( + self._teardown_context is None + ), f"maybe_call_teardown() already called {self._teardown_context=}" with _DistributedSetupCoordinationImpl.acquire_distributed_teardown( - setup_context=self._setup_context, - timeout=timeout) as teardown_context: + setup_context=self._setup_context, timeout=timeout + ) as teardown_context: self._teardown_context = teardown_context if self._teardown_context.distributed_teardown_allowed: teardown_callback(self._teardown_context) def _map_file_lock_exception(f: Callable): - """Decorator: map `FileLock` exceptions of interest to our own exceptions. - """ + """Decorator: map `FileLock` exceptions of interest to our own exceptions.""" + @functools.wraps(f) def wrapper(*args, **kwargs): try: return f(*args, **kwargs) except filelock.Timeout as err: raise CoordinationTimeoutError( - f'Another instance of this test scope/class is holding the ' - f'lock too long or timeout value is too short: {err}') \ - from err + f"Another instance of this test scope/class is holding the " + f"lock too long or timeout value is too short: {err}" + ) from err return wrapper @@ -367,26 +374,28 @@ class _DistributedSetupCoordinationImpl: """Low-level implementation of Context Managers for Coordinating Distributed Setup and Teardown for users of isoscope scheduling. """ - _ROOT_STATE_FILE_NAME = 'root_state.json' - _ROOT_LOCK_FILE_NAME = 'lock' + + _ROOT_STATE_FILE_NAME = "root_state.json" + _ROOT_LOCK_FILE_NAME = "lock" class DistributedState: - """State of the Distributed Setup-Teardown Coordination. - """ + """State of the Distributed Setup-Teardown Coordination.""" + def __init__(self, setup_count, teardown_count): self.setup_count = setup_count self.teardown_count = teardown_count def __repr__(self): - return f'<{self.__class__.__qualname__}: ' \ - f'setup_count={self.setup_count}; ' \ - f'teardown_count={self.teardown_count}>' + return ( + f"<{self.__class__.__qualname__}: " + f"setup_count={self.setup_count}; " + f"teardown_count={self.teardown_count}>" + ) @classmethod def load_from_file_path( - cls, - state_file_path: pathlib.Path - ) -> _DistributedSetupCoordinationImpl.DistributedState: + cls, state_file_path: pathlib.Path + ) -> _DistributedSetupCoordinationImpl.DistributedState: """Load the state instance from the given file path. 
:param state_file_path: @@ -408,8 +417,8 @@ def as_json_kwargs_dict(self) -> dict: ``` """ return { - 'setup_count': self.setup_count, - 'teardown_count': self.teardown_count + "setup_count": self.setup_count, + "teardown_count": self.teardown_count, } def save_to_file_path(self, state_file_path: pathlib.Path): @@ -424,12 +433,12 @@ def save_to_file_path(self, state_file_path: pathlib.Path): @contextlib.contextmanager @_map_file_lock_exception def acquire_distributed_setup( - cls, - root_context_dir: pathlib.Path, - worker_id: str, - setup_request: pytest.FixtureRequest, - timeout: float - ) -> Generator[DistributedSetupContext, None, None]: + cls, + root_context_dir: pathlib.Path, + worker_id: str, + setup_request: pytest.FixtureRequest, + timeout: float, + ) -> Generator[DistributedSetupContext, None, None]: """Low-level implementation of Context Manager for Coordinating Distributed Setup for isoscope scheduling. @@ -450,32 +459,33 @@ def acquire_distributed_setup( setup_allowed=False, root_context_dir=root_context_dir, worker_id=worker_id, - setup_request=setup_request) + setup_request=setup_request, + ) state_file_path = cls._get_root_state_file_path(root_context_dir) # Acquire resource with filelock.FileLock( - str(cls._get_root_lock_file_path(root_context_dir)), - timeout=timeout): + str(cls._get_root_lock_file_path(root_context_dir)), timeout=timeout + ): if state_file_path.is_file(): - state = cls.DistributedState.load_from_file_path( - state_file_path) + state = cls.DistributedState.load_from_file_path(state_file_path) # We never save state with setup_count <= 0 - assert state.setup_count > 0, \ - f'acquire_distributed_setup: non-positive setup ' \ - f'count read from state file - {state_file_path=}; ' \ - f'{worker_id=}; {state}' + assert state.setup_count > 0, ( + f"acquire_distributed_setup: non-positive setup " + f"count read from state file - {state_file_path=}; " + f"{worker_id=}; {state}" + ) # No Teardowns should be executing before all Setups # complete - assert state.teardown_count == 0, \ - f'acquire_distributed_setup: non-zero teardown ' \ - f'count read from state file - {state_file_path=}; ' \ - f'{worker_id=}; {state}' + assert state.teardown_count == 0, ( + f"acquire_distributed_setup: non-zero teardown " + f"count read from state file - {state_file_path=}; " + f"{worker_id=}; {state}" + ) else: # State file not created yet - state = cls.DistributedState(setup_count=0, - teardown_count=0) + state = cls.DistributedState(setup_count=0, teardown_count=0) state.setup_count += 1 @@ -485,8 +495,9 @@ def acquire_distributed_setup( # Yield control to the managed code block # _LOGGER.info( # pylint: disable=logging-fstring-interpolation - f'acquire_distributed_setup: yielding control to ' - f'managed block - {worker_id=}; {setup_context=}') + f"acquire_distributed_setup: yielding control to " + f"managed block - {worker_id=}; {setup_context=}" + ) yield setup_context # @@ -501,10 +512,8 @@ def acquire_distributed_setup( @contextlib.contextmanager @_map_file_lock_exception def acquire_distributed_teardown( - cls, - setup_context: DistributedSetupContext, - timeout: float - ) -> Generator[DistributedTeardownContext, None, None]: + cls, setup_context: DistributedSetupContext, timeout: float + ) -> Generator[DistributedTeardownContext, None, None]: """Low-level implementation of Context Manager for Coordinating Distributed Teardown for the isoscope scheduling. 
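A minimal sketch of the state-file round trip implied by the signatures above. The diff shows only the `as_json_kwargs_dict()` contents and the load/save entry points, so the bodies below are a plausible reconstruction using the stdlib `json` module that this plugin imports; `_StateSketch` is a simplified stand-in for `DistributedState`.

import json
import pathlib


class _StateSketch:
    """Simplified stand-in for DistributedState, for illustration only."""

    def __init__(self, setup_count: int, teardown_count: int):
        self.setup_count = setup_count
        self.teardown_count = teardown_count

    def as_json_kwargs_dict(self) -> dict:
        return {
            "setup_count": self.setup_count,
            "teardown_count": self.teardown_count,
        }

    def save_to_file_path(self, state_file_path: pathlib.Path) -> None:
        # Serialize the kwargs dict so a later load can rebuild the state.
        state_file_path.write_text(json.dumps(self.as_json_kwargs_dict()))

    @classmethod
    def load_from_file_path(cls, state_file_path: pathlib.Path) -> "_StateSketch":
        # Reconstruct the state from the previously saved JSON kwargs.
        return cls(**json.loads(state_file_path.read_text()))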
@@ -519,8 +528,8 @@ def acquire_distributed_teardown( # Before control passes to the managed code block # teardown_context = DistributedTeardownContext( - teardown_allowed=False, - setup_context=setup_context) + teardown_allowed=False, setup_context=setup_context + ) # NOTE: Friend-of-class protected member access root_context_dir = teardown_context._root_context_dir # pylint: disable=protected-access @@ -531,36 +540,40 @@ def acquire_distributed_teardown( # Acquire resource with filelock.FileLock( - str(cls._get_root_lock_file_path(root_context_dir)), - timeout=timeout): + str(cls._get_root_lock_file_path(root_context_dir)), timeout=timeout + ): if state_file_path.is_file(): - state = cls.DistributedState.load_from_file_path( - state_file_path) + state = cls.DistributedState.load_from_file_path(state_file_path) assert state.setup_count > 0, ( - f'acquire_distributed_teardown: non-positive ' - f'setup_count read from state file - {state_file_path=}; ' - f'{worker_id=}; {state.setup_count=} <= 0; {state}') + f"acquire_distributed_teardown: non-positive " + f"setup_count read from state file - {state_file_path=}; " + f"{worker_id=}; {state.setup_count=} <= 0; {state}" + ) assert state.teardown_count < state.setup_count, ( - f'acquire_distributed_teardown: teardown_count ' - f'already >= setup_count read from state file - ' - f'{state_file_path=}; {worker_id=}; ' - f'{state.teardown_count=} >= {state.setup_count=}') + f"acquire_distributed_teardown: teardown_count " + f"already >= setup_count read from state file - " + f"{state_file_path=}; {worker_id=}; " + f"{state.teardown_count=} >= {state.setup_count=}" + ) else: raise RuntimeError( - f'acquire_distributed_teardown: state file not found: ' - f'{state_file_path=}; {worker_id=}') + f"acquire_distributed_teardown: state file not found: " + f"{state_file_path=}; {worker_id=}" + ) state.teardown_count += 1 teardown_context.distributed_teardown_allowed = ( - state.teardown_count == state.setup_count) + state.teardown_count == state.setup_count + ) # # Yield control to the managed code block # _LOGGER.info( # pylint: disable=logging-fstring-interpolation - f'acquire_distributed_teardown: yielding control to ' - f'managed block - {worker_id=}; {teardown_context=}') + f"acquire_distributed_teardown: yielding control to " + f"managed block - {worker_id=}; {teardown_context=}" + ) yield teardown_context # @@ -572,9 +585,7 @@ def acquire_distributed_teardown( state.save_to_file_path(state_file_path) @classmethod - def _get_root_state_file_path( - cls, - root_state_dir: pathlib.Path) -> pathlib.Path: + def _get_root_state_file_path(cls, root_state_dir: pathlib.Path) -> pathlib.Path: """Return the path of the file for storing the root state, creating all parent directories if they don't exist yet. @@ -585,9 +596,7 @@ def _get_root_state_file_path( return root_state_dir / cls._ROOT_STATE_FILE_NAME @classmethod - def _get_root_lock_file_path( - cls, - root_lock_dir: pathlib.Path) -> pathlib.Path: + def _get_root_lock_file_path(cls, root_lock_dir: pathlib.Path) -> pathlib.Path: """Return the path of the lock file, creating all parent directories if they don't exist yet. diff --git a/src/xdist/iso_scheduling_utils.py b/src/xdist/iso_scheduling_utils.py index 73067dcd..5f0a3ead 100644 --- a/src/xdist/iso_scheduling_utils.py +++ b/src/xdist/iso_scheduling_utils.py @@ -28,14 +28,18 @@ See also `iso_scheduling_plugin.py` for fixtures specific to isoscope scheduling. 
""" + from __future__ import annotations import abc import pathlib from typing import TYPE_CHECKING + if TYPE_CHECKING: - from collections.abc import Callable, Generator + from collections.abc import Callable + from collections.abc import Generator + import pytest @@ -51,13 +55,13 @@ class IsoSchedulingFixture(abc.ABC): which yields an instance of the implementation of the `DistributedSetupCoordinator` interface. """ + # pylint: disable=too-few-public-methods @abc.abstractmethod def coordinate_setup_teardown( - self, - setup_request: pytest.FixtureRequest - ) -> Generator[DistributedSetupCoordinator, None, None]: + self, setup_request: pytest.FixtureRequest + ) -> Generator[DistributedSetupCoordinator, None, None]: """Context manager that yields an instance of `DistributedSetupCoordinator` for distributed coordination of Setup and Teardown. @@ -153,9 +157,10 @@ def test_case3(self) @abc.abstractmethod def maybe_call_setup( - self, - setup_callback: Callable[[DistributedSetupContext], None], - timeout: float = DEFAULT_TIMEOUT_SEC) -> None: + self, + setup_callback: Callable[[DistributedSetupContext], None], + timeout: float = DEFAULT_TIMEOUT_SEC, + ) -> None: """Invoke the Setup callback only if distributed setup has not been performed yet from any other XDist worker for your test scope. Process-safe. @@ -182,10 +187,10 @@ def maybe_call_setup( @abc.abstractmethod def maybe_call_teardown( - self, - teardown_callback: Callable[[DistributedTeardownContext], None], - timeout: float = DEFAULT_TIMEOUT_SEC - ) -> None: + self, + teardown_callback: Callable[[DistributedTeardownContext], None], + timeout: float = DEFAULT_TIMEOUT_SEC, + ) -> None: """Invoke the Teardown callback only in when called in the context of the final XDist Worker process to have finished the execution of the tests for your test scope. Process-safe. @@ -210,13 +215,13 @@ def maybe_call_teardown( class _DistributedSetupTeardownContextMixin: # pylint: disable=too-few-public-methods - """Mixin for `DistributedSetupContext` and DistributedTeardownContext`. - """ + """Mixin for `DistributedSetupContext` and DistributedTeardownContext`.""" + # Expected instance members in derived class _root_context_dir: pathlib.Path _setup_node_name: str - _CLIENT_SUBDIRECTORY_LINK = 'client-workspace' + _CLIENT_SUBDIRECTORY_LINK = "client-workspace" @property def client_dir(self) -> pathlib.Path: @@ -225,8 +230,7 @@ def client_dir(self) -> pathlib.Path: client-specific state, creating the directory if not already created. """ - client_dir_path = (self._root_context_dir - / self._CLIENT_SUBDIRECTORY_LINK) + client_dir_path = self._root_context_dir / self._CLIENT_SUBDIRECTORY_LINK client_dir_path.mkdir(parents=True, exist_ok=True) return client_dir_path @@ -237,11 +241,13 @@ class DistributedSetupContext(_DistributedSetupTeardownContextMixin): manager. """ - def __init__(self, - setup_allowed: bool, - root_context_dir: pathlib.Path, - worker_id: str, - setup_request: pytest.FixtureRequest): + def __init__( + self, + setup_allowed: bool, + root_context_dir: pathlib.Path, + worker_id: str, + setup_request: pytest.FixtureRequest, + ): """ :param setup_allowed: Whether distributed setup may be performed by the current process. 
@@ -271,11 +277,12 @@ def __init__(self, def __repr__(self) -> str: return ( - f'< {self.__class__.__name__}: ' - f'node_name={self._setup_node_name}; ' - f'setup_allowed={self.distributed_setup_allowed}; ' - f'worker_id={self.worker_id}; ' - f'client_dir={self.client_dir} >') + f"< {self.__class__.__name__}: " + f"node_name={self._setup_node_name}; " + f"setup_allowed={self.distributed_setup_allowed}; " + f"worker_id={self.worker_id}; " + f"client_dir={self.client_dir} >" + ) class DistributedTeardownContext(_DistributedSetupTeardownContextMixin): @@ -283,9 +290,7 @@ class DistributedTeardownContext(_DistributedSetupTeardownContextMixin): manager. """ - def __init__(self, - teardown_allowed: bool, - setup_context: DistributedSetupContext): + def __init__(self, teardown_allowed: bool, setup_context: DistributedSetupContext): """ :param teardown_allowed: Whether Distributed Teardown may be performed by the current process. @@ -310,8 +315,9 @@ def __init__(self, def __repr__(self) -> str: return ( - f'< {self.__class__.__name__}: ' - f'node_name={self._setup_node_name}; ' - f'teardown_allowed={self.distributed_teardown_allowed}; ' - f'worker_id={self.worker_id}; ' - f'client_dir={self.client_dir} >') + f"< {self.__class__.__name__}: " + f"node_name={self._setup_node_name}; " + f"teardown_allowed={self.distributed_teardown_allowed}; " + f"worker_id={self.worker_id}; " + f"client_dir={self.client_dir} >" + ) diff --git a/src/xdist/scheduler/isoscope.py b/src/xdist/scheduler/isoscope.py index 4f132423..b365415e 100644 --- a/src/xdist/scheduler/isoscope.py +++ b/src/xdist/scheduler/isoscope.py @@ -27,10 +27,10 @@ Properties of this scheduler: 1. Executes one test scope/class at a time. - 2. Distributes tests of the executing scope/class to the configured XDist + 2. Distributes tests of the executing scope/class to the configured XDist Workers. 3. Guarantees that the Setup of the executing scope/class completes in all - XDist Workers BEFORE any of those Workers start processing the + XDist Workers BEFORE any of those Workers start processing the Teardown of that test scope/class. 4. Guarantees that the Teardown phase of the executing test scope/class completes in all XDist Workers before the Setup phase begins for the @@ -40,6 +40,7 @@ * Implementation of `_split_scope()` and public method documentation: - borrowed from the builtin `loadscope` scheduler """ # pylint: disable=too-many-lines + from __future__ import annotations from collections import OrderedDict @@ -48,15 +49,21 @@ import random from typing import TYPE_CHECKING -import pytest from _pytest.runner import CollectReport +import pytest + from xdist.report import report_collection_diff from xdist.workermanage import parse_spec_config if TYPE_CHECKING: - from typing import NoReturn, Optional, Sequence - from collections.abc import Generator, Iterable, ValuesView + from collections.abc import Generator + from collections.abc import Iterable + from collections.abc import ValuesView + from typing import NoReturn + from typing import Optional + from typing import Sequence + import xdist.remote from xdist.workermanage import WorkerController @@ -72,13 +79,14 @@ class IsoScopeScheduling: # pylint: disable=too-many-instance-attributes once per scope (vs. per worker) using `FileLock` or similar for coordination. """ + class _State(str, enum.Enum): # Waiting for scheduler to be ready to distribute the next Scope. 
When # the Workset Queue is NOT empty AND all workers which are shutting down # reach zero pending tests AND all other workers have no more than one # pending tests AND at least one worker is available for the distribution # of the next scope, then transition to `ACTIVATE_SCOPE` - WAIT_READY_TO_ACTIVATE_SCOPE = 'WAIT-READY-TO-ACTIVATE-SCOPE' + WAIT_READY_TO_ACTIVATE_SCOPE = "WAIT-READY-TO-ACTIVATE-SCOPE" # Activate (i.e., distribute) tests from the next Scope, if any. If a # scope was distributed, then transition to `WAIT_READY_TO_FENCE`. @@ -87,7 +95,7 @@ class _State(str, enum.Enum): # which are not shutting down. Workers with matching fence tests have # priority over empty workers (to satisfy that "at least two # active-Scope tests per worker" Rule) - ACTIVATE_SCOPE = 'ACTIVATE-SCOPE' + ACTIVATE_SCOPE = "ACTIVATE-SCOPE" # Waiting for scheduler to be ready to fence the active (i.e., # distributed) scope. Wait until each non-empty worker has only one @@ -98,7 +106,7 @@ class _State(str, enum.Enum): # Scope, then reset current active scope and transition to # `WAIT-READY-TO-ACTIVATE-SCOPE` (this means that all workers containing # active-Scope tests crashed) - WAIT_READY_TO_FENCE = 'WAIT-READY-TO-FENCE' + WAIT_READY_TO_FENCE = "WAIT-READY-TO-FENCE" # Fence the workers containing the final active-Scope tests in # order to allow those final pending tests to complete execution. Fence @@ -108,18 +116,16 @@ class _State(str, enum.Enum): # out of tests for fencing, then send "shutdown" to the balance of those # workers instead of a fence test. Finally, transition to # `WAIT_READY_TO_ACTIVATE_SCOPE`. - FENCE = 'FENCE' + FENCE = "FENCE" - def __init__( - self, - config: pytest.Config, - log: xdist.remote.Producer): + def __init__(self, config: pytest.Config, log: xdist.remote.Producer): self._config = config self._log: xdist.remote.Producer = log.distscopeisosched # Current scheduling state - self._state: IsoScopeScheduling._State = \ + self._state: IsoScopeScheduling._State = ( self._State.WAIT_READY_TO_ACTIVATE_SCOPE + ) # Scope ID of tests that are currently executing; `None` prior to the # initial distribution @@ -153,8 +159,9 @@ def __init__( # `_WorkerProxy` instances. Initially empty, it will be populated by # our `add_node_collection` implementation as it's called by xdist's # `DSession` and the corresponding test collection passes validation. - self._worker_by_node: \ - OrderedDict[WorkerController, _WorkerProxy] = OrderedDict() + self._worker_by_node: OrderedDict[WorkerController, _WorkerProxy] = ( + OrderedDict() + ) # Workers pending validation of their Test collections that have not # been admitted to `_worker_by_node` yet. @@ -171,8 +178,9 @@ def __init__( # # A worker is removed from `_pending_worker_by_node` when the xdist # controller invokes `remove_node()` with the corresponding node. - self._pending_worker_by_node: \ - OrderedDict[WorkerController, _WorkerProxy] = OrderedDict() + self._pending_worker_by_node: OrderedDict[WorkerController, _WorkerProxy] = ( + OrderedDict() + ) @property def nodes(self) -> list[WorkerController]: @@ -180,8 +188,9 @@ def nodes(self) -> list[WorkerController]: Called by xdist `DSession`. 
""" - return (list(self._worker_by_node.keys()) - + list(self._pending_worker_by_node.keys())) + return list(self._worker_by_node.keys()) + list( + self._pending_worker_by_node.keys() + ) @property def collection_is_completed(self) -> bool: @@ -240,10 +249,11 @@ def add_node(self, node: WorkerController) -> None: Called by the ``DSession.worker_workerready`` hook - when it successfully bootstraps a new remote worker. """ - self._log(f'Registering remote worker node {node}') + self._log(f"Registering remote worker node {node}") - assert node not in self._pending_worker_by_node, \ - f'{node=} already in pending workers' + assert ( + node not in self._pending_worker_by_node + ), f"{node=} already in pending workers" self._pending_worker_by_node[node] = _WorkerProxy(node) @@ -274,14 +284,15 @@ def remove_node(self, node: WorkerController) -> Optional[str]: :raise KeyError: if the Remote Worker node has not been registered with the scheduler. (NOTE: xdist's `DSession` expects this behavior) """ - self._log(f'Removing remote worker node {node}') + self._log(f"Removing remote worker node {node}") if node in self._pending_worker_by_node: # Worker was not admitted to active workers yet, remove it from the # pending worker collection. self._pending_worker_by_node.pop(node) - assert node not in self._worker_by_node, \ - f'{node=} in both pending and active workers' + assert ( + node not in self._worker_by_node + ), f"{node=} in both pending and active workers" return None # Worker was admitted to active workers already @@ -302,14 +313,16 @@ def remove_node(self, node: WorkerController) -> Optional[str]: first_pending_test = worker.head_pending_test crashed_test_id = first_pending_test.test_id - self._log(f'Remote Worker {repr(worker)} shut down ungracefully. It ' - f'may have crashed while executing the pending test ' - f'{first_pending_test}. ' - f'NOTE: The ungraceful shutdown may create an imbalance ' - f'between the execution of the setup and teardown ' - f'fixture(s). THIS MAY LEAVE THE SYSTEM UNDER TEST IN AN ' - f'UNEXPECTED STATE, COMPROMISING EXECUTION OF ALL SUBSEQUENT ' - f'TESTS IN CURRENT AND FUTURE SESSIONS.') + self._log( + f"Remote Worker {worker!r} shut down ungracefully. It " + f"may have crashed while executing the pending test " + f"{first_pending_test}. " + f"NOTE: The ungraceful shutdown may create an imbalance " + f"between the execution of the setup and teardown " + f"fixture(s). THIS MAY LEAVE THE SYSTEM UNDER TEST IN AN " + f"UNEXPECTED STATE, COMPROMISING EXECUTION OF ALL SUBSEQUENT " + f"TESTS IN CURRENT AND FUTURE SESSIONS." + ) # Return the pending tests back to the workset queue for test in worker.release_pending_tests(): @@ -318,9 +331,7 @@ def remove_node(self, node: WorkerController) -> Optional[str]: return crashed_test_id def add_node_collection( - self, - node: WorkerController, - collection: Sequence[str] + self, node: WorkerController, collection: Sequence[str] ) -> None: """Register the collected test items from a Remote Worker node. @@ -340,12 +351,12 @@ def add_node_collection( - ``DSession.worker_collectionfinish``. 
""" - self._log(f'Adding collection for node {node}: {len(collection)=}') + self._log(f"Adding collection for node {node}: {len(collection)=}") # Check that add_node() was called on the node before - assert node in self._pending_worker_by_node, \ - f'Received test collection for {node=} which is not in pending ' \ - f'workers' + assert node in self._pending_worker_by_node, ( + f"Received test collection for {node=} which is not in pending " f"workers" + ) worker = self._pending_worker_by_node[node] @@ -357,10 +368,11 @@ def add_node_collection( # Check that the new collection matches the official collection if self._do_two_nodes_have_same_collection( - reference_node=self._official_test_collection_node, - reference_collection=self._official_test_collection, - node=node, - collection=collection): + reference_node=self._official_test_collection_node, + reference_collection=self._official_test_collection, + node=node, + collection=collection, + ): # The worker's collection is valid, so activate the new worker self._pending_worker_by_node.pop(node) self._worker_by_node[node] = worker @@ -376,8 +388,8 @@ def add_node_collection( # Get all pending workers with registered test collection w: _WorkerProxy workers_with_collection = [ - w for w in self._pending_worker_by_node.values() - if w.collection is not None] + w for w in self._pending_worker_by_node.values() if w.collection is not None + ] if len(workers_with_collection) < self._expected_num_workers: # Not enough test collections registered yet @@ -388,15 +400,15 @@ def add_node_collection( reference_worker = workers_with_collection[0] for pending_worker in workers_with_collection[1:]: if not self._do_two_nodes_have_same_collection( - reference_node=reference_worker.node, - reference_collection=reference_worker.collection, - node=pending_worker.node, - collection=pending_worker.collection): + reference_node=reference_worker.node, + reference_collection=reference_worker.collection, + node=pending_worker.node, + collection=pending_worker.collection, + ): same_collection = False if not same_collection: - self._log( - '**Different tests collected, aborting worker activation**') + self._log("**Different tests collected, aborting worker activation**") return # Collections are identical! @@ -417,8 +429,8 @@ def add_node_collection( # particularly slow) all_tests = [ _TestProxy(test_id=test_id, test_index=test_index) - for test_index, test_id - in enumerate(self._official_test_collection)] + for test_index, test_id in enumerate(self._official_test_collection) + ] shuffled_test_collection = random.sample(all_tests, k=len(all_tests)) # Organize tests into a queue of worksets grouped by test scope ID @@ -426,10 +438,7 @@ def add_node_collection( self._workset_queue.add_test(test) def mark_test_complete( - self, - node: WorkerController, - item_index: int, - duration: float + self, node: WorkerController, item_index: int, duration: float ) -> None: """Mark test item as completed by node and remove from pending tests in the worker and reschedule. @@ -444,9 +453,11 @@ def mark_test_complete( worker = self._worker_by_node[node] if self._log.enabled: - self._log(f'Marking test complete: ' - f'test_id={self._official_test_collection[item_index]}; ' - f'{item_index=}; {worker}') + self._log( + f"Marking test complete: " + f"test_id={self._official_test_collection[item_index]}; " + f"{item_index=}; {worker}" + ) worker.handle_test_completion(test_index=item_index) @@ -474,8 +485,9 @@ def schedule(self) -> None: - ``DSession.worker_collectionfinish``. 
""" - assert self.collection_is_completed, \ - 'schedule() called before test collection completed' + assert ( + self.collection_is_completed + ), "schedule() called before test collection completed" # Test collection has been completed, so reschedule if needed self._reschedule_workers() @@ -503,7 +515,7 @@ def split_scope(test_id: str) -> str: example/loadsuite/test/test_delta.py::Delta1 example/loadsuite/epsilon/__init__.py """ - return test_id.rsplit('::', 1)[0] + return test_id.rsplit("::", 1)[0] @property def _workers(self) -> Iterable[_WorkerProxy]: @@ -513,8 +525,7 @@ def _workers(self) -> Iterable[_WorkerProxy]: return self._worker_by_node.values() def _reschedule_workers(self) -> None: - """Distribute work to workers if needed at this time. - """ + """Distribute work to workers if needed at this time.""" assert self._state is not None traversed_states = [] @@ -523,9 +534,10 @@ def _reschedule_workers(self) -> None: # NOTE: This loop will terminate because completion of tests and # worker availability are reported outside the scope of this # function, and our state transitions are limited by those factors - assert len(traversed_states) <= len(self._State), \ - f'Too many traversed states - {len(traversed_states)}: ' \ - f'{traversed_states}' + assert len(traversed_states) <= len(self._State), ( + f"Too many traversed states - {len(traversed_states)}: " + f"{traversed_states}" + ) traversed_states.append(self._state) previous_state = self._state @@ -539,7 +551,7 @@ def _reschedule_workers(self) -> None: elif self._state is self._State.FENCE: self._handle_state_fence() else: - raise RuntimeError(f'Unhandled state: {self._state}') + raise RuntimeError(f"Unhandled state: {self._state}") def _handle_state_wait_ready_to_activate_scope(self) -> None: """Handle the `WAIT_READY_TO_ACTIVATE_SCOPE` state. @@ -550,8 +562,9 @@ def _handle_state_wait_ready_to_activate_scope(self) -> None: pending tests AND at least one worker is available for the distribution of the next scope, then transition to `ACTIVATE_SCOPE` """ - assert self._state is self._State.WAIT_READY_TO_ACTIVATE_SCOPE, \ - f'{self._state=} != {self._State.WAIT_READY_TO_ACTIVATE_SCOPE}' + assert ( + self._state is self._State.WAIT_READY_TO_ACTIVATE_SCOPE + ), f"{self._state=} != {self._State.WAIT_READY_TO_ACTIVATE_SCOPE}" if self._workset_queue.empty: # No more scopes are available @@ -575,21 +588,21 @@ def _handle_state_wait_ready_to_activate_scope(self) -> None: # session. next_scope_id = self._workset_queue.head_workset.scope_id - if not self._get_workers_available_for_distribution( - scope_id=next_scope_id): + if not self._get_workers_available_for_distribution(scope_id=next_scope_id): # No workers are available for distribution of the next scope. # It appears that some workers have crashed. xdist will either # replace crashed workers or terminate the session. if self._log.enabled: - self._log(f'No workers are available for {next_scope_id=}, ' - f'they likely crashed; staying in {self._state=}') + self._log( + f"No workers are available for {next_scope_id=}, " + f"they likely crashed; staying in {self._state=}" + ) return # Conditions are satisfied for transition to next state previous_state = self._state self._state = self._State.ACTIVATE_SCOPE - self._log(f'Transitioned from {str(previous_state)} to ' - f'{str(self._state)}') + self._log(f"Transitioned from {previous_state!s} to " f"{self._state!s}") def _handle_state_activate_scope(self) -> None: """Handle the `ACTIVATE_SCOPE` state. 
@@ -602,12 +615,13 @@ def _handle_state_activate_scope(self) -> None: priority over empty workers (to satisfy the "at least two active-Scope tests per worker" Rule) """ - assert self._state is self._State.ACTIVATE_SCOPE, \ - f'{self._state=} != {self._State.ACTIVATE_SCOPE}' + assert ( + self._state is self._State.ACTIVATE_SCOPE + ), f"{self._state=} != {self._State.ACTIVATE_SCOPE}" # The previous state is responsible for ensuring that the workset queue # is not empty - assert not self._workset_queue.empty, f'Empty {self._workset_queue}' + assert not self._workset_queue.empty, f"Empty {self._workset_queue}" workset = self._workset_queue.dequeue_workset() @@ -615,12 +629,14 @@ def _handle_state_activate_scope(self) -> None: # contain a fence test belonging to this scope as well as empty workers # which are not shutting down available_workers = self._get_workers_available_for_distribution( - scope_id=workset.scope_id) + scope_id=workset.scope_id + ) # The previous state is responsible for ensuring that workers are # available for this Scope - assert available_workers, \ - f'No workers available for {workset.scope_id=} in {self._state=}' + assert ( + available_workers + ), f"No workers available for {workset.scope_id=} in {self._state=}" # Distribute the workset to the available workers self._distribute_workset(workset=workset, workers=available_workers) @@ -631,9 +647,11 @@ def _handle_state_activate_scope(self) -> None: # Conditions are satisfied for transition to next state previous_state = self._state self._state = self._State.WAIT_READY_TO_FENCE - self._log(f'Transitioned from {str(previous_state)} to ' - f'{str(self._state)}. ' - f'Activated scope={self._active_scope_id}') + self._log( + f"Transitioned from {previous_state!s} to " + f"{self._state!s}. " + f"Activated scope={self._active_scope_id}" + ) def _handle_state_wait_ready_to_fence(self) -> None: """Handle the `WAIT_READY_TO_FENCE` state. @@ -648,11 +666,11 @@ def _handle_state_wait_ready_to_fence(self) -> None: `WAIT-READY-TO-ACTIVATE-SCOPE` (this means that all workers containing active-Scope tests crashed) """ - assert self._state is self._State.WAIT_READY_TO_FENCE, \ - f'{self._state=} != {self._State.WAIT_READY_TO_FENCE}' + assert ( + self._state is self._State.WAIT_READY_TO_FENCE + ), f"{self._state=} != {self._State.WAIT_READY_TO_FENCE}" - assert self._active_scope_id is not None, \ - f'{self._active_scope_id=} is None' + assert self._active_scope_id is not None, f"{self._active_scope_id=} is None" for worker in self._workers: if worker.num_pending_tests > 1: @@ -660,7 +678,8 @@ def _handle_state_wait_ready_to_fence(self) -> None: return workers_to_fence = self._get_workers_ready_for_fencing( - scope_id=self._active_scope_id) + scope_id=self._active_scope_id + ) # Conditions are satisfied for transition to next state previous_state = self._state @@ -672,11 +691,12 @@ def _handle_state_wait_ready_to_fence(self) -> None: # No active-Scope tests pending, so nothing to fence. Their # worker(s) must have crashed? self._state = self._State.WAIT_READY_TO_ACTIVATE_SCOPE - self._log(f'Nothing to fence! No active-scope tests pending - ' - f'workers crashed? {self._active_scope_id=}') + self._log( + f"Nothing to fence! No active-scope tests pending - " + f"workers crashed? {self._active_scope_id=}" + ) - self._log(f'Transitioned from {str(previous_state)} to ' - f'{str(self._state)}') + self._log(f"Transitioned from {previous_state!s} to " f"{self._state!s}") def _handle_state_fence(self) -> None: """Handle the `FENCE` state. 
@@ -690,32 +710,38 @@ def _handle_state_fence(self) -> None: workers instead of a fence test. Finally, transition to `WAIT_READY_TO_ACTIVATE_SCOPE`. """ - assert self._state is self._State.FENCE, \ - f'{self._state=} is not {self._State.FENCE}' + assert ( + self._state is self._State.FENCE + ), f"{self._state=} is not {self._State.FENCE}" workers_to_fence = self._get_workers_ready_for_fencing( - scope_id=self._active_scope_id) + scope_id=self._active_scope_id + ) # The prior state should have ensured that there is at least one worker # that needs to be fenced - assert workers_to_fence, \ - f'No workers ready to fence {self._active_scope_id=} ' \ - f'in {self._state=}; ' \ - f'active workers: {[w.verbose_repr() for w in self._workers]}' + assert workers_to_fence, ( + f"No workers ready to fence {self._active_scope_id=} " + f"in {self._state=}; " + f"active workers: {[w.verbose_repr() for w in self._workers]}" + ) # We will take Fence tests from subsequent worksets. # NOTE: A given workset may be used to fence multiple preceding active # Scopes fence_item_generator = self._generate_fence_items( - source_worksets=self._workset_queue.worksets) + source_worksets=self._workset_queue.worksets + ) # Start fencing for worker in workers_to_fence: fence_item = next(fence_item_generator) if fence_item is not None: worker.run_some_tests([fence_item]) - self._log(f'Fenced {worker} with {fence_item}. ' - f'Active scope={self._active_scope_id}') + self._log( + f"Fenced {worker} with {fence_item}. " + f"Active scope={self._active_scope_id}" + ) else: # No more fence items, so send the "shutdown" message to # the worker to force it to execute its final pending test and @@ -727,13 +753,10 @@ def _handle_state_fence(self) -> None: # Transition to next state previous_state = self._state self._state = self._State.WAIT_READY_TO_ACTIVATE_SCOPE - self._log(f'Transitioned from {str(previous_state)} to ' - f'{str(self._state)}') + self._log(f"Transitioned from {previous_state!s} to " f"{self._state!s}") def _distribute_workset( - self, - workset: _ScopeWorkset, - workers: list[_WorkerProxy] + self, workset: _ScopeWorkset, workers: list[_WorkerProxy] ) -> None: """Distribute the tests in the given workset to the given workers. 
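A worked example of the two sizing computations used by the distribution and fencing logic, per the `num_tests // 2` cap and the fence-capacity expression that appear further down in this diff; the concrete numbers are illustrative.

# A scope workset originally held 8 tests (its high-water mark); one test has
# already been borrowed from it as a fence for a preceding scope.
high_water = 8
num_tests_left = 7

# "At least two active-Scope tests per worker": at most high_water // 2 workers.
max_workers = high_water // 2
assert max_workers == 4

# Remaining fence capacity: the cap minus the tests already withdrawn as fences.
fence_capacity = max_workers - (high_water - num_tests_left)
assert fence_capacity == 3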
@@ -762,48 +785,54 @@ def _distribute_workset( # Remaining tests in the workset plus the number borrowed as fences # must add up to the original total tests in the workset - assert (workset.num_tests + num_workers_with_fences - == workset.high_water), \ - f'{workset}.num_tests + {num_workers_with_fences=} ' \ - f'!= {workset.high_water=}; {workers=}' + assert workset.num_tests + num_workers_with_fences == workset.high_water, ( + f"{workset}.num_tests + {num_workers_with_fences=} " + f"!= {workset.high_water=}; {workers=}" + ) # Determine the number of workers we will use for this distribution num_workers_to_use = min( - self._get_max_workers_for_num_tests(workset.high_water), - len(workers)) + self._get_max_workers_for_num_tests(workset.high_water), len(workers) + ) # At minimum, all workers fenced from the given Scope Workset must be # included in the distribution - assert num_workers_to_use >= num_workers_with_fences, \ - f'{num_workers_to_use=} < {num_workers_with_fences=} ' \ - f'for {workset} and available {len(workers)=}' + assert num_workers_to_use >= num_workers_with_fences, ( + f"{num_workers_to_use=} < {num_workers_with_fences=} " + f"for {workset} and available {len(workers)=}" + ) # We should only be called when there is work to be done - assert num_workers_to_use > 0, f'{num_workers_to_use=} <= 0' + assert num_workers_to_use > 0, f"{num_workers_to_use=} <= 0" # Our workset's footprint should not exceed available workers - assert num_workers_to_use <= len(workers), \ - f'{num_workers_to_use=} > {len(workers)=} for {workset}' + assert num_workers_to_use <= len( + workers + ), f"{num_workers_to_use=} > {len(workers)=} for {workset}" # Distribute the tests to the selected workers - self._log(f'Distributing {workset} to {num_workers_to_use=}: ' - f'{workers[:num_workers_to_use]}') + self._log( + f"Distributing {workset} to {num_workers_to_use=}: " + f"{workers[:num_workers_to_use]}" + ) num_tests_remaining = workset.high_water - for (worker, num_available_workers) in zip( - workers, - range(num_workers_to_use, 0, -1)): + for worker, num_available_workers in zip( + workers, range(num_workers_to_use, 0, -1) + ): worker: _WorkerProxy num_available_workers: int # Workers ready for distribution must have no more than one pending # test - assert worker.num_pending_tests <= 1, \ - f'{worker.verbose_repr()} num_pending_tests > 1' + assert ( + worker.num_pending_tests <= 1 + ), f"{worker.verbose_repr()} num_pending_tests > 1" if not worker.empty: # The single pending test in the worker must be a Fence test # borrowed from the given workset - assert worker.head_pending_test.scope_id == workset.scope_id, \ - f'Scope IDs of {worker.verbose_repr()} and {workset} differ' + assert ( + worker.head_pending_test.scope_id == workset.scope_id + ), f"Scope IDs of {worker.verbose_repr()} and {workset} differ" # Determine the target number of tests for this worker (including # a matching Fence test, if any) @@ -818,26 +847,27 @@ def _distribute_workset( if num_tests_to_add: tests_to_add = workset.dequeue_tests(num_tests=num_tests_to_add) worker.run_some_tests(tests_to_add) - self._log(f'Distributed {len(tests_to_add)} tests to {worker} ' - f'from {workset}') + self._log( + f"Distributed {len(tests_to_add)} tests to {worker} " + f"from {workset}" + ) else: # NOTE: A Workset with a high watermark of just one item becomes # empty if a Fence item was withdrawn from it - assert workset.high_water == 1, \ - f'Attempted to distribute 0 tests to {worker} ' \ - f'from {workset}' - self._log(f'No more tests to 
distribute from {workset} ' - f'to {worker}') + assert workset.high_water == 1, ( + f"Attempted to distribute 0 tests to {worker} " f"from {workset}" + ) + self._log(f"No more tests to distribute from {workset} " f"to {worker}") # Workset should be empty now - assert workset.empty, \ - f'{workset} is not empty after distribution to {num_workers_to_use} ' \ - f'workers: {workers[:num_workers_to_use]}.' + assert workset.empty, ( + f"{workset} is not empty after distribution to {num_workers_to_use} " + f"workers: {workers[:num_workers_to_use]}." + ) @classmethod def _generate_fence_items( - cls, - source_worksets: Iterable[_ScopeWorkset] + cls, source_worksets: Iterable[_ScopeWorkset] ) -> Generator[Optional[_TestProxy], None, None]: """Generator that withdraws (i.e., dequeues) Fence test items from the given ordered Scope Worksets and yields them until it runs out of the @@ -904,13 +934,13 @@ def _get_fence_capacity_of_workset(cls, workset: _ScopeWorkset) -> int: :param workset: The given Scope Workset :return: """ - num_fence_items = ( - cls._get_max_workers_for_num_tests(num_tests=workset.high_water) - - (workset.high_water - workset.num_tests) - ) + num_fence_items = cls._get_max_workers_for_num_tests( + num_tests=workset.high_water + ) - (workset.high_water - workset.num_tests) - assert num_fence_items >= 0, f'Number of fences below zero ' \ - f'({num_fence_items}) in {workset}' + assert num_fence_items >= 0, ( + f"Number of fences below zero " f"({num_fence_items}) in {workset}" + ) return num_fence_items @@ -941,8 +971,7 @@ def _get_max_workers_for_num_tests(num_tests: int) -> int: return num_tests // 2 def _get_workers_available_for_distribution( - self, - scope_id: str + self, scope_id: str ) -> list[_WorkerProxy]: """Return workers available for distribution of the given Scope. @@ -959,16 +988,15 @@ def _get_workers_available_for_distribution( :return: A (possibly empty) list of workers available for distribution. """ return [ - worker for worker in self._workers - if (not worker.shutting_down - and (worker.empty - or worker.tail_pending_test.scope_id == scope_id)) + worker + for worker in self._workers + if ( + not worker.shutting_down + and (worker.empty or worker.tail_pending_test.scope_id == scope_id) + ) ] - def _get_workers_ready_for_fencing( - self, - scope_id: str - ) -> list[_WorkerProxy]: + def _get_workers_ready_for_fencing(self, scope_id: str) -> list[_WorkerProxy]: """Return workers that are ready to be Fenced for the given test Scope. A worker that needs to be Fenced satisfies all the following conditions: @@ -981,10 +1009,13 @@ def _get_workers_ready_for_fencing( :return: A (possibly empty) list of workers to Fence. """ return [ - worker for worker in self._workers - if (not worker.shutting_down + worker + for worker in self._workers + if ( + not worker.shutting_down and worker.num_pending_tests == 1 - and worker.head_pending_test.scope_id == scope_id) + and worker.head_pending_test.scope_id == scope_id + ) ] def _do_two_nodes_have_same_collection( @@ -992,7 +1023,7 @@ def _do_two_nodes_have_same_collection( reference_node: WorkerController, reference_collection: tuple[str], node: WorkerController, - collection: tuple[str, ...] + collection: tuple[str, ...], ) -> bool: """ If collections differ, this method returns False while logging @@ -1008,8 +1039,8 @@ def _do_two_nodes_have_same_collection( otherwise. 
""" msg = report_collection_diff( - reference_collection, collection, reference_node.gateway.id, - node.gateway.id) + reference_collection, collection, reference_node.gateway.id, node.gateway.id + ) if not msg: return True @@ -1019,8 +1050,7 @@ def _do_two_nodes_have_same_collection( # NOTE: Not sure why/when `_config` would be `None`. Copied check # from the `loadscope` scheduler. - report = CollectReport(node.gateway.id, 'failed', longrepr=msg, - result=[]) + report = CollectReport(node.gateway.id, "failed", longrepr=msg, result=[]) self._config.hook.pytest_collectreport(report=report) return False @@ -1046,8 +1076,7 @@ def __init__(self, node: WorkerController): # Initially None, until assigned by the Scheduler self._collection: Optional[tuple[str]] = None - self._pending_test_by_index: \ - OrderedDict[int, _TestProxy] = OrderedDict() + self._pending_test_by_index: OrderedDict[int, _TestProxy] = OrderedDict() def __repr__(self): return self.verbose_repr(verbose=False) @@ -1073,10 +1102,11 @@ def collection(self, collection: tuple[str]): :param collection: An ordered collection of test IDs collected by the remote worker. Must not be `None`. Also, MUST NOT be set already. """ - assert collection is not None, f'None test collection passed to {self}' + assert collection is not None, f"None test collection passed to {self}" - assert self._collection is None, \ - f'Test collection passed when one already exists to {self}' + assert ( + self._collection is None + ), f"Test collection passed when one already exists to {self}" self._collection = collection @@ -1113,8 +1143,7 @@ def empty(self) -> bool: @property def num_pending_tests(self) -> int: - """Count of tests in the pending queue - """ + """Count of tests in the pending queue""" return len(self._pending_test_by_index) @property @@ -1134,24 +1163,22 @@ def verbose_repr(self, verbose: bool = True) -> str: :return: `repr` of the instance. 
""" items = [ - '<', - f'{self.__class__.__name__}:', - f'{self._node}', - f'shutting_down={self.shutting_down}', - f'num_pending={self.num_pending_tests}' + "<", + f"{self.__class__.__name__}:", + f"{self._node}", + f"shutting_down={self.shutting_down}", + f"num_pending={self.num_pending_tests}", ] if verbose: if self.num_pending_tests == 1: - items.append( - f'head_scope_id={self.head_pending_test.scope_id}') + items.append(f"head_scope_id={self.head_pending_test.scope_id}") if self.num_pending_tests > 1: - items.append( - f'tail_scope_id={self.tail_pending_test.scope_id}') + items.append(f"tail_scope_id={self.tail_pending_test.scope_id}") - items.append('>') + items.append(">") - return ' '.join(items) + return " ".join(items) def run_some_tests(self, tests: Iterable[_TestProxy]) -> None: """ @@ -1172,8 +1199,9 @@ def handle_test_completion(self, test_index: int) -> None: # Completion should be reported in same order the tests were sent to # the remote worker - assert head_pending_test_index == test_index, \ - f'{head_pending_test_index=} != {test_index}' + assert ( + head_pending_test_index == test_index + ), f"{head_pending_test_index=} != {test_index}" # Remove the test from the worker's pending queue self._pending_test_by_index.pop(test_index) @@ -1198,13 +1226,16 @@ def shutdown(self) -> None: class _TestProxy: """ - Represents a single test from the overall test - collection to be executed + Represents a single test from the overall test + collection to be executed """ # There can be a large number of tests, so economize memory by declaring # `__slots__` (see https://wiki.python.org/moin/UsingSlots) - __slots__ = ('test_id', 'test_index',) + __slots__ = ( + "test_id", + "test_index", + ) def __init__(self, test_id: str, test_index: int): """ @@ -1217,13 +1248,14 @@ def __init__(self, test_id: str, test_index: int): self.test_index: int = test_index def __repr__(self): - return f'<{self.__class__.__name__}: test_index={self.test_index} ' \ - f'scope_id={self.scope_id} test_id={self.test_id}>' + return ( + f"<{self.__class__.__name__}: test_index={self.test_index} " + f"scope_id={self.scope_id} test_id={self.test_id}>" + ) @property def scope_id(self) -> str: - """Scope ID to which this test belongs. 
- """ + """Scope ID to which this test belongs.""" return IsoScopeScheduling.split_scope(self.test_id) @@ -1232,7 +1264,11 @@ class _ScopeWorkset: Ordered collection of Tests for the given scope """ - __slots__ = ('scope_id', '_high_water', '_test_by_index',) + __slots__ = ( + "scope_id", + "_high_water", + "_test_by_index", + ) def __init__(self, scope_id: str): """ @@ -1246,8 +1282,10 @@ def __init__(self, scope_id: str): self._test_by_index: OrderedDict[int, _TestProxy] = OrderedDict() def __repr__(self): - return f'<{self.__class__.__name__}: scope_id={self.scope_id} ' \ - f'num_tests={self.num_tests} high_water={self.high_water}>' + return ( + f"<{self.__class__.__name__}: scope_id={self.scope_id} " + f"num_tests={self.num_tests} high_water={self.high_water}>" + ) @property def empty(self) -> bool: @@ -1268,11 +1306,11 @@ def num_tests(self) -> int: def enqueue_test(self, test: _TestProxy) -> None: """Append given test to ordered test collection""" - assert test.scope_id == self.scope_id, \ - f'Wrong {test.scope_id=} for {self}' + assert test.scope_id == self.scope_id, f"Wrong {test.scope_id=} for {self}" - assert test.test_index not in self._test_by_index, \ - f'{test.test_index=} was already assigned to {self}' + assert ( + test.test_index not in self._test_by_index + ), f"{test.test_index=} was already assigned to {self}" self._test_by_index[test.test_index] = test @@ -1290,17 +1328,18 @@ def dequeue_tests(self, num_tests: int) -> list[_TestProxy]: available tests. @raise IndexError: If `num_tests` exceeds available tests. """ - assert num_tests > 0, f'Non-positive {num_tests=} requested.' + assert num_tests > 0, f"Non-positive {num_tests=} requested." if num_tests > len(self._test_by_index): - raise IndexError( - f'{num_tests=} exceeds {len(self._test_by_index)=}') + raise IndexError(f"{num_tests=} exceeds {len(self._test_by_index)=}") key_iter = iter(self._test_by_index.keys()) test_indexes_to_dequeue = [next(key_iter) for _ in range(num_tests)] - return [self._test_by_index.pop(test_index) - for test_index in test_indexes_to_dequeue] + return [ + self._test_by_index.pop(test_index) + for test_index in test_indexes_to_dequeue + ] class _WorksetQueue: @@ -1310,8 +1349,10 @@ def __init__(self): self._workset_by_scope: OrderedDict[str, _ScopeWorkset] = OrderedDict() def __repr__(self): - return f'<{self.__class__.__name__}: ' \ - f'num_worksets={len(self._workset_by_scope)}>' + return ( + f"<{self.__class__.__name__}: " + f"num_worksets={len(self._workset_by_scope)}>" + ) @property def empty(self) -> bool: @@ -1357,7 +1398,6 @@ def dequeue_workset(self) -> _ScopeWorkset: @raise IndexError: If queue is empty. """ if self.empty: - raise IndexError('Attempted dequeue from empty Workset Queue.') + raise IndexError("Attempted dequeue from empty Workset Queue.") - return self._workset_by_scope.pop( - next(iter(self._workset_by_scope.keys()))) + return self._workset_by_scope.pop(next(iter(self._workset_by_scope.keys())))