Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create Scheduler for Grouping Tests w/ Variable number of Workers #1130

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- uses: actions/checkout@v4

- name: Build and Check Package
uses: hynek/build-and-inspect-python-package@v2.8
uses: hynek/build-and-inspect-python-package@v2.9

deploy:
needs: package
Expand All @@ -39,7 +39,9 @@ jobs:
path: dist

- name: Publish package to PyPI
uses: pypa/gh-action-pypi-publish@v1.9.0
uses: pypa/gh-action-pypi-publish@v1.10.1
with:
attestations: true

- name: Push tag
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Build and Check Package
uses: hynek/build-and-inspect-python-package@v2.8
uses: hynek/build-and-inspect-python-package@v2.9

test:

Expand Down
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: "v0.6.1"
rev: "v0.6.5"
hooks:
- id: ruff
args: ["--fix"]
Expand All @@ -23,7 +23,7 @@ repos:
language: python
additional_dependencies: [pygments, restructuredtext_lint]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.11.1
rev: v1.11.2
hooks:
- id: mypy
files: ^(src/|testing/)
Expand Down
144 changes: 138 additions & 6 deletions src/xdist/dsession.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
from queue import Empty
from queue import Queue
import sys
import traceback
from typing import Any
from typing import Callable
from typing import Sequence
import warnings

Expand All @@ -14,6 +16,7 @@

from xdist.remote import Producer
from xdist.remote import WorkerInfo
from xdist.scheduler import CustomGroup
from xdist.scheduler import EachScheduling
from xdist.scheduler import LoadFileScheduling
from xdist.scheduler import LoadGroupScheduling
Expand Down Expand Up @@ -58,10 +61,14 @@ def __init__(self, config: pytest.Config) -> None:
self._failed_collection_errors: dict[object, bool] = {}
self._active_nodes: set[WorkerController] = set()
self._failed_nodes_count = 0
self.saved_put: Callable[[tuple[str, dict[str, Any]]], None]
self.remake_nodes = False
self.ready_to_run_tests = False
self._max_worker_restart = get_default_max_worker_restart(self.config)
# summary message to print at the end of the session
self._summary_report: str | None = None
self.terminal = config.pluginmanager.getplugin("terminalreporter")
self.worker_status: dict[str, str] = {}
if self.terminal:
self.trdist = TerminalDistReporter(config)
config.pluginmanager.register(self.trdist, "terminaldistreporter")
Expand All @@ -87,6 +94,7 @@ def pytest_sessionstart(self, session: pytest.Session) -> None:
soon as nodes start they will emit the worker_workerready event.
"""
self.nodemanager = NodeManager(self.config)
self.saved_put = self.queue.put
nodes = self.nodemanager.setup_nodes(putevent=self.queue.put)
self._active_nodes.update(nodes)
self._session = session
Expand Down Expand Up @@ -123,6 +131,8 @@ def pytest_xdist_make_scheduler(
return LoadGroupScheduling(config, log)
if dist == "worksteal":
return WorkStealingScheduling(config, log)
if dist == "customgroup":
return CustomGroup(config, log)
return None

@pytest.hookimpl
Expand All @@ -147,14 +157,19 @@ def loop_once(self) -> None:
"""Process one callback from one of the workers."""
while 1:
if not self._active_nodes:
# If everything has died stop looping
self.triggershutdown()
raise RuntimeError("Unexpectedly no active workers available")
# Worker teardown + recreation only occurs for CustomGroup Scheduler
if isinstance(self.sched, CustomGroup) and self.remake_nodes:
pass
else:
# We aren't using CustomGroup scheduler and everything has died: stop looping
self.triggershutdown()
raise RuntimeError("Unexpectedly no active workers available")
try:
eventcall = self.queue.get(timeout=2.0)
break
except Empty:
continue

callname, kwargs = eventcall
assert callname, kwargs
method = "worker_" + callname
Expand All @@ -165,6 +180,78 @@ def loop_once(self) -> None:
if self.sched.tests_finished:
self.triggershutdown()

def is_node_finishing(self, node: WorkerController) -> bool:
    """Check if a test worker is considered to be finishing.

    A worker counts as finishing when it is on its last pending test,
    or has no pending tests at all.
    """
    assert self.sched is not None
    assert type(self.sched) is CustomGroup
    remaining = self.sched.node2pending.get(node)
    if remaining is None:
        return False
    return len(remaining) < 2

def are_all_nodes_finishing(self) -> bool:
    """Check if all workers are finishing (See 'is_node_finishing' above)."""
    assert self.sched is not None
    for worker in self.sched.nodes:
        if not self.is_node_finishing(worker):
            return False
    return True

def are_all_nodes_done(self) -> bool:
    """Check if every worker's last reported event was 'finished'."""
    for status in self.worker_status.values():
        if status != "finished":
            return False
    return True

