From 51a36a1848da7e78330b737401f2c4247c10cb65 Mon Sep 17 00:00:00 2001
From: Chris Iaconetti
Date: Mon, 16 Sep 2024 12:05:08 -0400
Subject: [PATCH 1/5] TUR-21619: Prototype Custom Scheduler (#1)

TUR-21619 is a prototype method of executing groups of pytest test cases in
series, with each group running its specific test cases in parallel with a
variable number of workers. Tests can be marked with a custom mark. This
custom mark specifies an arbitrary group name and the number of processes to
run that group's tests with; the number of processes is separated from the
group name by an underscore.

Due to limitations in pytest/execnet, a shutdown signal must be sent to a node
when one test case remains on the node (the test case is actually run
immediately, but we do not receive feedback through the channel until a
shutdown signal is sent, due to an old design decision in pytest). Because of
this, in order to run additional tests after a group is nearly complete (when
each worker has at most 1 test remaining), we must shut down each node,
recreate each node, and then schedule more tests. This adds overhead, as we
must tear down and start up nodes; however, this overhead is on the order of
1 second per teardown/startup. If this custom scheduling method allows us to
save more than 1 second in test execution, it is worth the increased overhead
cost.

Note: The -n argument with --dist customgroup is the maximum number of worker
processes. If -n specifies 4 but a specific group specifies 8, the specific
group will be run across only 4 nodes; we will never spin up more nodes than
the -n argument allows.

---------

Co-authored-by: Alvaro Barbeira
---
 src/xdist/dsession.py              | 126 +++++++++-
 src/xdist/plugin.py                |   3 +
 src/xdist/remote.py                |   7 +-
 src/xdist/scheduler/__init__.py    |   1 +
 src/xdist/scheduler/customgroup.py | 365 +++++++++++++++++++++++++++++
 src/xdist/workermanage.py          |   4 +
 xdist-testing-ntop/README.md       |  14 ++
 xdist-testing-ntop/pytest.ini      |   5 +
 xdist-testing-ntop/test.py         |  94 ++++++++
 9 files changed, 610 insertions(+), 9 deletions(-)
 create mode 100644 src/xdist/scheduler/customgroup.py
 create mode 100644 xdist-testing-ntop/README.md
 create mode 100644 xdist-testing-ntop/pytest.ini
 create mode 100644 xdist-testing-ntop/test.py

diff --git a/src/xdist/dsession.py b/src/xdist/dsession.py
index 62079a28..5b56a551 100644
--- a/src/xdist/dsession.py
+++ b/src/xdist/dsession.py
@@ -8,6 +8,7 @@
 from typing import Any
 from typing import Sequence
 import warnings
+import traceback
 
 import execnet
 import pytest
@@ -21,6 +22,7 @@
 from xdist.scheduler import LoadScopeScheduling
 from xdist.scheduler import Scheduling
 from xdist.scheduler import WorkStealingScheduling
+from xdist.scheduler import CustomGroup
 from xdist.workermanage import NodeManager
 from xdist.workermanage import WorkerController
 
@@ -58,10 +60,14 @@ def __init__(self, config: pytest.Config) -> None:
         self._failed_collection_errors: dict[object, bool] = {}
         self._active_nodes: set[WorkerController] = set()
         self._failed_nodes_count = 0
+        self.saved_put = None
+        self.remake_nodes = False
+        self.ready_to_run_tests = False
         self._max_worker_restart = get_default_max_worker_restart(self.config)
         # summary message to print at the end of the session
         self._summary_report: str | None = None
         self.terminal = config.pluginmanager.getplugin("terminalreporter")
+        self.worker_status: dict[WorkerController, str] = {}
         if self.terminal:
             self.trdist = TerminalDistReporter(config)
             config.pluginmanager.register(self.trdist, "terminaldistreporter")
@@ -87,6 +93,7 @@ def 
pytest_sessionstart(self, session: pytest.Session) -> None: soon as nodes start they will emit the worker_workerready event. """ self.nodemanager = NodeManager(self.config) + self.saved_put = self.queue.put nodes = self.nodemanager.setup_nodes(putevent=self.queue.put) self._active_nodes.update(nodes) self._session = session @@ -123,6 +130,8 @@ def pytest_xdist_make_scheduler( return LoadGroupScheduling(config, log) if dist == "worksteal": return WorkStealingScheduling(config, log) + if dist == "customgroup": + return CustomGroup(config, log) return None @pytest.hookimpl @@ -147,14 +156,19 @@ def loop_once(self) -> None: """Process one callback from one of the workers.""" while 1: if not self._active_nodes: - # If everything has died stop looping - self.triggershutdown() - raise RuntimeError("Unexpectedly no active workers available") + # Worker teardown + recreation only occurs for CustomGroup Scheduler + if isinstance(self.sched, CustomGroup) and self.remake_nodes: + pass + else: + # We aren't using CustomGroup scheduler and everything has died: stop looping + self.triggershutdown() + raise RuntimeError("Unexpectedly no active workers available") try: eventcall = self.queue.get(timeout=2.0) break except Empty: continue + callname, kwargs = eventcall assert callname, kwargs method = "worker_" + callname @@ -165,6 +179,71 @@ def loop_once(self) -> None: if self.sched.tests_finished: self.triggershutdown() + + def is_node_finishing(self, node: WorkerController): + """Check if a test worker is considered to be finishing. + + Evaluate whether it's on its last test, or if no tests are pending. + """ + pending = self.sched.node2pending.get(node) + return pending is not None and len(pending) < 2 + + + def is_node_clear(self, node: WorkerController): + """Check if a test worker has no pending tests.""" + pending = self.sched.node2pending.get(node) + return pending is None or len(pending) == 0 + + + def are_all_nodes_finishing(self): + """Check if all workers are finishing (See 'is_node_finishing' above).""" + return all(self.is_node_finishing(node) for node in self.sched.nodes) + + + def are_all_nodes_done(self): + """Check if all nodes have reported to finish.""" + return all(s == "finished" for s in self.worker_status.values()) + + + def are_all_active_nodes_collected(self): + """Check if all nodes have reported collection to be complete.""" + if not all(n.gateway.id in self.worker_status for n in self._active_nodes): + return False + return all(self.worker_status[n.gateway.id] == "collected" for n in self._active_nodes) + + + def reset_nodes_if_needed(self): + if self.are_all_nodes_finishing() and self.ready_to_run_tests and not self.sched.do_resched: + self.reset_nodes() + + + def reset_nodes(self): + """Issue shutdown notices to workers for rescheduling purposes.""" + if len(self.sched.pending) != 0: + self.remake_nodes = True + for node in self.sched.nodes: + if self.is_node_finishing(node): + node.shutdown() + + + def reschedule(self): + """Reschedule tests.""" + self.sched.do_resched = False + self.sched.check_schedule(self.sched.nodes[0], 1.0, True) + + + def prepare_for_reschedule(self): + """Update test workers and their status tracking so rescheduling is ready.""" + self.remake_nodes = False + num_workers = self.sched.dist_groups[self.sched.pending_groups[0]]['group_workers'] + self.trdist._status = {} + new_nodes = self.nodemanager.setup_nodes(self.saved_put, num_workers) + self.worker_status = {} + self._active_nodes = set() + self._active_nodes.update(new_nodes) + 
self.sched.node2pending = {} + self.sched.do_resched = True + # # callbacks for processing events from workers # @@ -182,6 +261,7 @@ def worker_workerready( node.workerinfo = workerinfo node.workerinfo["id"] = node.gateway.id node.workerinfo["spec"] = node.gateway.spec + self.update_worker_status(node, "ready") self.config.hook.pytest_testnodeready(node=node) if self.shuttingdown: @@ -198,6 +278,17 @@ def worker_workerfinished(self, node: WorkerController) -> None: The node might not be in the scheduler if it had not emitted workerready before shutdown was triggered. """ + self.update_worker_status(node, "finished") + + if isinstance(self.sched, CustomGroup) and self.remake_nodes: + node.ensure_teardown() + self._active_nodes.remove(node) + if self.are_all_nodes_done(): + try: + self.prepare_for_reschedule() + except Exception as e: + self.shouldstop = f"Exception caught during preparation for rescheduling. Giving up.\n{''.join(traceback.format_exception(e))}" + return self.config.hook.pytest_testnodedown(node=node, error=None) if node.workeroutput["exitstatus"] == 2: # keyboard-interrupt self.shouldstop = f"{node} received keyboard-interrupt" @@ -217,6 +308,13 @@ def worker_workerfinished(self, node: WorkerController) -> None: assert not crashitem, (crashitem, node) self._active_nodes.remove(node) + def update_worker_status(self, node, status): + """Track the worker status. + + Can be used at callbacks like 'worker_workerfinished' so we remember wchic event was reported last by each worker. + """ + self.worker_status[node.workerinfo["id"]] = status + def worker_internal_error( self, node: WorkerController, formatted_error: str ) -> None: @@ -283,7 +381,10 @@ def worker_collectionfinish( scheduling the first time it logs which scheduler is in use. """ if self.shuttingdown: + self.report_line(f"[-] [dse] collectionfinish while closing {node.gateway.id}") return + self.update_worker_status(node, "collected") + self.config.hook.pytest_xdist_node_collection_finished(node=node, ids=ids) # tell session which items were effectively collected otherwise # the controller node will finish the session with EXIT_NOTESTSCOLLECTED @@ -300,10 +401,15 @@ def worker_collectionfinish( self.trdist.ensure_show_status() self.terminal.write_line("") if self.config.option.verbose > 0: - self.terminal.write_line( - f"scheduling tests via {self.sched.__class__.__name__}" - ) - self.sched.schedule() + self.report_line(f"[-] [dse] scheduling tests via {self.sched.__class__.__name__}") + if isinstance(self.sched, CustomGroup) and self.ready_to_run_tests and self.are_all_active_nodes_collected(): + # we're coming back here after finishing a batch of tests - so start the next batch + self.reschedule() + self.reset_nodes_if_needed() + else: + self.ready_to_run_tests = True + self.sched.schedule() + self.reset_nodes_if_needed() def worker_logstart( self, @@ -339,6 +445,12 @@ def worker_runtest_protocol_complete( """ assert self.sched is not None self.sched.mark_test_complete(node, item_index, duration) + if isinstance(self.sched, CustomGroup): + if self.are_all_nodes_finishing(): + if self.shouldstop: + self.report_line("Won't reschedule - should stop.") + else: + self.reset_nodes() def worker_unscheduled( self, node: WorkerController, indices: Sequence[int] diff --git a/src/xdist/plugin.py b/src/xdist/plugin.py index f670d9de..a600a705 100644 --- a/src/xdist/plugin.py +++ b/src/xdist/plugin.py @@ -108,6 +108,7 @@ def pytest_addoption(parser: pytest.Parser) -> None: "loadfile", "loadgroup", "worksteal", + "customgroup", 
"no", ], dest="dist", @@ -124,6 +125,8 @@ def pytest_addoption(parser: pytest.Parser) -> None: "loadgroup: Like 'load', but sends tests marked with 'xdist_group' to the same worker.\n\n" "worksteal: Split the test suite between available environments," " then re-balance when any worker runs out of tests.\n\n" + # TODO: Update docstring + "customgroup: TODO: add docs here" "(default) no: Run tests inprocess, don't distribute." ), ) diff --git a/src/xdist/remote.py b/src/xdist/remote.py index dd1f9883..0ec9047d 100644 --- a/src/xdist/remote.py +++ b/src/xdist/remote.py @@ -201,15 +201,17 @@ def run_one_test(self) -> None: "runtest_protocol_complete", item_index=self.item_index, duration=duration ) + @pytest.mark.trylast def pytest_collection_modifyitems( self, config: pytest.Config, items: list[pytest.Item], ) -> None: # add the group name to nodeid as suffix if --dist=loadgroup - if config.getvalue("loadgroup"): + if config.getvalue("loadgroup") or config.getvalue("customgroup"): + functional_mark = "xdist_group" if config.getvalue("loadgroup") else "xdist_custom" for item in items: - mark = item.get_closest_marker("xdist_group") + mark = item.get_closest_marker(functional_mark) if not mark: continue gname = ( @@ -357,6 +359,7 @@ def getinfodict() -> WorkerInfo: def setup_config(config: pytest.Config, basetemp: str | None) -> None: config.option.loadgroup = config.getvalue("dist") == "loadgroup" + config.option.customgroup = config.getvalue("dist") == "customgroup" config.option.looponfail = False config.option.usepdb = False config.option.dist = "no" diff --git a/src/xdist/scheduler/__init__.py b/src/xdist/scheduler/__init__.py index b4894732..34b791d7 100644 --- a/src/xdist/scheduler/__init__.py +++ b/src/xdist/scheduler/__init__.py @@ -1,5 +1,6 @@ from xdist.scheduler.each import EachScheduling as EachScheduling from xdist.scheduler.load import LoadScheduling as LoadScheduling +from xdist.scheduler.customgroup import CustomGroup as CustomGroup from xdist.scheduler.loadfile import LoadFileScheduling as LoadFileScheduling from xdist.scheduler.loadgroup import LoadGroupScheduling as LoadGroupScheduling from xdist.scheduler.loadscope import LoadScopeScheduling as LoadScopeScheduling diff --git a/src/xdist/scheduler/customgroup.py b/src/xdist/scheduler/customgroup.py new file mode 100644 index 00000000..9a491965 --- /dev/null +++ b/src/xdist/scheduler/customgroup.py @@ -0,0 +1,365 @@ +"""Run tests across a variable number of nodes based on custom groups. + +# TODO: This is more of a spec/description, update docs/remove this section document within the class +Example: + - 10 test cases exist + - 4 test cases are marked with @pytest.mark.low + - 4 test cases are marked with @pytest.mark.medium + - 2 test cases are marked with @pytest.mark.high + - A pytest.ini file contains the following lines: +[pytest] + +markers= + low: 4 + medium: 2 + high: 1 + +Then the 4 low test cases will be ran on 4 workers (distributed evenly amongst the 4, as the load.py scheduler functions) +Then the 4 medium test cases will be ran on 2 workers (again, distributed evenly), only after the low test cases are complete (or before they start). +Then the 2 high test cases will be ran on 1 worker (distributed evenly), only after the low and medium test cases are complete (or before they start). + +This allows a pytest user more custom control over processing tests. +One potential application would be measuring the resource utilization of all test cases. 
Test cases that are not +resource intensive can be ran on many workers, and more resource intensive test cases can be ran once the low +resource consuming tests are done on fewer workers, such that resource consumption does not exceed available resources. +""" +from __future__ import annotations + +from itertools import cycle +from typing import Sequence + +import pytest + +from xdist.remote import Producer, WorkerInteractor +from xdist.report import report_collection_diff +from xdist.workermanage import parse_spec_config +from xdist.workermanage import WorkerController + +class CustomGroup: + """ + # TODO: update docs here + """ + + def __init__(self, config: pytest.Config, log: Producer | None = None) -> None: + self.terminal = config.pluginmanager.getplugin("terminalreporter") + self.numnodes = len(parse_spec_config(config)) + self.node2collection: dict[WorkerController, list[str]] = {} + self.node2pending: dict[WorkerController, list[int]] = {} + self.pending: list[int] = [] + self.collection: list[str] | None = None + if log is None: + self.log = Producer("loadsched") + else: + self.log = log.loadsched + self.config = config + self.maxschedchunk = self.config.getoption("maxschedchunk") + # TODO: Type annotation incorrect + self.dist_groups: dict[str, str] = {} + self.pending_groups: list[str] = [] + self.is_first_time = True + self.do_resched = False + + @property + def nodes(self) -> list[WorkerController]: + """A list of all nodes in the scheduler.""" + return list(self.node2pending.keys()) + + @property + def collection_is_completed(self) -> bool: + """Boolean indication initial test collection is complete. + + This is a boolean indicating all initial participating nodes + have finished collection. The required number of initial + nodes is defined by ``.numnodes``. + """ + return len(self.node2collection) >= self.numnodes + + @property + def tests_finished(self) -> bool: + """Return True if all tests have been executed by the nodes.""" + if not self.collection_is_completed: + return False + if self.pending: + return False + for pending in self.node2pending.values(): + if len(pending) >= 2: + return False + return True + + @property + def has_pending(self) -> bool: + """Return True if there are pending test items. + + This indicates that collection has finished and nodes are + still processing test items, so this can be thought of as + "the scheduler is active". + """ + if self.pending: + return True + for pending in self.node2pending.values(): + if pending: + return True + return False + + def add_node(self, node: WorkerController) -> None: + """Add a new node to the scheduler. + + From now on the node will be allocated chunks of tests to + execute. + + Called by the ``DSession.worker_workerready`` hook when it + successfully bootstraps a new node. + """ + assert node not in self.node2pending + self.node2pending[node] = [] + + def add_node_collection( + self, node: WorkerController, collection: Sequence[str] + ) -> None: + """Add the collected test items from a node. + + The collection is stored in the ``.node2collection`` map. + Called by the ``DSession.worker_collectionfinish`` hook. + """ + assert node in self.node2pending + if self.collection_is_completed: + # A new node has been added later, perhaps an original one died. 
+ # .schedule() should have + # been called by now + assert self.collection + if collection != self.collection: + other_node = next(iter(self.node2collection.keys())) + msg = report_collection_diff( + self.collection, collection, other_node.gateway.id, node.gateway.id + ) + self.log(msg) + return + self.node2collection[node] = list(collection) + + def mark_test_complete( + self, node: WorkerController, item_index: int, duration: float = 0 + ) -> None: + """Mark test item as completed by node. + + The duration it took to execute the item is used as a hint to + the scheduler. + + This is called by the ``DSession.worker_testreport`` hook. + """ + self.node2pending[node].remove(item_index) + self.check_schedule(node, duration=duration) + + def mark_test_pending(self, item: str) -> None: + + assert self.collection is not None + self.pending.insert( + 0, + self.collection.index(item), + ) + for node in self.node2pending: + self.check_schedule(node) + + def remove_pending_tests_from_node( + self, + node: WorkerController, + indices: Sequence[int], + ) -> None: + raise NotImplementedError() + + def check_schedule(self, node: WorkerController, duration: float = 0, from_dsession=False) -> None: + """Maybe schedule new items on the node. + + If there are any globally pending nodes left then this will + check if the given node should be given any more tests. The + ``duration`` of the last test is optionally used as a + heuristic to influence how many tests the node is assigned. + """ + if node.shutting_down: + self.report_line(f"[-] [csg] {node.workerinput['workerid']} is already shutting down") + return + + if self.pending: + any_working = False + for node in self.nodes: + if len(self.node2pending[node]) not in [0, 1]: + any_working = True + + if not any_working and from_dsession: + if self.pending_groups: + dist_group_key = self.pending_groups.pop(0) + dist_group = self.dist_groups[dist_group_key] + nodes = cycle(self.nodes[0:dist_group['group_workers']]) + schedule_log = {n.gateway.id:[] for n in self.nodes[0:dist_group['group_workers']]} + for _ in range(len(dist_group['test_indices'])): + n = next(nodes) + #needs cleaner way to be identified + tests_per_node = self.dist_groups[dist_group_key]['pending_indices'][:1] + schedule_log[n.gateway.id].extend(tests_per_node) + + self._send_tests_group(n, 1, dist_group_key) + del self.dist_groups[dist_group_key] + message = f"\n[-] [csg] check_schedule: processed scheduling for {dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid,nt in schedule_log.items()])}" + self.report_line(message) + + else: + pending = self.node2pending.get(node) + if len(pending) < 2: + self.report_line(f"[-] [csg] Shutting down {node.workerinput['workerid']} because only one case is pending") + node.shutdown() + + self.log("num items waiting for node:", len(self.pending)) + + def remove_node(self, node: WorkerController) -> str | None: + """Remove a node from the scheduler. + + This should be called either when the node crashed or at + shutdown time. In the former case any pending items assigned + to the node will be re-scheduled. Called by the + ``DSession.worker_workerfinished`` and + ``DSession.worker_errordown`` hooks. + + Return the item which was being executing while the node + crashed or None if the node has no more pending items. 
+ + """ + pending = self.node2pending.pop(node) + if not pending: + return None + + # The node crashed, reassing pending items + assert self.collection is not None + crashitem = self.collection[pending.pop(0)] + self.pending.extend(pending) + for node in self.node2pending: + self.check_schedule(node) + return crashitem + + def schedule(self) -> None: + """Initiate distribution of the test collection. + + Initiate scheduling of the items across the nodes. If this + gets called again later it behaves the same as calling + ``.check_schedule()`` on all nodes so that newly added nodes + will start to be used. + + This is called by the ``DSession.worker_collectionfinish`` hook + if ``.collection_is_completed`` is True. + """ + assert self.collection_is_completed + + # Initial distribution already happened, reschedule on all nodes + if self.collection is not None: + for node in self.nodes: + self.check_schedule(node) + return + + # XXX allow nodes to have different collections + if not self._check_nodes_have_same_collection(): + self.log("**Different tests collected, aborting run**") + return + + # Collections are identical, create the index of pending items. + self.collection = next(iter(self.node2collection.values())) + self.pending[:] = range(len(self.collection)) + if not self.collection: + return + + if self.maxschedchunk is None: + self.maxschedchunk = len(self.collection) + + dist_groups = {} + + if self.is_first_time: + for i, test in enumerate(self.collection): + if '@' in test: + group_mark = test.split('@')[-1] + group_workers = int(group_mark.split('_')[-1]) + if group_workers > len(self.nodes): + # We can only distribute across as many nodes as we have available + # If a group requests more, we fallback to our actual max + group_workers = len(self.nodes) + else: + group_mark = 'default' + group_workers = len(self.nodes) + existing_tests = dist_groups.get(group_mark, {}).get('tests', []) + existing_tests.append(test) + existing_indices = dist_groups.get(group_mark, {}).get('test_indices', []) + existing_indices.append(i) + + dist_groups[group_mark] = { + 'tests': existing_tests, + 'group_workers': group_workers, + 'test_indices': existing_indices, + 'pending_indices': existing_indices + } + self.dist_groups = dist_groups + self.pending_groups = list(dist_groups.keys()) + self.is_first_time = False + else: + for node in self.nodes: + self.check_schedule(node) + + if not self.pending_groups: + return + dist_group_key = self.pending_groups.pop(0) + dist_group = self.dist_groups[dist_group_key] + nodes = cycle(self.nodes[0:dist_group['group_workers']]) + schedule_log = {n.gateway.id: [] for n in self.nodes[0:dist_group['group_workers']]} + for _ in range(len(dist_group['test_indices'])): + n = next(nodes) + # needs cleaner way to be identified + tests_per_node = self.dist_groups[dist_group_key]['pending_indices'][:1] + schedule_log[n.gateway.id].extend(tests_per_node) + self._send_tests_group(n, 1, dist_group_key) + del self.dist_groups[dist_group_key] + message = f"\n[-] [csg] schedule: processed scheduling for {dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid, nt in schedule_log.items()])}" + self.report_line(message) + + def _send_tests(self, node: WorkerController, num: int) -> None: + tests_per_node = self.pending[:num] + if tests_per_node: + del self.pending[:num] + self.node2pending[node].extend(tests_per_node) + node.send_runtest_some(tests_per_node) + + def _send_tests_group(self, node: WorkerController, num: int, dist_group_key) -> None: + tests_per_node = 
self.dist_groups[dist_group_key]['pending_indices'][:num]
+        if tests_per_node:
+            del self.dist_groups[dist_group_key]['pending_indices'][:num]
+            for test_index in tests_per_node:
+                self.pending.remove(test_index)
+            self.node2pending[node].extend(tests_per_node)
+            node.send_runtest_some(tests_per_node)
+
+
+    def _check_nodes_have_same_collection(self) -> bool:
+        """Return True if all nodes have collected the same items.
+
+        If collections differ, this method returns False while logging
+        the collection differences and posting collection errors to
+        pytest_collectreport hook.
+        """
+        node_collection_items = list(self.node2collection.items())
+        first_node, col = node_collection_items[0]
+        same_collection = True
+        for node, collection in node_collection_items[1:]:
+            msg = report_collection_diff(
+                col, collection, first_node.gateway.id, node.gateway.id
+            )
+            if msg:
+                same_collection = False
+                self.log(msg)
+                if self.config is not None:
+                    rep = pytest.CollectReport(
+                        nodeid=node.gateway.id,
+                        outcome="failed",
+                        longrepr=msg,
+                        result=[],
+                    )
+                    self.config.hook.pytest_collectreport(report=rep)
+
+        return same_collection
+
+    def report_line(self, line: str) -> None:
+        if self.terminal and self.config.option.verbose >= 0:
+            self.terminal.write_line(line)
\ No newline at end of file
diff --git a/src/xdist/workermanage.py b/src/xdist/workermanage.py
index 44d1be4c..2b410108 100644
--- a/src/xdist/workermanage.py
+++ b/src/xdist/workermanage.py
@@ -11,6 +11,7 @@
 from typing import Literal
 from typing import Sequence
 from typing import Union
+from typing import Optional
 import uuid
 import warnings
 
@@ -82,9 +83,12 @@ def rsync_roots(self, gateway: execnet.Gateway) -> None:
     def setup_nodes(
         self,
         putevent: Callable[[tuple[str, dict[str, Any]]], None],
+        max_nodes: Optional[int] = None
     ) -> list[WorkerController]:
         self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs)
         self.trace("setting up nodes")
+        if max_nodes:
+            return [self.setup_node(spec, putevent) for spec in self.specs[0:max_nodes]]
         return [self.setup_node(spec, putevent) for spec in self.specs]
 
     def setup_node(
diff --git a/xdist-testing-ntop/README.md b/xdist-testing-ntop/README.md
new file mode 100644
index 00000000..4306d4f9
--- /dev/null
+++ b/xdist-testing-ntop/README.md
@@ -0,0 +1,14 @@
+# Testing pytest-xdist Scheduler Customization
+- Run with `python -m pytest test.py` to run tests
+- Run with `python -m pytest test.py -n --dist customgroup --junit-xml results.xml -v` to use new scheduler + report to xml and have verbose terminal output
+  - Verbose terminal output is semi-required when using customgroup. It allows the user to confirm the correct tests are running with the correct number of processes. 
+ +## Notes: +- Install local pytest with `python -m pip install .` or `python -m pip install -e .` + - When ran from root of `pytest-xdist` repository + +## Using Customgroup +- Add pytest mark `xdist_custom(name="_")` to tests + - Tests without this marking will use the maximum worker count specified by `-n` argument +- Add `xdist_custom` to `pytest.ini` to avoid warnings about unregistered marks +- Run tests as detailed above diff --git a/xdist-testing-ntop/pytest.ini b/xdist-testing-ntop/pytest.ini new file mode 100644 index 00000000..9a767524 --- /dev/null +++ b/xdist-testing-ntop/pytest.ini @@ -0,0 +1,5 @@ +[pytest] +log_cli_level=0 + +markers= + xdist_custom \ No newline at end of file diff --git a/xdist-testing-ntop/test.py b/xdist-testing-ntop/test.py new file mode 100644 index 00000000..ff8f9996 --- /dev/null +++ b/xdist-testing-ntop/test.py @@ -0,0 +1,94 @@ +import pytest +import time + + +@pytest.mark.xdist_custom(name="low_4") +def test_1(): + time.sleep(2) + assert True + +@pytest.mark.xdist_custom(name="low_4") +def test_2(): + time.sleep(2) + assert True + +@pytest.mark.xdist_custom(name="low_4") +def test_3(): + time.sleep(2) + assert True + +@pytest.mark.xdist_custom(name="low_4") +def test_4(): + time.sleep(2) + assert True + +# @pytest.mark.xdist_custom(name="low_4") +# def test_4a(): +# time.sleep(2) +# assert True +# +# @pytest.mark.xdist_custom(name="low_4") +# def test_4b(): +# time.sleep(2) +# assert True +# +# @pytest.mark.xdist_custom(name="low_4") +# def test_4c(): +# time.sleep(2) +# assert True +# +# @pytest.mark.xdist_custom(name="low_4") +# def test_4d(): +# time.sleep(2) +# assert True +# +# @pytest.mark.xdist_custom(name="low_4") +# def test_4e(): +# time.sleep(2) +# assert True + +@pytest.mark.xdist_custom(name="med_2") +def test_5(): + time.sleep(3) + assert True + +@pytest.mark.xdist_custom(name="med_2") +def test_6(): + time.sleep(3) + assert True + +@pytest.mark.xdist_custom(name="med_2") +def test_7(): + time.sleep(3) + assert True + +@pytest.mark.xdist_custom(name="med_2") +def test_8(): + time.sleep(3) + assert True + +@pytest.mark.xdist_custom(name="high_1") +def test_9(): + time.sleep(5) + assert True + +@pytest.mark.xdist_custom(name="high_1") +def test_10(): + time.sleep(5) + assert True + +def test_11(): + time.sleep(1) + assert True + +def test_12(): + time.sleep(1) + assert True + +def test_13(): + time.sleep(1) + assert True + +def test_14(): + time.sleep(1) + assert True From 1935a2b33c8abe0b6afcfba0f4ccc5c326d119b2 Mon Sep 17 00:00:00 2001 From: Chris Iaconetti Date: Tue, 17 Sep 2024 16:46:33 -0400 Subject: [PATCH 2/5] TUR-21619: Patch pytest error (#3) * fix pytest error * clearer if --- src/xdist/dsession.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/xdist/dsession.py b/src/xdist/dsession.py index 5b56a551..2eff34c1 100644 --- a/src/xdist/dsession.py +++ b/src/xdist/dsession.py @@ -402,14 +402,17 @@ def worker_collectionfinish( self.terminal.write_line("") if self.config.option.verbose > 0: self.report_line(f"[-] [dse] scheduling tests via {self.sched.__class__.__name__}") - if isinstance(self.sched, CustomGroup) and self.ready_to_run_tests and self.are_all_active_nodes_collected(): - # we're coming back here after finishing a batch of tests - so start the next batch - self.reschedule() - self.reset_nodes_if_needed() + if isinstance(self.sched, CustomGroup): + if self.ready_to_run_tests and self.are_all_active_nodes_collected(): + # we're coming back here after finishing a batch of tests - so 
start the next batch + self.reschedule() + self.reset_nodes_if_needed() + else: + self.ready_to_run_tests = True + self.sched.schedule() + self.reset_nodes_if_needed() else: - self.ready_to_run_tests = True self.sched.schedule() - self.reset_nodes_if_needed() def worker_logstart( self, From f73e306f92b67a8883751b02ffef52f02b1384ad Mon Sep 17 00:00:00 2001 From: Chris Iaconetti Date: Wed, 18 Sep 2024 17:02:57 -0400 Subject: [PATCH 3/5] TUR-21619: Update docstring+typing (#4) * update docstring * idk --- src/xdist/scheduler/customgroup.py | 108 +++++++++++++++++++---------- 1 file changed, 71 insertions(+), 37 deletions(-) diff --git a/src/xdist/scheduler/customgroup.py b/src/xdist/scheduler/customgroup.py index 9a491965..fec7001b 100644 --- a/src/xdist/scheduler/customgroup.py +++ b/src/xdist/scheduler/customgroup.py @@ -1,43 +1,82 @@ -"""Run tests across a variable number of nodes based on custom groups. - -# TODO: This is more of a spec/description, update docs/remove this section document within the class -Example: - - 10 test cases exist - - 4 test cases are marked with @pytest.mark.low - - 4 test cases are marked with @pytest.mark.medium - - 2 test cases are marked with @pytest.mark.high - - A pytest.ini file contains the following lines: -[pytest] - -markers= - low: 4 - medium: 2 - high: 1 - -Then the 4 low test cases will be ran on 4 workers (distributed evenly amongst the 4, as the load.py scheduler functions) -Then the 4 medium test cases will be ran on 2 workers (again, distributed evenly), only after the low test cases are complete (or before they start). -Then the 2 high test cases will be ran on 1 worker (distributed evenly), only after the low and medium test cases are complete (or before they start). - -This allows a pytest user more custom control over processing tests. -One potential application would be measuring the resource utilization of all test cases. Test cases that are not -resource intensive can be ran on many workers, and more resource intensive test cases can be ran once the low -resource consuming tests are done on fewer workers, such that resource consumption does not exceed available resources. -""" from __future__ import annotations from itertools import cycle -from typing import Sequence +from typing import Sequence, Any import pytest -from xdist.remote import Producer, WorkerInteractor +from xdist.remote import Producer from xdist.report import report_collection_diff from xdist.workermanage import parse_spec_config from xdist.workermanage import WorkerController class CustomGroup: - """ - # TODO: update docs here + """Implement grouped load scheduling across a variable number of nodes. + + This distributes tests into groups based on the presence of xdist_custom pytest marks. + Groups are ran sequentially with tests within each group running in parallel. + The number of workers assigned to each group is based on the xdist_custom pytest mark. + Tests without the xdist_custom pytest mark are assigned to a "default" group and run + using all available workers. + + Example: + Consider 12 pytest test cases. + - 4 test cases are marked with @pytest.mark.xdist_custom(name="low_4") + - 2 test cases are marked with @pytest.mark.xdist_custom(name="med_2") + - 2 test cases are marked with @pytest.mark.xdist_custom(name="high_1") + - 4 test cases are not marked with a xdist_custom mark. 
+    Consider that the pytest run was initiated with 4 workers (-n 4)
+    - The 4 test cases marked with "low_4" would run in a group using 4 workers
+    - The 2 test cases marked with "med_2" would run in a group using 2 workers
+    - The 2 test cases marked with "high_1" would run in a group with 1 worker
+    - The 4 unmarked test cases would run in a group using 4 workers.
+    Only one group would run at any given time. For example, while the "high_1" tests are executing,
+    the other pending test groups would not be scheduled or executing. The order in which groups
+    are executed is variable. For example, "high_1" may execute first, or it may execute second, etc.
+    If a group pytest mark specifies more workers than the pytest run is initialized with, the
+    number of workers the run was initialized with will be used instead (-n argument is a maximum).
+
+    Attributes::
+
+    :terminal: Terminal reporter for writing terminal output
+
+    :numnodes: The expected number of nodes taking part. The actual
+        number of nodes will vary during the scheduler's lifetime as
+        nodes are added by the DSession as they are brought up and
+        removed either because of a dead node or normal shutdown. This
+        number is primarily used to know when the initial collection is
+        completed.
+
+    :node2collection: Map of nodes and their test collection. All
+        collections should always be identical.
+
+    :node2pending: Map of nodes and the indices of their pending
+        tests. The indices are an index into ``.pending`` (which is
+        identical to their own collection stored in
+        ``.node2collection``).
+
+    :pending: List of indices of globally pending tests. These are
+        tests which have not yet been allocated to a chunk for a node
+        to process.
+
+    :collection: The one collection once it is validated to be
+        identical between all the nodes. It is initialised to None
+        until ``.schedule()`` is called.
+
+    :log: A py.log.Producer instance.
+
+    :config: Config object, used for handling hooks.
+
+    :dist_groups: Execution groups. Updated based on xdist_custom pytest marks.
+        Maps group names to tests, test indices, pending indices, and stores the number of workers to use
+        for that test execution group.
+
+    :pending_groups: List of dist_group keys that are pending
+
+    :is_first_time: Boolean to track whether we have called schedule() before or not
+
+    :do_resched: Boolean to track whether we should schedule another distribution group. 
+ Accessed in dsession.py """ def __init__(self, config: pytest.Config, log: Producer | None = None) -> None: @@ -52,12 +91,10 @@ def __init__(self, config: pytest.Config, log: Producer | None = None) -> None: else: self.log = log.loadsched self.config = config - self.maxschedchunk = self.config.getoption("maxschedchunk") - # TODO: Type annotation incorrect - self.dist_groups: dict[str, str] = {} + self.dist_groups: dict[str, Any] = {} self.pending_groups: list[str] = [] - self.is_first_time = True - self.do_resched = False + self.is_first_time: bool = True + self.do_resched: bool = False @property def nodes(self) -> list[WorkerController]: @@ -264,9 +301,6 @@ def schedule(self) -> None: if not self.collection: return - if self.maxschedchunk is None: - self.maxschedchunk = len(self.collection) - dist_groups = {} if self.is_first_time: From c82840f03b3b8a02bc4c033bdc7f72cc652f6254 Mon Sep 17 00:00:00 2001 From: Chris Iaconetti Date: Fri, 20 Sep 2024 14:31:37 -0400 Subject: [PATCH 4/5] Merge pytest-xdist latest changes + correct lint errors in nTop fork (#6) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [pre-commit.ci] pre-commit autoupdate (#1120) updates: - [github.com/astral-sh/ruff-pre-commit: v0.6.1 → v0.6.2](https://github.com/astral-sh/ruff-pre-commit/compare/v0.6.1...v0.6.2) - [github.com/pre-commit/mirrors-mypy: v1.11.1 → v1.11.2](https://github.com/pre-commit/mirrors-mypy/compare/v1.11.1...v1.11.2) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> * build(deps): bump pypa/gh-action-pypi-publish (#1123) Bumps the github-actions group with 1 update: [pypa/gh-action-pypi-publish](https://github.com/pypa/gh-action-pypi-publish). Updates `pypa/gh-action-pypi-publish` from 1.9.0 to 1.10.0 - [Release notes](https://github.com/pypa/gh-action-pypi-publish/releases) - [Commits](https://github.com/pypa/gh-action-pypi-publish/compare/v1.9.0...v1.10.0) --- updated-dependencies: - dependency-name: pypa/gh-action-pypi-publish dependency-type: direct:production update-type: version-update:semver-minor dependency-group: github-actions ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * [pre-commit.ci] pre-commit autoupdate (#1124) updates: - [github.com/astral-sh/ruff-pre-commit: v0.6.2 → v0.6.3](https://github.com/astral-sh/ruff-pre-commit/compare/v0.6.2...v0.6.3) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> * Publish package with attestations (#1125) Follow up to https://github.com/pytest-dev/pytest-xdist/pull/1123. * build(deps): bump the github-actions group with 2 updates (#1127) Bumps the github-actions group with 2 updates: [hynek/build-and-inspect-python-package](https://github.com/hynek/build-and-inspect-python-package) and [pypa/gh-action-pypi-publish](https://github.com/pypa/gh-action-pypi-publish). 
Updates `hynek/build-and-inspect-python-package` from 2.8 to 2.9 - [Release notes](https://github.com/hynek/build-and-inspect-python-package/releases) - [Changelog](https://github.com/hynek/build-and-inspect-python-package/blob/main/CHANGELOG.md) - [Commits](https://github.com/hynek/build-and-inspect-python-package/compare/v2.8...v2.9) Updates `pypa/gh-action-pypi-publish` from 1.10.0 to 1.10.1 - [Release notes](https://github.com/pypa/gh-action-pypi-publish/releases) - [Commits](https://github.com/pypa/gh-action-pypi-publish/compare/v1.10.0...v1.10.1) --- updated-dependencies: - dependency-name: hynek/build-and-inspect-python-package dependency-type: direct:production update-type: version-update:semver-minor dependency-group: github-actions - dependency-name: pypa/gh-action-pypi-publish dependency-type: direct:production update-type: version-update:semver-patch dependency-group: github-actions ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * [pre-commit.ci] pre-commit autoupdate (#1128) updates: - [github.com/astral-sh/ruff-pre-commit: v0.6.3 → v0.6.4](https://github.com/astral-sh/ruff-pre-commit/compare/v0.6.3...v0.6.4) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> * [pre-commit.ci] pre-commit autoupdate (#1129) updates: - [github.com/astral-sh/ruff-pre-commit: v0.6.4 → v0.6.5](https://github.com/astral-sh/ruff-pre-commit/compare/v0.6.4...v0.6.5) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> * all ruff checks should pass * mypy fixes * undo protocol/other scheduler changes, add asserts for mypy check * undo newline add/remove changes in diff for other schedulers * remove unused function from dsession.py --------- Signed-off-by: dependabot[bot] Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Bruno Oliveira --- .github/workflows/deploy.yml | 6 ++-- .github/workflows/test.yml | 2 +- .pre-commit-config.yaml | 4 +-- src/xdist/dsession.py | 53 ++++++++++++++++++------------ src/xdist/scheduler/__init__.py | 2 +- src/xdist/scheduler/customgroup.py | 28 +++++++++------- src/xdist/workermanage.py | 3 +- xdist-testing-ntop/test.py | 3 +- 8 files changed, 60 insertions(+), 41 deletions(-) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 9b202bd4..f739d297 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -19,7 +19,7 @@ jobs: - uses: actions/checkout@v4 - name: Build and Check Package - uses: hynek/build-and-inspect-python-package@v2.8 + uses: hynek/build-and-inspect-python-package@v2.9 deploy: needs: package @@ -39,7 +39,9 @@ jobs: path: dist - name: Publish package to PyPI - uses: pypa/gh-action-pypi-publish@v1.9.0 + uses: pypa/gh-action-pypi-publish@v1.10.1 + with: + attestations: true - name: Push tag run: | diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b81f37e3..ca33e3a1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -22,7 +22,7 @@ jobs: steps: - uses: actions/checkout@v4 - name: Build and Check Package - uses: hynek/build-and-inspect-python-package@v2.8 + uses: hynek/build-and-inspect-python-package@v2.9 test: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index cb240bdb..6fffdf2e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ 
repos: - repo: https://github.com/astral-sh/ruff-pre-commit - rev: "v0.6.1" + rev: "v0.6.5" hooks: - id: ruff args: ["--fix"] @@ -23,7 +23,7 @@ repos: language: python additional_dependencies: [pygments, restructuredtext_lint] - repo: https://github.com/pre-commit/mirrors-mypy - rev: v1.11.1 + rev: v1.11.2 hooks: - id: mypy files: ^(src/|testing/) diff --git a/src/xdist/dsession.py b/src/xdist/dsession.py index 2eff34c1..4c3bcec2 100644 --- a/src/xdist/dsession.py +++ b/src/xdist/dsession.py @@ -5,16 +5,18 @@ from queue import Empty from queue import Queue import sys +import traceback from typing import Any +from typing import Callable from typing import Sequence import warnings -import traceback import execnet import pytest from xdist.remote import Producer from xdist.remote import WorkerInfo +from xdist.scheduler import CustomGroup from xdist.scheduler import EachScheduling from xdist.scheduler import LoadFileScheduling from xdist.scheduler import LoadGroupScheduling @@ -22,7 +24,6 @@ from xdist.scheduler import LoadScopeScheduling from xdist.scheduler import Scheduling from xdist.scheduler import WorkStealingScheduling -from xdist.scheduler import CustomGroup from xdist.workermanage import NodeManager from xdist.workermanage import WorkerController @@ -60,14 +61,14 @@ def __init__(self, config: pytest.Config) -> None: self._failed_collection_errors: dict[object, bool] = {} self._active_nodes: set[WorkerController] = set() self._failed_nodes_count = 0 - self.saved_put = None + self.saved_put: Callable[[tuple[str, dict[str, Any]]], None] self.remake_nodes = False self.ready_to_run_tests = False self._max_worker_restart = get_default_max_worker_restart(self.config) # summary message to print at the end of the session self._summary_report: str | None = None self.terminal = config.pluginmanager.getplugin("terminalreporter") - self.worker_status: dict[WorkerController, str] = {} + self.worker_status: dict[str, str] = {} if self.terminal: self.trdist = TerminalDistReporter(config) config.pluginmanager.register(self.trdist, "terminaldistreporter") @@ -180,45 +181,46 @@ def loop_once(self) -> None: self.triggershutdown() - def is_node_finishing(self, node: WorkerController): + def is_node_finishing(self, node: WorkerController) -> bool: """Check if a test worker is considered to be finishing. Evaluate whether it's on its last test, or if no tests are pending. 
""" + assert self.sched is not None + assert type(self.sched) is CustomGroup pending = self.sched.node2pending.get(node) return pending is not None and len(pending) < 2 - def is_node_clear(self, node: WorkerController): - """Check if a test worker has no pending tests.""" - pending = self.sched.node2pending.get(node) - return pending is None or len(pending) == 0 - - - def are_all_nodes_finishing(self): + def are_all_nodes_finishing(self) -> bool: """Check if all workers are finishing (See 'is_node_finishing' above).""" + assert self.sched is not None return all(self.is_node_finishing(node) for node in self.sched.nodes) - def are_all_nodes_done(self): + def are_all_nodes_done(self) -> bool: """Check if all nodes have reported to finish.""" return all(s == "finished" for s in self.worker_status.values()) - def are_all_active_nodes_collected(self): + def are_all_active_nodes_collected(self) -> bool: """Check if all nodes have reported collection to be complete.""" if not all(n.gateway.id in self.worker_status for n in self._active_nodes): return False return all(self.worker_status[n.gateway.id] == "collected" for n in self._active_nodes) - def reset_nodes_if_needed(self): + def reset_nodes_if_needed(self) -> None: + assert self.sched is not None + assert type(self.sched) is CustomGroup if self.are_all_nodes_finishing() and self.ready_to_run_tests and not self.sched.do_resched: self.reset_nodes() - def reset_nodes(self): + def reset_nodes(self) -> None: """Issue shutdown notices to workers for rescheduling purposes.""" + assert self.sched is not None + assert type(self.sched) is CustomGroup if len(self.sched.pending) != 0: self.remake_nodes = True for node in self.sched.nodes: @@ -226,22 +228,28 @@ def reset_nodes(self): node.shutdown() - def reschedule(self): + def reschedule(self) -> None: """Reschedule tests.""" + assert self.sched is not None + assert type(self.sched) is CustomGroup self.sched.do_resched = False self.sched.check_schedule(self.sched.nodes[0], 1.0, True) - def prepare_for_reschedule(self): + def prepare_for_reschedule(self) -> None: """Update test workers and their status tracking so rescheduling is ready.""" + assert type(self.sched) is CustomGroup + assert self.sched is not None self.remake_nodes = False num_workers = self.sched.dist_groups[self.sched.pending_groups[0]]['group_workers'] self.trdist._status = {} + assert self.nodemanager is not None new_nodes = self.nodemanager.setup_nodes(self.saved_put, num_workers) self.worker_status = {} self._active_nodes = set() self._active_nodes.update(new_nodes) self.sched.node2pending = {} + assert type(self.sched) is CustomGroup self.sched.do_resched = True # @@ -287,7 +295,9 @@ def worker_workerfinished(self, node: WorkerController) -> None: try: self.prepare_for_reschedule() except Exception as e: - self.shouldstop = f"Exception caught during preparation for rescheduling. Giving up.\n{''.join(traceback.format_exception(e))}" + msg = ("Exception caught during preparation for rescheduling. Giving up." + f"\n{''.join(traceback.format_exception(e))}") + self.shouldstop = msg return self.config.hook.pytest_testnodedown(node=node, error=None) if node.workeroutput["exitstatus"] == 2: # keyboard-interrupt @@ -308,10 +318,11 @@ def worker_workerfinished(self, node: WorkerController) -> None: assert not crashitem, (crashitem, node) self._active_nodes.remove(node) - def update_worker_status(self, node, status): + def update_worker_status(self, node: WorkerController, status: str) -> None: """Track the worker status. 
- Can be used at callbacks like 'worker_workerfinished' so we remember wchic event was reported last by each worker. + Can be used at callbacks like 'worker_workerfinished' so we remember wchic event + was reported last by each worker. """ self.worker_status[node.workerinfo["id"]] = status diff --git a/src/xdist/scheduler/__init__.py b/src/xdist/scheduler/__init__.py index 34b791d7..6395e2a5 100644 --- a/src/xdist/scheduler/__init__.py +++ b/src/xdist/scheduler/__init__.py @@ -1,6 +1,6 @@ +from xdist.scheduler.customgroup import CustomGroup as CustomGroup from xdist.scheduler.each import EachScheduling as EachScheduling from xdist.scheduler.load import LoadScheduling as LoadScheduling -from xdist.scheduler.customgroup import CustomGroup as CustomGroup from xdist.scheduler.loadfile import LoadFileScheduling as LoadFileScheduling from xdist.scheduler.loadgroup import LoadGroupScheduling as LoadGroupScheduling from xdist.scheduler.loadscope import LoadScopeScheduling as LoadScopeScheduling diff --git a/src/xdist/scheduler/customgroup.py b/src/xdist/scheduler/customgroup.py index fec7001b..5d1439a2 100644 --- a/src/xdist/scheduler/customgroup.py +++ b/src/xdist/scheduler/customgroup.py @@ -1,7 +1,8 @@ from __future__ import annotations from itertools import cycle -from typing import Sequence, Any +from typing import Any +from typing import Sequence import pytest @@ -10,6 +11,7 @@ from xdist.workermanage import parse_spec_config from xdist.workermanage import WorkerController + class CustomGroup: """Implement grouped load scheduling across a variable number of nodes. @@ -203,7 +205,7 @@ def remove_pending_tests_from_node( ) -> None: raise NotImplementedError() - def check_schedule(self, node: WorkerController, duration: float = 0, from_dsession=False) -> None: + def check_schedule(self, node: WorkerController, duration: float = 0, from_dsession: bool = False) -> None: """Maybe schedule new items on the node. 
If there are any globally pending nodes left then this will @@ -226,7 +228,7 @@ def check_schedule(self, node: WorkerController, duration: float = 0, from_dsess dist_group_key = self.pending_groups.pop(0) dist_group = self.dist_groups[dist_group_key] nodes = cycle(self.nodes[0:dist_group['group_workers']]) - schedule_log = {n.gateway.id:[] for n in self.nodes[0:dist_group['group_workers']]} + schedule_log: dict[str, Any] = {n.gateway.id:[] for n in self.nodes[0:dist_group['group_workers']]} for _ in range(len(dist_group['test_indices'])): n = next(nodes) #needs cleaner way to be identified @@ -235,13 +237,16 @@ def check_schedule(self, node: WorkerController, duration: float = 0, from_dsess self._send_tests_group(n, 1, dist_group_key) del self.dist_groups[dist_group_key] - message = f"\n[-] [csg] check_schedule: processed scheduling for {dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid,nt in schedule_log.items()])}" + message = (f"\n[-] [csg] check_schedule: processed scheduling for {dist_group_key}:" + f" {' '.join([f'{nid} ({len(nt)})' for nid,nt in schedule_log.items()])}") self.report_line(message) else: - pending = self.node2pending.get(node) + pending = self.node2pending.get(node, []) if len(pending) < 2: - self.report_line(f"[-] [csg] Shutting down {node.workerinput['workerid']} because only one case is pending") + self.report_line( + f"[-] [csg] Shutting down {node.workerinput['workerid']} because only one case is pending" + ) node.shutdown() self.log("num items waiting for node:", len(self.pending)) @@ -301,7 +306,7 @@ def schedule(self) -> None: if not self.collection: return - dist_groups = {} + dist_groups: dict[str, dict[Any, Any]] = {} if self.is_first_time: for i, test in enumerate(self.collection): @@ -338,7 +343,7 @@ def schedule(self) -> None: dist_group_key = self.pending_groups.pop(0) dist_group = self.dist_groups[dist_group_key] nodes = cycle(self.nodes[0:dist_group['group_workers']]) - schedule_log = {n.gateway.id: [] for n in self.nodes[0:dist_group['group_workers']]} + schedule_log: dict[str, Any] = {n.gateway.id: [] for n in self.nodes[0:dist_group['group_workers']]} for _ in range(len(dist_group['test_indices'])): n = next(nodes) # needs cleaner way to be identified @@ -346,7 +351,8 @@ def schedule(self) -> None: schedule_log[n.gateway.id].extend(tests_per_node) self._send_tests_group(n, 1, dist_group_key) del self.dist_groups[dist_group_key] - message = f"\n[-] [csg] schedule: processed scheduling for {dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid, nt in schedule_log.items()])}" + message = ("\n[-] [csg] schedule: processed scheduling for " + f"{dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid, nt in schedule_log.items()])}") self.report_line(message) def _send_tests(self, node: WorkerController, num: int) -> None: @@ -356,7 +362,7 @@ def _send_tests(self, node: WorkerController, num: int) -> None: self.node2pending[node].extend(tests_per_node) node.send_runtest_some(tests_per_node) - def _send_tests_group(self, node: WorkerController, num: int, dist_group_key) -> None: + def _send_tests_group(self, node: WorkerController, num: int, dist_group_key: str) -> None: tests_per_node = self.dist_groups[dist_group_key]['pending_indices'][:num] if tests_per_node: del self.dist_groups[dist_group_key]['pending_indices'][:num] @@ -396,4 +402,4 @@ def _check_nodes_have_same_collection(self) -> bool: def report_line(self, line: str) -> None: if self.terminal and self.config.option.verbose >= 0: - self.terminal.write_line(line) \ No newline at end of 
file + self.terminal.write_line(line) diff --git a/src/xdist/workermanage.py b/src/xdist/workermanage.py index 2b410108..963dbb8a 100644 --- a/src/xdist/workermanage.py +++ b/src/xdist/workermanage.py @@ -11,7 +11,6 @@ from typing import Literal from typing import Sequence from typing import Union -from typing import Optional import uuid import warnings @@ -83,7 +82,7 @@ def rsync_roots(self, gateway: execnet.Gateway) -> None: def setup_nodes( self, putevent: Callable[[tuple[str, dict[str, Any]]], None], - max_nodes: Optional[int] = None + max_nodes: int | None = None ) -> list[WorkerController]: self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs) self.trace("setting up nodes") diff --git a/xdist-testing-ntop/test.py b/xdist-testing-ntop/test.py index ff8f9996..4b370b6c 100644 --- a/xdist-testing-ntop/test.py +++ b/xdist-testing-ntop/test.py @@ -1,6 +1,7 @@ -import pytest import time +import pytest + @pytest.mark.xdist_custom(name="low_4") def test_1(): From d62d632e848b70cbb7af4e70d0d89de6bc909214 Mon Sep 17 00:00:00 2001 From: Chris Iaconetti Date: Fri, 20 Sep 2024 15:56:38 -0400 Subject: [PATCH 5/5] Dev/ci/tur 21619 lint merge 2 (#7) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [pre-commit.ci] pre-commit autoupdate (#1120) updates: - [github.com/astral-sh/ruff-pre-commit: v0.6.1 → v0.6.2](https://github.com/astral-sh/ruff-pre-commit/compare/v0.6.1...v0.6.2) - [github.com/pre-commit/mirrors-mypy: v1.11.1 → v1.11.2](https://github.com/pre-commit/mirrors-mypy/compare/v1.11.1...v1.11.2) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> * build(deps): bump pypa/gh-action-pypi-publish (#1123) Bumps the github-actions group with 1 update: [pypa/gh-action-pypi-publish](https://github.com/pypa/gh-action-pypi-publish). Updates `pypa/gh-action-pypi-publish` from 1.9.0 to 1.10.0 - [Release notes](https://github.com/pypa/gh-action-pypi-publish/releases) - [Commits](https://github.com/pypa/gh-action-pypi-publish/compare/v1.9.0...v1.10.0) --- updated-dependencies: - dependency-name: pypa/gh-action-pypi-publish dependency-type: direct:production update-type: version-update:semver-minor dependency-group: github-actions ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * [pre-commit.ci] pre-commit autoupdate (#1124) updates: - [github.com/astral-sh/ruff-pre-commit: v0.6.2 → v0.6.3](https://github.com/astral-sh/ruff-pre-commit/compare/v0.6.2...v0.6.3) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> * Publish package with attestations (#1125) Follow up to https://github.com/pytest-dev/pytest-xdist/pull/1123. * build(deps): bump the github-actions group with 2 updates (#1127) Bumps the github-actions group with 2 updates: [hynek/build-and-inspect-python-package](https://github.com/hynek/build-and-inspect-python-package) and [pypa/gh-action-pypi-publish](https://github.com/pypa/gh-action-pypi-publish). 
Updates `hynek/build-and-inspect-python-package` from 2.8 to 2.9 - [Release notes](https://github.com/hynek/build-and-inspect-python-package/releases) - [Changelog](https://github.com/hynek/build-and-inspect-python-package/blob/main/CHANGELOG.md) - [Commits](https://github.com/hynek/build-and-inspect-python-package/compare/v2.8...v2.9) Updates `pypa/gh-action-pypi-publish` from 1.10.0 to 1.10.1 - [Release notes](https://github.com/pypa/gh-action-pypi-publish/releases) - [Commits](https://github.com/pypa/gh-action-pypi-publish/compare/v1.10.0...v1.10.1) --- updated-dependencies: - dependency-name: hynek/build-and-inspect-python-package dependency-type: direct:production update-type: version-update:semver-minor dependency-group: github-actions - dependency-name: pypa/gh-action-pypi-publish dependency-type: direct:production update-type: version-update:semver-patch dependency-group: github-actions ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * [pre-commit.ci] pre-commit autoupdate (#1128) updates: - [github.com/astral-sh/ruff-pre-commit: v0.6.3 → v0.6.4](https://github.com/astral-sh/ruff-pre-commit/compare/v0.6.3...v0.6.4) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> * [pre-commit.ci] pre-commit autoupdate (#1129) updates: - [github.com/astral-sh/ruff-pre-commit: v0.6.4 → v0.6.5](https://github.com/astral-sh/ruff-pre-commit/compare/v0.6.4...v0.6.5) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> * run ruff format --------- Signed-off-by: dependabot[bot] Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Bruno Oliveira --- src/xdist/dsession.py | 36 ++++++++------ src/xdist/remote.py | 4 +- src/xdist/scheduler/customgroup.py | 77 ++++++++++++++++++------------ src/xdist/workermanage.py | 2 +- xdist-testing-ntop/test.py | 14 ++++++ 5 files changed, 86 insertions(+), 47 deletions(-) diff --git a/src/xdist/dsession.py b/src/xdist/dsession.py index 4c3bcec2..3ead8b55 100644 --- a/src/xdist/dsession.py +++ b/src/xdist/dsession.py @@ -180,7 +180,6 @@ def loop_once(self) -> None: if self.sched.tests_finished: self.triggershutdown() - def is_node_finishing(self, node: WorkerController) -> bool: """Check if a test worker is considered to be finishing. 
@@ -191,32 +190,33 @@ def is_node_finishing(self, node: WorkerController) -> bool: pending = self.sched.node2pending.get(node) return pending is not None and len(pending) < 2 - def are_all_nodes_finishing(self) -> bool: """Check if all workers are finishing (See 'is_node_finishing' above).""" assert self.sched is not None return all(self.is_node_finishing(node) for node in self.sched.nodes) - def are_all_nodes_done(self) -> bool: """Check if all nodes have reported to finish.""" return all(s == "finished" for s in self.worker_status.values()) - def are_all_active_nodes_collected(self) -> bool: """Check if all nodes have reported collection to be complete.""" if not all(n.gateway.id in self.worker_status for n in self._active_nodes): return False - return all(self.worker_status[n.gateway.id] == "collected" for n in self._active_nodes) - + return all( + self.worker_status[n.gateway.id] == "collected" for n in self._active_nodes + ) def reset_nodes_if_needed(self) -> None: assert self.sched is not None assert type(self.sched) is CustomGroup - if self.are_all_nodes_finishing() and self.ready_to_run_tests and not self.sched.do_resched: + if ( + self.are_all_nodes_finishing() + and self.ready_to_run_tests + and not self.sched.do_resched + ): self.reset_nodes() - def reset_nodes(self) -> None: """Issue shutdown notices to workers for rescheduling purposes.""" assert self.sched is not None @@ -227,7 +227,6 @@ def reset_nodes(self) -> None: if self.is_node_finishing(node): node.shutdown() - def reschedule(self) -> None: """Reschedule tests.""" assert self.sched is not None @@ -235,13 +234,14 @@ def reschedule(self) -> None: self.sched.do_resched = False self.sched.check_schedule(self.sched.nodes[0], 1.0, True) - def prepare_for_reschedule(self) -> None: """Update test workers and their status tracking so rescheduling is ready.""" assert type(self.sched) is CustomGroup assert self.sched is not None self.remake_nodes = False - num_workers = self.sched.dist_groups[self.sched.pending_groups[0]]['group_workers'] + num_workers = self.sched.dist_groups[self.sched.pending_groups[0]][ + "group_workers" + ] self.trdist._status = {} assert self.nodemanager is not None new_nodes = self.nodemanager.setup_nodes(self.saved_put, num_workers) @@ -295,8 +295,10 @@ def worker_workerfinished(self, node: WorkerController) -> None: try: self.prepare_for_reschedule() except Exception as e: - msg = ("Exception caught during preparation for rescheduling. Giving up." - f"\n{''.join(traceback.format_exception(e))}") + msg = ( + "Exception caught during preparation for rescheduling. Giving up." + f"\n{''.join(traceback.format_exception(e))}" + ) self.shouldstop = msg return self.config.hook.pytest_testnodedown(node=node, error=None) @@ -392,7 +394,9 @@ def worker_collectionfinish( scheduling the first time it logs which scheduler is in use. 
""" if self.shuttingdown: - self.report_line(f"[-] [dse] collectionfinish while closing {node.gateway.id}") + self.report_line( + f"[-] [dse] collectionfinish while closing {node.gateway.id}" + ) return self.update_worker_status(node, "collected") @@ -412,7 +416,9 @@ def worker_collectionfinish( self.trdist.ensure_show_status() self.terminal.write_line("") if self.config.option.verbose > 0: - self.report_line(f"[-] [dse] scheduling tests via {self.sched.__class__.__name__}") + self.report_line( + f"[-] [dse] scheduling tests via {self.sched.__class__.__name__}" + ) if isinstance(self.sched, CustomGroup): if self.ready_to_run_tests and self.are_all_active_nodes_collected(): # we're coming back here after finishing a batch of tests - so start the next batch diff --git a/src/xdist/remote.py b/src/xdist/remote.py index 0ec9047d..e032f1b7 100644 --- a/src/xdist/remote.py +++ b/src/xdist/remote.py @@ -209,7 +209,9 @@ def pytest_collection_modifyitems( ) -> None: # add the group name to nodeid as suffix if --dist=loadgroup if config.getvalue("loadgroup") or config.getvalue("customgroup"): - functional_mark = "xdist_group" if config.getvalue("loadgroup") else "xdist_custom" + functional_mark = ( + "xdist_group" if config.getvalue("loadgroup") else "xdist_custom" + ) for item in items: mark = item.get_closest_marker(functional_mark) if not mark: diff --git a/src/xdist/scheduler/customgroup.py b/src/xdist/scheduler/customgroup.py index 5d1439a2..85824225 100644 --- a/src/xdist/scheduler/customgroup.py +++ b/src/xdist/scheduler/customgroup.py @@ -189,7 +189,6 @@ def mark_test_complete( self.check_schedule(node, duration=duration) def mark_test_pending(self, item: str) -> None: - assert self.collection is not None self.pending.insert( 0, @@ -205,7 +204,9 @@ def remove_pending_tests_from_node( ) -> None: raise NotImplementedError() - def check_schedule(self, node: WorkerController, duration: float = 0, from_dsession: bool = False) -> None: + def check_schedule( + self, node: WorkerController, duration: float = 0, from_dsession: bool = False + ) -> None: """Maybe schedule new items on the node. If there are any globally pending nodes left then this will @@ -214,7 +215,9 @@ def check_schedule(self, node: WorkerController, duration: float = 0, from_dsess heuristic to influence how many tests the node is assigned. 
""" if node.shutting_down: - self.report_line(f"[-] [csg] {node.workerinput['workerid']} is already shutting down") + self.report_line( + f"[-] [csg] {node.workerinput['workerid']} is already shutting down" + ) return if self.pending: @@ -227,18 +230,25 @@ def check_schedule(self, node: WorkerController, duration: float = 0, from_dsess if self.pending_groups: dist_group_key = self.pending_groups.pop(0) dist_group = self.dist_groups[dist_group_key] - nodes = cycle(self.nodes[0:dist_group['group_workers']]) - schedule_log: dict[str, Any] = {n.gateway.id:[] for n in self.nodes[0:dist_group['group_workers']]} - for _ in range(len(dist_group['test_indices'])): + nodes = cycle(self.nodes[0 : dist_group["group_workers"]]) + schedule_log: dict[str, Any] = { + n.gateway.id: [] + for n in self.nodes[0 : dist_group["group_workers"]] + } + for _ in range(len(dist_group["test_indices"])): n = next(nodes) - #needs cleaner way to be identified - tests_per_node = self.dist_groups[dist_group_key]['pending_indices'][:1] + # needs cleaner way to be identified + tests_per_node = self.dist_groups[dist_group_key][ + "pending_indices" + ][:1] schedule_log[n.gateway.id].extend(tests_per_node) self._send_tests_group(n, 1, dist_group_key) del self.dist_groups[dist_group_key] - message = (f"\n[-] [csg] check_schedule: processed scheduling for {dist_group_key}:" - f" {' '.join([f'{nid} ({len(nt)})' for nid,nt in schedule_log.items()])}") + message = ( + f"\n[-] [csg] check_schedule: processed scheduling for {dist_group_key}:" + f" {' '.join([f'{nid} ({len(nt)})' for nid,nt in schedule_log.items()])}" + ) self.report_line(message) else: @@ -310,26 +320,28 @@ def schedule(self) -> None: if self.is_first_time: for i, test in enumerate(self.collection): - if '@' in test: - group_mark = test.split('@')[-1] - group_workers = int(group_mark.split('_')[-1]) + if "@" in test: + group_mark = test.split("@")[-1] + group_workers = int(group_mark.split("_")[-1]) if group_workers > len(self.nodes): # We can only distribute across as many nodes as we have available # If a group requests more, we fallback to our actual max group_workers = len(self.nodes) else: - group_mark = 'default' + group_mark = "default" group_workers = len(self.nodes) - existing_tests = dist_groups.get(group_mark, {}).get('tests', []) + existing_tests = dist_groups.get(group_mark, {}).get("tests", []) existing_tests.append(test) - existing_indices = dist_groups.get(group_mark, {}).get('test_indices', []) + existing_indices = dist_groups.get(group_mark, {}).get( + "test_indices", [] + ) existing_indices.append(i) dist_groups[group_mark] = { - 'tests': existing_tests, - 'group_workers': group_workers, - 'test_indices': existing_indices, - 'pending_indices': existing_indices + "tests": existing_tests, + "group_workers": group_workers, + "test_indices": existing_indices, + "pending_indices": existing_indices, } self.dist_groups = dist_groups self.pending_groups = list(dist_groups.keys()) @@ -342,17 +354,21 @@ def schedule(self) -> None: return dist_group_key = self.pending_groups.pop(0) dist_group = self.dist_groups[dist_group_key] - nodes = cycle(self.nodes[0:dist_group['group_workers']]) - schedule_log: dict[str, Any] = {n.gateway.id: [] for n in self.nodes[0:dist_group['group_workers']]} - for _ in range(len(dist_group['test_indices'])): + nodes = cycle(self.nodes[0 : dist_group["group_workers"]]) + schedule_log: dict[str, Any] = { + n.gateway.id: [] for n in self.nodes[0 : dist_group["group_workers"]] + } + for _ in range(len(dist_group["test_indices"])): n = 
next(nodes) # needs cleaner way to be identified - tests_per_node = self.dist_groups[dist_group_key]['pending_indices'][:1] + tests_per_node = self.dist_groups[dist_group_key]["pending_indices"][:1] schedule_log[n.gateway.id].extend(tests_per_node) self._send_tests_group(n, 1, dist_group_key) del self.dist_groups[dist_group_key] - message = ("\n[-] [csg] schedule: processed scheduling for " - f"{dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid, nt in schedule_log.items()])}") + message = ( + "\n[-] [csg] schedule: processed scheduling for " + f"{dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid, nt in schedule_log.items()])}" + ) self.report_line(message) def _send_tests(self, node: WorkerController, num: int) -> None: @@ -362,16 +378,17 @@ def _send_tests(self, node: WorkerController, num: int) -> None: self.node2pending[node].extend(tests_per_node) node.send_runtest_some(tests_per_node) - def _send_tests_group(self, node: WorkerController, num: int, dist_group_key: str) -> None: - tests_per_node = self.dist_groups[dist_group_key]['pending_indices'][:num] + def _send_tests_group( + self, node: WorkerController, num: int, dist_group_key: str + ) -> None: + tests_per_node = self.dist_groups[dist_group_key]["pending_indices"][:num] if tests_per_node: - del self.dist_groups[dist_group_key]['pending_indices'][:num] + del self.dist_groups[dist_group_key]["pending_indices"][:num] for test_index in tests_per_node: self.pending.remove(test_index) self.node2pending[node].extend(tests_per_node) node.send_runtest_some(tests_per_node) - def _check_nodes_have_same_collection(self) -> bool: """Return True if all nodes have collected the same items. diff --git a/src/xdist/workermanage.py b/src/xdist/workermanage.py index 963dbb8a..c130bb5c 100644 --- a/src/xdist/workermanage.py +++ b/src/xdist/workermanage.py @@ -82,7 +82,7 @@ def rsync_roots(self, gateway: execnet.Gateway) -> None: def setup_nodes( self, putevent: Callable[[tuple[str, dict[str, Any]]], None], - max_nodes: int | None = None + max_nodes: int | None = None, ) -> list[WorkerController]: self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs) self.trace("setting up nodes") diff --git a/xdist-testing-ntop/test.py b/xdist-testing-ntop/test.py index 4b370b6c..e43e881b 100644 --- a/xdist-testing-ntop/test.py +++ b/xdist-testing-ntop/test.py @@ -8,21 +8,25 @@ def test_1(): time.sleep(2) assert True + @pytest.mark.xdist_custom(name="low_4") def test_2(): time.sleep(2) assert True + @pytest.mark.xdist_custom(name="low_4") def test_3(): time.sleep(2) assert True + @pytest.mark.xdist_custom(name="low_4") def test_4(): time.sleep(2) assert True + # @pytest.mark.xdist_custom(name="low_4") # def test_4a(): # time.sleep(2) @@ -48,48 +52,58 @@ def test_4(): # time.sleep(2) # assert True + @pytest.mark.xdist_custom(name="med_2") def test_5(): time.sleep(3) assert True + @pytest.mark.xdist_custom(name="med_2") def test_6(): time.sleep(3) assert True + @pytest.mark.xdist_custom(name="med_2") def test_7(): time.sleep(3) assert True + @pytest.mark.xdist_custom(name="med_2") def test_8(): time.sleep(3) assert True + @pytest.mark.xdist_custom(name="high_1") def test_9(): time.sleep(5) assert True + @pytest.mark.xdist_custom(name="high_1") def test_10(): time.sleep(5) assert True + def test_11(): time.sleep(1) assert True + def test_12(): time.sleep(1) assert True + def test_13(): time.sleep(1) assert True + def test_14(): time.sleep(1) assert True
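The sample suite above drives the scheduler end to end: the number trailing each mark name ("low_4", "med_2", "high_1") is read by CustomGroup.schedule() as that group's worker count, and unmarked tests fall into a default group that uses every available worker. As a rough illustration of that grouping rule, the sketch below groups nodeids the same way; group_tests and max_workers are names invented for this example, and nodeids are assumed to already carry the "@<name>_<workers>" suffix appended during collection.

    # Illustrative sketch only -- not part of the patch.
    # Assumes nodeids of marked tests end with "@<name>_<workers>",
    # e.g. "test.py::test_1@low_4".
    def group_tests(collection: list[str], max_workers: int) -> dict[str, dict]:
        groups: dict[str, dict] = {}
        for index, nodeid in enumerate(collection):
            if "@" in nodeid:
                name = nodeid.split("@")[-1]            # e.g. "low_4"
                workers = int(name.split("_")[-1])      # e.g. 4
                workers = min(workers, max_workers)     # never exceed what -n allows
            else:
                name, workers = "default", max_workers
            group = groups.setdefault(
                name, {"group_workers": workers, "test_indices": []}
            )
            group["test_indices"].append(index)
        return groups

    # With the tests above and two workers (-n 2):
    #   group_tests(["t.py::test_1@low_4", "t.py::test_9@high_1", "t.py::test_11"], 2)
    # yields low_4 capped at 2 workers, high_1 at 1 worker, and test_11 in the
    # "default" group using both workers; the scheduler then runs the groups
    # one after another, as the pending_groups.pop(0) calls in the diff show.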