Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/changelog/3869.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Break deadlock in execution interrupt chain that caused ~18 flaky timeout failures across 9 tests on Windows/macOS CI -
by :user:`gaborbernat`.
76 changes: 49 additions & 27 deletions src/tox/session/cmd/run/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import os
import time
from argparse import Action, ArgumentError, ArgumentParser, Namespace
from concurrent.futures import CancelledError, Future, ThreadPoolExecutor, as_completed
from concurrent.futures import FIRST_COMPLETED, CancelledError, Future, ThreadPoolExecutor
from concurrent.futures import wait as wait_futures
from fnmatch import fnmatchcase
from pathlib import Path
from signal import SIGINT, Handlers, signal
Expand Down Expand Up @@ -227,7 +228,8 @@ def execute(state: State, max_workers: int | None, has_spinner: bool, live: bool
)
thread.start()
try:
thread.join()
while thread.is_alive():
thread.join(timeout=1)
except KeyboardInterrupt:
previous, has_previous = signal(SIGINT, Handlers.SIG_IGN), True
spinner.print_report = False # no need to print reports at this point, final report coming up
Expand Down Expand Up @@ -287,6 +289,18 @@ def update_spinner(self, result: ToxEnvRunResult, success: bool) -> None: # noq
done(result.name)


def _next_completed(
future_to_env: dict[Future[ToxEnvRunResult], ToxEnv],
interrupt: Event,
) -> Future[ToxEnvRunResult] | None:
while True:
done_futures, _ = wait_futures(list(future_to_env), timeout=1, return_when=FIRST_COMPLETED)
if done_futures:
return done_futures.pop()
if interrupt.is_set():
return None


def _queue_and_wait( # noqa: C901, PLR0913, PLR0915, PLR0912
state: State,
to_run_list: list[str],
Expand Down Expand Up @@ -337,32 +351,40 @@ def _run(tox_env: RunToxEnv) -> ToxEnvRunResult:

if not future_to_env:
result: ToxEnvRunResult | None = None
else: # if we have queued wait for completed
future = next(as_completed(future_to_env))
tox_env_done = future_to_env.pop(future)
try:
result = future.result()
except CancelledError:
tox_env_done.teardown()
name = tox_env_done.conf.name
result = ToxEnvRunResult(
name=name,
skipped=False,
code=-3,
outcomes=[],
duration=MISS_DURATION,
)
results.append(result)
completed.add(result.name)
if (
result.code != Outcome.OK
and not result.ignore_outcome
and (options.parsed.fail_fast or result.fail_fast)
):
interrupt.set()
else:
completed_future = _next_completed(future_to_env, interrupt)
if completed_future is None:
for pending_future, pending_env in list(future_to_env.items()):
if not pending_future.cancel() and not pending_future.done():
pending_env.interrupt()
future_to_env.clear()
env_list = []
for pending_future in list(future_to_env.keys()):
pending_future.cancel()
result = None
else:
tox_env_done = future_to_env.pop(completed_future)
try:
result = completed_future.result()
except CancelledError:
tox_env_done.teardown()
name = tox_env_done.conf.name
result = ToxEnvRunResult(
name=name,
skipped=False,
code=-3,
outcomes=[],
duration=MISS_DURATION,
)
results.append(result)
completed.add(result.name)
if (
result.code != Outcome.OK
and not result.ignore_outcome
and (options.parsed.fail_fast or result.fail_fast)
):
interrupt.set()
env_list = []
for pending_future in list(future_to_env.keys()):
pending_future.cancel()

if not interrupt.is_set() and not env_list:
env_list = next(envs_to_run_generator, [])
Expand Down