def are_all_active_nodes_collected(self) -> bool:
"""Check if all nodes have reported collection to be complete."""
if not all(n.gateway.id in self.worker_status for n in self._active_nodes):
return False
return all(
self.worker_status[n.gateway.id] == "collected" for n in self._active_nodes
)

def reset_nodes_if_needed(self) -> None:
    """Reset workers when every node is wrapping up, the session is ready
    to run tests, and no reschedule is already pending."""
    assert self.sched is not None
    assert type(self.sched) is CustomGroup
    if not self.are_all_nodes_finishing():
        return
    if not self.ready_to_run_tests:
        return
    if self.sched.do_resched:
        return
    self.reset_nodes()

def reset_nodes(self) -> None:
    """Issue shutdown notices to workers for rescheduling purposes."""
    assert self.sched is not None
    assert type(self.sched) is CustomGroup
    # Only flag node recreation when tests are still pending; the shutdown
    # notices below go out to every finishing worker either way.
    if self.sched.pending:
        self.remake_nodes = True
    for worker in self.sched.nodes:
        if self.is_node_finishing(worker):
            worker.shutdown()

def reschedule(self) -> None:
    """Reschedule tests."""
    assert self.sched is not None
    assert type(self.sched) is CustomGroup
    sched = self.sched
    sched.do_resched = False
    # Kick the scheduler via any node; it reschedules the next group.
    sched.check_schedule(sched.nodes[0], 1.0, True)

def prepare_for_reschedule(self) -> None:
    """Update test workers and their status tracking so rescheduling is ready.

    Tears down bookkeeping for the finished batch, spins up as many fresh
    workers as the next pending group requires, and flags the scheduler so
    the next collection pass triggers the reschedule.
    """
    # The type assert subsumes the 'is not None' check; asserting the type
    # once up front replaces the duplicated asserts of the earlier version.
    assert type(self.sched) is CustomGroup
    self.remake_nodes = False
    # Number of workers requested by the next pending distribution group.
    next_group = self.sched.dist_groups[self.sched.pending_groups[0]]
    num_workers = next_group["group_workers"]
    self.trdist._status = {}
    assert self.nodemanager is not None
    new_nodes = self.nodemanager.setup_nodes(self.saved_put, num_workers)
    self.worker_status = {}
    self._active_nodes = set(new_nodes)
    self.sched.node2pending = {}
    self.sched.do_resched = True

#
# callbacks for processing events from workers
#
Expand All @@ -182,6 +269,7 @@ def worker_workerready(
node.workerinfo = workerinfo
node.workerinfo["id"] = node.gateway.id
node.workerinfo["spec"] = node.gateway.spec
self.update_worker_status(node, "ready")

self.config.hook.pytest_testnodeready(node=node)
if self.shuttingdown:
Expand All @@ -198,6 +286,21 @@ def worker_workerfinished(self, node: WorkerController) -> None:
The node might not be in the scheduler if it had not emitted
workerready before shutdown was triggered.
"""
self.update_worker_status(node, "finished")

if isinstance(self.sched, CustomGroup) and self.remake_nodes:
node.ensure_teardown()
self._active_nodes.remove(node)
if self.are_all_nodes_done():
try:
self.prepare_for_reschedule()
except Exception as e:
msg = (
"Exception caught during preparation for rescheduling. Giving up."
f"\n{''.join(traceback.format_exception(e))}"
)
self.shouldstop = msg
return
self.config.hook.pytest_testnodedown(node=node, error=None)
if node.workeroutput["exitstatus"] == 2: # keyboard-interrupt
self.shouldstop = f"{node} received keyboard-interrupt"
Expand All @@ -217,6 +320,14 @@ def worker_workerfinished(self, node: WorkerController) -> None:
assert not crashitem, (crashitem, node)
self._active_nodes.remove(node)

def update_worker_status(self, node: WorkerController, status: str) -> None:
    """Track the worker status.

    Can be used at callbacks like 'worker_workerfinished' so we remember which
    event was reported last by each worker.

    :param node: worker controller whose state changed; its
        ``workerinfo["id"]`` is used as the tracking key.
    :param status: name of the event just reported (e.g. ``"ready"``,
        ``"collected"``, ``"finished"``).
    """
    self.worker_status[node.workerinfo["id"]] = status

def worker_internal_error(
self, node: WorkerController, formatted_error: str
) -> None:
Expand Down Expand Up @@ -283,7 +394,12 @@ def worker_collectionfinish(
scheduling the first time it logs which scheduler is in use.
"""
if self.shuttingdown:
self.report_line(
f"[-] [dse] collectionfinish while closing {node.gateway.id}"
)
return
self.update_worker_status(node, "collected")

self.config.hook.pytest_xdist_node_collection_finished(node=node, ids=ids)
# tell session which items were effectively collected otherwise
# the controller node will finish the session with EXIT_NOTESTSCOLLECTED
Expand All @@ -300,10 +416,20 @@ def worker_collectionfinish(
self.trdist.ensure_show_status()
self.terminal.write_line("")
if self.config.option.verbose > 0:
self.terminal.write_line(
f"scheduling tests via {self.sched.__class__.__name__}"
self.report_line(
f"[-] [dse] scheduling tests via {self.sched.__class__.__name__}"
)
self.sched.schedule()
if isinstance(self.sched, CustomGroup):
if self.ready_to_run_tests and self.are_all_active_nodes_collected():
# we're coming back here after finishing a batch of tests - so start the next batch
self.reschedule()
self.reset_nodes_if_needed()
else:
self.ready_to_run_tests = True
self.sched.schedule()
self.reset_nodes_if_needed()
else:
self.sched.schedule()

def worker_logstart(
self,
Expand Down Expand Up @@ -339,6 +465,12 @@ def worker_runtest_protocol_complete(
"""
assert self.sched is not None
self.sched.mark_test_complete(node, item_index, duration)
if isinstance(self.sched, CustomGroup):
if self.are_all_nodes_finishing():
if self.shouldstop:
self.report_line("Won't reschedule - should stop.")
else:
self.reset_nodes()

def worker_unscheduled(
self, node: WorkerController, indices: Sequence[int]
Expand Down
3 changes: 3 additions & 0 deletions src/xdist/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def pytest_addoption(parser: pytest.Parser) -> None:
"loadfile",
"loadgroup",
"worksteal",
"customgroup",
"no",
],
dest="dist",
Expand All @@ -124,6 +125,8 @@ def pytest_addoption(parser: pytest.Parser) -> None:
"loadgroup: Like 'load', but sends tests marked with 'xdist_group' to the same worker.\n\n"
"worksteal: Split the test suite between available environments,"
" then re-balance when any worker runs out of tests.\n\n"
# TODO: Update docstring
"customgroup: TODO: add docs here"
"(default) no: Run tests inprocess, don't distribute."
),
)
Expand Down
9 changes: 7 additions & 2 deletions src/xdist/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,19 @@ def run_one_test(self) -> None:
"runtest_protocol_complete", item_index=self.item_index, duration=duration
)

@pytest.mark.trylast
def pytest_collection_modifyitems(
self,
config: pytest.Config,
items: list[pytest.Item],
) -> None:
# add the group name to nodeid as suffix if --dist=loadgroup
if config.getvalue("loadgroup"):
if config.getvalue("loadgroup") or config.getvalue("customgroup"):
functional_mark = (
"xdist_group" if config.getvalue("loadgroup") else "xdist_custom"
)
for item in items:
mark = item.get_closest_marker("xdist_group")
mark = item.get_closest_marker(functional_mark)
if not mark:
continue
gname = (
Expand Down Expand Up @@ -357,6 +361,7 @@ def getinfodict() -> WorkerInfo:

def setup_config(config: pytest.Config, basetemp: str | None) -> None:
config.option.loadgroup = config.getvalue("dist") == "loadgroup"
config.option.customgroup = config.getvalue("dist") == "customgroup"
config.option.looponfail = False
config.option.usepdb = False
config.option.dist = "no"
Expand Down
1 change: 1 addition & 0 deletions src/xdist/scheduler/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from xdist.scheduler.customgroup import CustomGroup as CustomGroup
from xdist.scheduler.each import EachScheduling as EachScheduling
from xdist.scheduler.load import LoadScheduling as LoadScheduling
from xdist.scheduler.loadfile import LoadFileScheduling as LoadFileScheduling
Expand Down
Loading