Fix swarm ext process bugs #4263
Closed
GeorgeWang-nv wants to merge 5 commits into NVIDIA:2.7 from
Conversation
Defer download transaction deletion in _delete_download_tx_on_msg_root() by 30 seconds using a daemon Timer thread. When a blob envelope is delivered, blob_cb is queued asynchronously on callback_thread_pool. The message root can be deleted before blob_cb runs, causing delete_transaction() to remove refs from _ref_table prematurely. When blob_cb later initiates secondary tensor downloads, _handle_download() returns "no ref found" and the transfer fails. The 30s deferral gives blob_cb time to complete secondary downloads. The _monitor_tx() daemon thread still cleans up finished transactions naturally, so the timer is just a safety net. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
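The deferred-deletion pattern described above can be sketched as follows. This is a minimal illustration, not the actual NVFlare code: the class and the `download_service.delete_transaction()` call are hypothetical stand-ins, and the deferral is parameterized here (the PR uses a fixed 30 s).

```python
import threading

class ViaDownloaderSketch:
    """Minimal sketch of deferred transaction deletion (names hypothetical)."""

    def __init__(self, download_service, defer_secs: float = 30.0):
        self.download_service = download_service
        self.defer_secs = defer_secs

    def _delete_download_tx_on_msg_root(self, tx_id: str):
        # Instead of deleting as soon as the msg root is gone, schedule the
        # deletion on a daemon Timer so the asynchronously queued blob_cb has
        # time to initiate its secondary tensor downloads first.
        t = threading.Timer(self.defer_secs, self._do_delete_tx, args=(tx_id,))
        t.daemon = True  # must never block process exit
        t.start()

    def _do_delete_tx(self, tx_id: str):
        # _monitor_tx() usually cleans up finished transactions before this
        # fires; the timer is only a safety net.
        self.download_service.delete_transaction(tx_id)
```

The daemon flag matters: a non-daemon Timer would keep the subprocess alive for up to 30 s after the job ends.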
Add a persistor-based model retrieval path before falling back to submit_model_executor.execute(). With external process executors (e.g. ClientAPILauncherExecutor), execute() launches a fresh subprocess that has no trained model state, so it cannot return the locally trained model. The best model is already saved by client_controller_executor._process_final_result() via persistor.save(), so loading it from the persistor is both correct and consistent with how _prepare_global_model() works. The persistor path tries the exact model_name first, then falls back to iterating the persistor inventory in case model_name does not match the stored key (e.g. "best_model" vs "FL_global_model.pt"). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
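The lookup order described above can be sketched like this. The persistor method names (`load`, `inventory`) are illustrative stand-ins, not the real NVFlare `ModelPersistor` API; only the control flow follows the description.

```python
def get_local_best_model(persistor, model_name, fallback_executor, fl_ctx=None):
    """Persistor-first model retrieval sketch (helper names hypothetical)."""
    if persistor is not None:
        # 1) Try the exact key the caller asked for.
        try:
            model = persistor.load(model_name, fl_ctx)
            if model is not None:
                return model
        except Exception:
            pass  # fall through to the inventory scan
        # 2) The stored key may differ from model_name
        #    (e.g. "best_model" vs "FL_global_model.pt"): scan the inventory.
        try:
            for key in persistor.inventory(fl_ctx):
                model = persistor.load(key, fl_ctx)
                if model is not None:
                    return model
        except Exception:
            pass
    # 3) Executor fallback: only correct for in-process executors; an
    #    external-process executor would launch a fresh, untrained subprocess.
    return fallback_executor.execute(model_name, fl_ctx)
```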
When pass_through_on_send is enabled, FlareAgent._do_submit_result() unconditionally waits up to 1800s for DOWNLOAD_COMPLETE_CB after sending any result. For validate results (metrics only, no tensors), ViaDownloader never creates a download transaction, so the callback is never fired. The subprocess blocks until timeout, missing subsequent task assignments and causing CSE round 2+ to hang. Add a DOWNLOAD_STARTED_CB mechanism: ViaDownloader._finalize_download_tx() invokes this callback when a download transaction is actually created. FlareAgent registers a threading.Event via this callback before sending the result. After the send completes, if download_started is not set, the result contained no large objects — proceed immediately instead of blocking on download_done.wait(). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
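The gating logic can be sketched with two `threading.Event`s. The pipe and its callback-registration methods here are hypothetical stand-ins; only the register-before-send ordering and the skip-wait fast path follow the description.

```python
import threading

class AgentSubmitSketch:
    """Sketch of DOWNLOAD_STARTED_CB gating (pipe interface is illustrative)."""

    def __init__(self, pipe, download_complete_timeout: float = 1800.0):
        self.pipe = pipe
        self.download_complete_timeout = download_complete_timeout

    def submit_result(self, result) -> bool:
        download_started = threading.Event()
        download_done = threading.Event()
        # Register BEFORE sending: the downloader fires the started callback
        # only when it actually creates a download transaction.
        self.pipe.register_download_started_cb(download_started.set)
        self.pipe.register_download_complete_cb(lambda *a: download_done.set())
        try:
            if not self.pipe.send_to_peer(result):
                return False
            if not download_started.is_set():
                # No large objects (validate result: metrics only) -> no
                # transaction exists and the complete callback never fires.
                return True
            return download_done.wait(self.download_complete_timeout)
        finally:
            self.pipe.clear_download_cbs()
```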
New test files (20 tests):

- test_via_downloader_deferred_deletion.py: 6 tests for Bug 1 deferred deletion (timer scheduling, daemon flag, target function)
- test_cse_persistor_fallback.py: 6 tests for Bug 2 persistor-based model retrieval (direct get, inventory fallback, exception fallback, no persistor, no executor, empty inventory)
- test_download_started_gating.py: 8 tests for Bug 3 DOWNLOAD_STARTED_CB (registration, skip-wait for validate, wait for train, cleanup) and ViaDownloader callback invocation

Updated existing tests:

- test_download_complete_gating.py: Fix 3 tests that broke due to Bug 3 changes — now fire DOWNLOAD_STARTED_CB in the send_to_peer side_effect so the code reaches the download_done.wait() path instead of returning early on the new "no download transaction" fast path.

Source files reformatted with black (no logic changes).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…Recipe

Expose learn_task_ack_timeout, learn_task_timeout, and final_result_ack_timeout in SimpleSwarmLearningRecipe so callers can configure P2P broadcast timeouts for large models (e.g. Llama 1B ~2.8GB). The defaults (10s) are too short for tensor streaming transfers that take 14-20s per site.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
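A rough way to pick these overrides, given the transfer numbers above: size the ACK timeout from model size and link throughput with a safety margin. The heuristic below is not from the PR, just an illustration of why 10 s fails for a ~2.8 GB model.

```python
def safe_ack_timeout(model_bytes: float, mb_per_sec: float, margin: float = 3.0) -> float:
    """Heuristic (illustrative, not from the PR): estimated transfer time
    times a safety margin, never below the 10 s default."""
    return max(10.0, margin * model_bytes / (mb_per_sec * 1024 * 1024))

# Llama 1B ~2.8 GB over ~200 MB/s takes ~14 s per site, so 10 s times out:
timeout = safe_ack_timeout(2.8 * 1024**3, 200)
swarm_timeout_kwargs = dict(
    learn_task_ack_timeout=timeout,
    final_result_ack_timeout=timeout,
    learn_task_timeout=None,  # leave per-round training time unbounded
)
```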
Contributor
Greptile Summary

This PR fixes three bugs in the swarm/external-process training flow in NVFlare:
Key issues:
All three fixes come with well-written unit tests covering the specified contracts.

Confidence Score: 4/5
Sequence Diagram

sequenceDiagram
participant FA as FlareAgent (_do_submit_result)
participant CP as CellPipe (send_to_peer)
participant VD as ViaDownloaderDecomposer (_finalize_download_tx)
participant CJ as ClientJob (peer ACK)
participant SRV as FL Server (tensor pull)
FA->>FA: register DOWNLOAD_STARTED_CB & DOWNLOAD_COMPLETE_CB
FA->>CP: send_to_peer(reply, timeout)
CP->>VD: serialize → _finalize_download_tx()
alt result has large tensors (train path)
VD->>FA: DOWNLOAD_STARTED_CB() — sets download_started event
VD->>VD: subscribe_to_msg_root (→ deferred delete after 30s)
end
CP->>CJ: send message
CJ-->>CP: ACK
CP-->>FA: sent=True
alt download_started NOT set (validate metrics path)
FA->>FA: return True immediately
else download_started IS set (train path)
FA->>FA: wait(download_done, timeout=download_complete_timeout)
SRV->>VD: pull tensors via DownloadService
VD-->>FA: DOWNLOAD_COMPLETE_CB(tid, FINISHED, objs)
FA->>FA: return True
end
FA->>FA: clear DOWNLOAD_STARTED_CB & DOWNLOAD_COMPLETE_CB (finally)
Last reviewed commit: 67e8de6
    progress_timeout: float = 3600,
    max_status_report_interval: float = 300,
    learn_task_ack_timeout: float = 10,
    learn_task_timeout: float = None,
Contributor
Incorrect type annotation for learn_task_timeout
learn_task_timeout: float = None is invalid — the default value None is not a float. This will fail strict type-checkers (mypy, pyright) and can mislead users. Optional is already imported in this file (line 15).
Suggested change

-    learn_task_timeout: float = None,
+    learn_task_timeout: Optional[float] = None,
chesterxgchen added a commit that referenced this pull request on Mar 6, 2026
… validate hang, round_timeout (#4270)

## Summary

Fixes six bugs in NVFlare 2.7.2 RC12 affecting Swarm Learning with `launch_external_process=True`.

Implementation reference: @GeorgeWang-nv's original PR #4263

- Bug 1 -- different implementation from PR #4263
- Bug 2 -- accept the implementation approach with small changes from PR #4263
- Bug 3 -- use a different approach
- Bug 4 -- add timeout to swarm API but consolidate the timeouts from 4 to 2 and expose to Swarm Recipes
- Bug 5 -- restore `self._max_resends` (private convention); subprocess logging fixed (see below)
- Bug 6 -- guard ConnManager against RuntimeError after shutdown; 4 regression tests added

**Additional (review feedback):**

- Rename `SimpleSwarmLearningRecipe` → `SwarmLearningRecipe` everywhere; backward-compat alias kept
- Fix docs: add missing required `min_clients` arg to all `SwarmLearningRecipe` examples (TypeError for users copying doc examples)
- CSE inventory scan: take the LAST "best" key, not the first — prevents an initial checkpoint named `best_*` from shadowing the trained best model
- Add `pipe_type` / `pipe_root_path` to `SwarmLearningRecipe` so users can select `FilePipe` without dropping to the low-level API

### Bug 1 — Download ref deleted before tensor receivers finish (`via_downloader.py`)

**Root cause**: `_create_downloader()` subscribed to msg_root deletion to clean up download transactions. The msg_root is deleted as soon as blob envelopes are delivered, but `blob_cb` fires asynchronously — secondary tensor downloads were still in flight when `delete_transaction()` removed the ref from `_ref_table`, causing a `"no ref found"` FATAL_SYSTEM_ERROR on large models (≥1B parameters).

**Fix**: Remove `subscribe_to_msg_root` from `_create_downloader()` entirely. Transaction lifecycle is now managed solely by `_monitor_tx()` in `download_service.py`, which polls `is_finished()` every 5s and cleans up only after all chunk downloads complete.
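The poll-until-finished lifecycle that replaces the msg_root subscription can be sketched as a small daemon monitor. Names (`monitor_tx`, `is_finished`, `delete_transaction`) mirror the description but the implementation here is a hypothetical sketch, not the NVFlare `download_service.py` code.

```python
import threading
import time

def monitor_tx(tx, delete_transaction, poll_interval: float = 5.0):
    """Sketch of _monitor_tx-style lifecycle management: the transaction is
    cleaned up only once every chunk download reports finished."""
    def _run():
        while not tx.is_finished():
            time.sleep(poll_interval)
        delete_transaction(tx)  # safe: nothing is still in flight
    t = threading.Thread(target=_run, daemon=True)
    t.start()
    return t
```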
### Bug 2 — CSE fails to load local model after ext-process training (`cse_client_ctl.py`)

**Root cause**: `_prepare_local_model()` called `submit_model_executor.execute()` which, for `ClientAPILauncherExecutor`, launches a fresh subprocess with no trained model state. The training subprocess had already exited; the model was already saved to disk by `PTFileModelPersistor`.

**Fix**: Try the persistor first. The inventory key scan prefers the LAST key containing `"best"` when `model_name` contains `"best"` — taking the last "best" key avoids mistaking an initial checkpoint named `"best_*"` (added first by `PTFileModelPersistor`) for the trained best model. An `isinstance` guard replaces `assert isinstance` (safe under `python -O`). Falls back to the executor for in-process mode compatibility.

### Bug 3 — Validate results hang 1800s on CSE round 2+ (`flare_agent.py` + `via_downloader.py`)

**Root cause**: `FlareAgent._do_submit_result()` unconditionally waited 1800s for `DOWNLOAD_COMPLETE_CB` after every result send when `pass_through_on_send=True`. For validate results (metrics only, no tensors), `_finalize_download_tx()` creates no download transaction and the callback never fires — the subprocess blocks indefinitely.

**Fix**: Thread-local `was_download_initiated()` flag, set by `_finalize_download_tx()` only when downloadable objects exist. The agent returns immediately if `False` (validate result). Thread-local is required because the task pipe and metric pipe share the same `CoreCell` (same `site_name + token + mode` → same FQCN → same `_CellInfo` cache entry → same `fobs_ctx`); a plain flag would be clobbered by concurrent metric serialisation from a different thread.

### Bug 4 — P2P model ACK timeout too short for large models (`swarm.py`)

**Root cause**: `SwarmClientConfig` hardcodes `learn_task_ack_timeout=10s` and `final_result_ack_timeout=10s`. For large models (≥2 GB), P2P tensor streaming takes minutes — the ACK times out before the download completes.
**Fix**: Add `round_timeout: float = 3600` to `SwarmLearningRecipe`. Wires both `learn_task_ack_timeout` and `final_result_ack_timeout`; `learn_task_timeout` is intentionally left `None` (unbounded) to avoid capping per-round training time on slow hardware or 70B+ models.

### Bug 5 — `_max_resends` attribute shadowing (`client_api_launcher_executor.py`)

**Root cause**: `ClientAPILauncherExecutor.__init__()` stored `max_resends` as `self._max_resends` but `TaskExchanger` (the parent) uses `self.max_resends`. `PipeHandler` reads `self.max_resends` and saw `None` (the parent default) instead of the configured value.

**Fix**: Restore `self._max_resends` as the private attribute throughout `ClientAPILauncherExecutor` (consistent with the private-member convention); the executor builds its own config dict explicitly so it does not rely on the inherited attribute.

### Subprocess logging fix (`subprocess_launcher.py` + `ex_process/api.py`)

**Problem**: The subprocess had no logging configuration — `logger.info()` calls were silently dropped. Wrapping all stdout via `logger.info()` in the parent caused double-prefixed entries in `log.txt` for NVFlare-formatted log lines.

**Fix**: `_configure_subprocess_logging()` in `ex_process/api.py` loads the site's `log_config.json` unchanged, giving the subprocess identical loggers to the parent (both consoleHandler + file handlers). The parent's `log_subprocess_output()` now calls `_route_subprocess_line()`, which strips ANSI codes and checks for a `YYYY-MM-DD HH:MM:SS` timestamp:

- Formatted NVFlare log line → `print()` to terminal only (the file handler already wrote it to `log.txt`)
- Raw `print()` from the user training script → `logger.info()` so it reaches `log.txt`

Regression tests: `test_log_subprocess_output_formatted_lines_not_double_logged` + 5 `TestRouteSubprocessLine` tests.
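The line-routing rule for subprocess output can be sketched as below. The regexes are an approximation of what the description implies (leading `YYYY-MM-DD HH:MM:SS` timestamp, ANSI color codes), not the PR's exact patterns.

```python
import re

TIMESTAMP_RE = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}")
ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")

def route_subprocess_line(line: str, logger, echo=print):
    """Sketch of _route_subprocess_line: formatted NVFlare lines were already
    written to log.txt by the subprocess's own file handler, so re-logging
    them in the parent would double-prefix the entries."""
    clean = ANSI_RE.sub("", line)
    if TIMESTAMP_RE.match(clean):
        echo(clean)          # terminal only; file handler already has it
    else:
        logger.info(clean)   # raw print() from the training script -> log.txt
```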
### Bug 6 — ConnManager crashes on frame arrival after shutdown (`conn_manager.py`)

**Root cause**: `process_frame_task()` and `start_connector()` could raise an unhandled `RuntimeError` when called after the executor was shut down, causing noisy tracebacks during job teardown.

**Fix**: Guard `conn_mgr_executor.submit()` with try/except `RuntimeError` (log debug, skip). Add an `if self.stopped: return` early exit in `process_frame_task()`.

**Regression tests**: 4 new tests in `test_conn_manager_shutdown_race.py`.

### `pipe_type` / `pipe_root_path` for `SwarmLearningRecipe`

`SwarmLearningRecipe` always created a `CellPipe`; users needing `FilePipe` (restricted networks, third-party integrations) had to drop to the low-level `CCWFJob` + `SwarmClientConfig` API.

**Change**: Add `pipe_type: str = "cell_pipe"` and `pipe_root_path: Optional[str] = None`:

- `"cell_pipe"` (default): unchanged — zero-copy PASS_THROUGH forwarding, ~1 GB RAM overhead
- `"file_pipe"`: `FilePipe` created with `root_path` defaulting to `{WORKSPACE}/pipe` (resolved at runtime); a custom absolute path is validated to exist at recipe construction time
- Warnings for `pipe_root_path` ignored with `cell_pipe`, and `file_pipe` with `launch_external_process=False`
- `ScriptRunner.__init__` gains a `task_pipe` parameter forwarded to `BaseScriptRunner`

9 new tests in `TestSwarmLearningRecipePipeType`.
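The Bug 6 executor guard described above maps directly onto `concurrent.futures`: `ThreadPoolExecutor.submit()` raises `RuntimeError("cannot schedule new futures after shutdown")` when a frame arrives during teardown. A minimal sketch of the guard (the wrapper name is hypothetical):

```python
import logging
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger(__name__)

def safe_submit(executor: ThreadPoolExecutor, fn, *args):
    """Sketch of the post-shutdown guard: log at debug and drop the task
    instead of letting the RuntimeError produce a noisy teardown traceback."""
    try:
        return executor.submit(fn, *args)
    except RuntimeError:
        logger.debug("executor already shut down; dropping task")
        return None
```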
## Files Changed

| File | Bug | Change |
|------|-----|--------|
| `nvflare/fuel/utils/fobs/decomposers/via_downloader.py` | 1 + 3 | Remove `subscribe_to_msg_root`; add thread-local `_tls`, `was_download_initiated()`, `clear_download_initiated()` |
| `nvflare/client/flare_agent.py` | 3 | Check `was_download_initiated()` after `send_to_peer()`; return immediately for validate results |
| `nvflare/app_common/ccwf/cse_client_ctl.py` | 2 | Persistor-first `_prepare_local_model()` with last-"best"-key inventory scan and isinstance guard |
| `nvflare/app_opt/pt/recipes/swarm.py` | 4 + rename + pipe | Add `round_timeout`, `pipe_type`, `pipe_root_path`; rename to `SwarmLearningRecipe`; backward-compat alias kept |
| `nvflare/job_config/script_runner.py` | pipe | Add `task_pipe` param to `ScriptRunner`, forwarded to `BaseScriptRunner` |
| `nvflare/app_common/ccwf/recipes/swarm.py` | rename | Export `SwarmLearningRecipe` alongside `SimpleSwarmLearningRecipe` |
| `nvflare/app_opt/pt/recipes/__init__.py` | rename | Add `SwarmLearningRecipe` to lazy imports and `__all__` |
| `nvflare/app_common/executors/client_api_launcher_executor.py` | 5 | Restore `self._max_resends` (private convention) |
| `nvflare/app_common/launchers/subprocess_launcher.py` | logging | `_route_subprocess_line()`: formatted lines → `print()`, raw lines → `logger.info()` |
| `nvflare/client/ex_process/api.py` | logging | `_configure_subprocess_logging()`: apply site log config unchanged (same handlers as parent) |
| `nvflare/fuel/f3/sfm/conn_manager.py` | 6 | Guard executor submit + frame processing against post-shutdown RuntimeError |
| `docs/programming_guide/memory_management.rst` | docs | Add missing `min_clients` to example |
| `docs/user_guide/data_scientist_guide/available_recipes.rst` | docs | Add missing `min_clients`; rename to `SwarmLearningRecipe` |
| `docs/programming_guide/controllers/client_controlled_workflows.rst` | docs | Add missing `min_clients` (x2) |
| `docs/programming_guide/timeouts.rst` | docs | Rename to `SwarmLearningRecipe` |
| `examples/advanced/swarm_learning/swarm_pt/README.md` | 4 + rename | Add `round_timeout` to config table; rename to `SwarmLearningRecipe` |
| `examples/advanced/swarm_learning/swarm_pt/job.py` | rename | Update import to `SwarmLearningRecipe` |
| `tests/unit_test/fuel/utils/fobs/test_via_downloader_msg_root.py` | 1 | 4 new tests |
| `tests/unit_test/client/test_download_initiated_gating.py` | 3 | 7 new tests |
| `tests/unit_test/app_common/ccwf/test_cse_persistor_fallback.py` | 2 | 10 new tests (incl. initial-ckpt-shadowing regression) |
| `tests/unit_test/fuel/f3/sfm/test_conn_manager_shutdown_race.py` | 6 | 4 new regression tests |
| `tests/unit_test/app_common/launchers/subprocess_launcher_test.py` | logging | 6 new tests for `_route_subprocess_line` and double-logging prevention |
| `tests/unit_test/recipe/swarm_recipe_test.py` | rename + pipe | Updated to `SwarmLearningRecipe`; 9 new `TestSwarmLearningRecipePipeType` tests |
| `tests/unit_test/app_common/executors/client_api_launcher_executor_test.py` | 5 | Updated: `_max_resends` assertions |
| `tests/unit_test/recipe/component_config_verification_test.py` | rename | Updated to `SwarmLearningRecipe` |

## Test Plan

- [x] 250+ unit tests pass across all affected modules
- [x] Bug 1: `test_via_downloader_msg_root.py` — verifies `subscribe_to_msg_root` never called, method removed, `DOWNLOAD_COMPLETE_CB` unaffected
- [x] Bug 2: `test_cse_persistor_fallback.py` — verifies persistor-first path, last-"best"-key preference, initial-ckpt-shadowing regression, isinstance guard, all fallback paths
- [x] Bug 3: `test_download_initiated_gating.py` — verifies thread isolation, validate returns immediately (<1s), train waits, clear-before-send
- [x] Bug 4: covered by existing recipe and ccwf tests
- [x] Bug 5: `client_api_launcher_executor_test.py` updated (`_max_resends` assertions)
- [x] Bug 6: `test_conn_manager_shutdown_race.py` — 4 tests: executor-shutdown no-raise, stop() no-raise, stopped early-return, Prefix.from_bytes not called
- [x] Logging: `subprocess_launcher_test.py` — double-logging prevention, ANSI stripping, plain-line capture, partial-timestamp detection
- [x] Pipe type: `swarm_recipe_test.py` — 9 tests: default cell_pipe, file_pipe instance, workspace template, custom path, invalid type, warnings, path validation
- [x] Rename: backward-compat `SimpleSwarmLearningRecipe` import test passes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
YuanTingHsieh pushed a commit to YuanTingHsieh/NVFlare that referenced this pull request on Mar 17, 2026
… validate hang, round_timeout (NVIDIA#4270)
chesterxgchen added a commit to chesterxgchen/NVFlare that referenced this pull request on Mar 29, 2026
… validate hang, round_timeout (NVIDIA#4270) Fixes four bugs in NVFlare 2.7.2 RC12 affecting Swarm Learning with `launch_external_process=True` Implementation Reference @GeorgeWang-nv's original PR NVIDIA#4263 Bug 1 -- different implementation from PR4263 Bug 2 -- accept the implementation approach with small changes from PR 4263 Bug 3 -- use different approach Bug 4 -- add timeout to swarm API but consolidate the timeouts from 4 to 2 and expose to Swarm Recipes Bug 5 -- restore `self._max_resends` (private convention); subprocess logging fixed (see below) Bug 6 -- guard ConnManager against RuntimeError after shutdown; 4 regression tests added **Additional (review feedback):** - Rename `SimpleSwarmLearningRecipe` → `SwarmLearningRecipe` everywhere; backward-compat alias kept - Fix docs: add missing required `min_clients` arg to all `SwarmLearningRecipe` examples (TypeError for users copying doc examples) - CSE inventory scan: take LAST "best" key, not first — prevents initial checkpoint named "best_*" from shadowing trained best model - Add `pipe_type` / `pipe_root_path` to `SwarmLearningRecipe` so users can select `FilePipe` without dropping to low-level API (`via_downloader.py`) **Root cause**: `_create_downloader()` subscribed to msg_root deletion to clean up download transactions. The msg_root is deleted as soon as blob envelopes are delivered, but `blob_cb` fires asynchronously — secondary tensor downloads were still in flight when `delete_transaction()` removed the ref from `_ref_table`, causing `"no ref found" FATAL_SYSTEM_ERROR` on large models (≥1B parameters). **Fix**: Remove `subscribe_to_msg_root` from `_create_downloader()` entirely. Transaction lifecycle is now managed solely by `_monitor_tx()` in `download_service.py`, which polls `is_finished()` every 5s and cleans up only after all chunk downloads complete. 
(`cse_client_ctl.py`) **Root cause**: `_prepare_local_model()` called `submit_model_executor.execute()` which, for `ClientAPILauncherExecutor`, launches a fresh subprocess with no trained model state. The training subprocess had already exited; the model was already saved to disk by `PTFileModelPersistor`. **Fix**: Try the persistor first. Inventory key scan prefers the LAST key containing `"best"` when `model_name` contains `"best"` — taking the last "best" key avoids mistaking an initial checkpoint named `"best_*"` (added first by `PTFileModelPersistor`) for the trained best model. `isinstance` guard replaces `assert isinstance` (safe under `python -O`). Falls back to executor for in-process mode compatibility. (`flare_agent.py` + `via_downloader.py`) **Root cause**: `FlareAgent._do_submit_result()` unconditionally waited 1800s for `DOWNLOAD_COMPLETE_CB` after every result send when `pass_through_on_send=True`. For validate results (metrics only, no tensors), `_finalize_download_tx()` creates no download transaction and the callback never fires — subprocess blocks indefinitely. **Fix**: Thread-local `was_download_initiated()` flag, set by `_finalize_download_tx()` only when downloadable objects exist. Agent returns immediately if `False` (validate result). Thread-local is required because task pipe and metric pipe share the same `CoreCell` (same `site_name + token + mode` → same FQCN → same `_CellInfo` cache entry → same `fobs_ctx`); a plain flag would be clobbered by concurrent metric serialisation from a different thread. (`swarm.py`) **Root cause**: `SwarmClientConfig` hardcodes `learn_task_ack_timeout=10s` and `final_result_ack_timeout=10s`. For large models (≥2 GB), P2P tensor streaming takes minutes — the ACK times out before download completes. **Fix**: Add `round_timeout: float = 3600` to `SwarmLearningRecipe`. 
It wires both `learn_task_ack_timeout` and `final_result_ack_timeout`; `learn_task_timeout` is intentionally left `None` (unbounded) to avoid capping per-round training time on slow hardware or 70B+ models.

**Bug 5 (`client_api_launcher_executor.py`)**

**Root cause**: `ClientAPILauncherExecutor.__init__()` stored `max_resends` as `self._max_resends`, but `TaskExchanger` (the parent) uses `self.max_resends`. `PipeHandler` reads `self.max_resends` and saw `None` (the parent default) instead of the configured value.

**Fix**: Restore `self._max_resends` as the private attribute throughout `ClientAPILauncherExecutor` (consistent with the private-member convention); the executor builds its own config dict explicitly so it does not rely on the inherited attribute.

**Subprocess logging (`ex_process/api.py`)**

**Problem**: The subprocess had no logging configuration — `logger.info()` calls were silently dropped. Wrapping all stdout via `logger.info()` in the parent caused double-prefixed entries in `log.txt` for NVFlare-formatted log lines.

**Fix**: `_configure_subprocess_logging()` in `ex_process/api.py` loads the site's `log_config.json` unchanged, giving the subprocess loggers identical to the parent's (both consoleHandler and file handlers). The parent's `log_subprocess_output()` now calls `_route_subprocess_line()`, which strips ANSI codes and checks for a `YYYY-MM-DD HH:MM:SS` timestamp:
- Formatted NVFlare log line → `print()` to terminal only (the file handler already wrote it to `log.txt`)
- Raw `print()` from the user training script → `logger.info()` so it reaches `log.txt`

Regression tests: `test_log_subprocess_output_formatted_lines_not_double_logged` + 5 `TestRouteSubprocessLine` tests.

**Bug 6 (`conn_manager.py`)**

**Root cause**: `process_frame_task()` and `start_connector()` could raise an unhandled `RuntimeError` when called after the executor was shut down, causing noisy tracebacks during job teardown.

**Fix**: Guard `conn_mgr_executor.submit()` with try/except `RuntimeError` (log debug, skip).
Add an `if self.stopped: return` early exit in `process_frame_task()`. **Regression tests**: 4 new tests in `test_conn_manager_shutdown_race.py`.

**Pipe type selection**

`SwarmLearningRecipe` always created a `CellPipe`; users needing `FilePipe` (restricted networks, third-party integrations) had to drop to the low-level `CCWFJob` + `SwarmClientConfig` API.

**Change**: Add `pipe_type: str = "cell_pipe"` and `pipe_root_path: Optional[str] = None`:
- `"cell_pipe"` (default): unchanged — zero-copy PASS_THROUGH forwarding, ~1 GB RAM overhead
- `"file_pipe"`: `FilePipe` created with `root_path` defaulting to `{WORKSPACE}/pipe` (resolved at runtime); a custom absolute path is validated to exist at recipe construction time
- Warnings when `pipe_root_path` is ignored with `cell_pipe`, and for `file_pipe` with `launch_external_process=False`
- `ScriptRunner.__init__` gains a `task_pipe` parameter forwarded to `BaseScriptRunner`

9 new tests in `TestSwarmLearningRecipePipeType`.

**Files changed**

| File | Bug | Change |
|------|-----|--------|
| `nvflare/fuel/utils/fobs/decomposers/via_downloader.py` | 1 + 3 | Remove `subscribe_to_msg_root`; add thread-local `_tls`, `was_download_initiated()`, `clear_download_initiated()` |
| `nvflare/client/flare_agent.py` | 3 | Check `was_download_initiated()` after `send_to_peer()`; return immediately for validate results |
| `nvflare/app_common/ccwf/cse_client_ctl.py` | 2 | Persistor-first `_prepare_local_model()` with last-"best"-key inventory scan and isinstance guard |
| `nvflare/app_opt/pt/recipes/swarm.py` | 4 + rename + pipe | Add `round_timeout`, `pipe_type`, `pipe_root_path`; rename to `SwarmLearningRecipe`; backward-compat alias kept |
| `nvflare/job_config/script_runner.py` | pipe | Add `task_pipe` param to `ScriptRunner`, forwarded to `BaseScriptRunner` |
| `nvflare/app_common/ccwf/recipes/swarm.py` | rename | Export `SwarmLearningRecipe` alongside `SimpleSwarmLearningRecipe` |
| `nvflare/app_opt/pt/recipes/__init__.py` | rename | Add `SwarmLearningRecipe` to lazy imports and `__all__` |
| `nvflare/app_common/executors/client_api_launcher_executor.py` | 5 | Restore `self._max_resends` (private convention) |
| `nvflare/app_common/launchers/subprocess_launcher.py` | logging | `_route_subprocess_line()`: formatted lines → `print()`, raw lines → `logger.info()` |
| `nvflare/client/ex_process/api.py` | logging | `_configure_subprocess_logging()`: apply site log config unchanged (same handlers as parent) |
| `nvflare/fuel/f3/sfm/conn_manager.py` | 6 | Guard executor submit + frame processing against post-shutdown RuntimeError |
| `docs/programming_guide/memory_management.rst` | docs | Add missing `min_clients` to example |
| `docs/user_guide/data_scientist_guide/available_recipes.rst` | docs | Add missing `min_clients`; rename to `SwarmLearningRecipe` |
| `docs/programming_guide/controllers/client_controlled_workflows.rst` | docs | Add missing `min_clients` (x2) |
| `docs/programming_guide/timeouts.rst` | docs | Rename to `SwarmLearningRecipe` |
| `examples/advanced/swarm_learning/swarm_pt/README.md` | 4 + rename | Add `round_timeout` to config table; rename to `SwarmLearningRecipe` |
| `examples/advanced/swarm_learning/swarm_pt/job.py` | rename | Update import to `SwarmLearningRecipe` |
| `tests/unit_test/fuel/utils/fobs/test_via_downloader_msg_root.py` | 1 | 4 new tests |
| `tests/unit_test/client/test_download_initiated_gating.py` | 3 | 7 new tests |
| `tests/unit_test/app_common/ccwf/test_cse_persistor_fallback.py` | 2 | 10 new tests (incl. initial-ckpt-shadowing regression) |
| `tests/unit_test/fuel/f3/sfm/test_conn_manager_shutdown_race.py` | 6 | 4 new regression tests |
| `tests/unit_test/app_common/launchers/subprocess_launcher_test.py` | logging | 6 new tests for `_route_subprocess_line` and double-logging prevention |
| `tests/unit_test/recipe/swarm_recipe_test.py` | rename + pipe | Updated to `SwarmLearningRecipe`; 9 new `TestSwarmLearningRecipePipeType` tests |
| `tests/unit_test/app_common/executors/client_api_launcher_executor_test.py` | 5 | Updated: `_max_resends` assertions |
| `tests/unit_test/recipe/component_config_verification_test.py` | rename | Updated to `SwarmLearningRecipe` |

**Testing**

- [x] 250+ unit tests pass across all affected modules
- [x] Bug 1: `test_via_downloader_msg_root.py` — verifies `subscribe_to_msg_root` is never called, the method is removed, and `DOWNLOAD_COMPLETE_CB` is unaffected
- [x] Bug 2: `test_cse_persistor_fallback.py` — verifies the persistor-first path, last-"best"-key preference, initial-ckpt-shadowing regression, isinstance guard, and all fallback paths
- [x] Bug 3: `test_download_initiated_gating.py` — verifies thread isolation, validate returns immediately (<1s), train waits, clear-before-send
- [x] Bug 4: covered by existing recipe and ccwf tests
- [x] Bug 5: `client_api_launcher_executor_test.py` updated (`_max_resends` assertions)
- [x] Bug 6: `test_conn_manager_shutdown_race.py` — 4 tests: executor-shutdown no-raise, stop() no-raise, stopped early-return, Prefix.from_bytes not called
- [x] Logging: `subprocess_launcher_test.py` — double-logging prevention, ANSI stripping, plain-line capture, partial-timestamp detection
- [x] Pipe type: `swarm_recipe_test.py` — 9 tests: default cell_pipe, file_pipe instance, workspace template, custom path, invalid type, warnings, path validation
- [x] Rename: backward-compat `SimpleSwarmLearningRecipe` import test passes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
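The Bug 2 inventory scan (prefer the exact `model_name`, else the LAST key containing `"best"`) can be sketched as a small helper. The function name and signature here are illustrative, not the actual `cse_client_ctl.py` code:

```python
def pick_model_key(inventory_keys, model_name):
    """Resolve a requested model name against persistor inventory keys.

    Try the exact name first; for a "best" request, prefer the LAST
    matching key, so an initial checkpoint saved first under a best_*
    name cannot shadow the trained best model saved later.
    """
    if model_name in inventory_keys:
        return model_name
    if "best" in model_name:
        chosen = None
        for key in inventory_keys:  # insertion order preserved
            if "best" in key:
                chosen = key        # keep overwriting so the last wins
        return chosen
    return None
```

With keys `["best_init", "FL_global_model.pt", "best_trained"]` and a request for `"best_model"`, the last-wins scan returns `"best_trained"` rather than the initial checkpoint.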
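The Bug 3 mechanism relies on `threading.local()` so that the metric pipe's serialisation (running on a different thread against the same `fobs_ctx`) cannot clobber the task thread's flag. A minimal sketch of the flag API, with illustrative function names:

```python
import threading

# Thread-local storage: each thread sees its own download_initiated flag.
_tls = threading.local()


def mark_download_initiated():
    # Called by the serializer only when downloadable objects exist.
    _tls.download_initiated = True


def was_download_initiated():
    return getattr(_tls, "download_initiated", False)


def clear_download_initiated():
    _tls.download_initiated = False
```

Because the flag lives in thread-local storage, setting it on the task thread leaves a concurrently serialising metric thread's view unchanged, which is exactly why a plain module-level boolean would not work here.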
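The subprocess log-routing rule (strip ANSI codes, then check for a leading `YYYY-MM-DD HH:MM:SS` timestamp) can be sketched as below. The function name and the injected callables are illustrative; the real `_route_subprocess_line()` lives in `subprocess_launcher.py`:

```python
import re

ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")
TS_RE = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}")


def route_subprocess_line(line, logger_info, print_fn):
    """Route one line of subprocess stdout.

    Formatted NVFlare log lines go to the terminal only (the file
    handler already wrote them to log.txt); raw print() output from
    the user script is logged so it also reaches log.txt.
    """
    clean = ANSI_RE.sub("", line).rstrip("\n")
    if TS_RE.match(clean):
        print_fn(clean)      # already in log.txt via the file handler
    else:
        logger_info(clean)   # capture raw user output into log.txt
```

Routing on the timestamp prefix is what prevents the double-prefixed `log.txt` entries described above: a line that already carries the NVFlare format is never re-wrapped by `logger.info()`.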
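The Bug 6 guard pattern (tolerate `RuntimeError` from submitting to an already-shut-down executor during teardown) looks roughly like this. `safe_submit` is a hypothetical name for illustration; the real guard wraps `conn_mgr_executor.submit()` in `conn_manager.py`:

```python
import logging
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger(__name__)


def safe_submit(executor, fn, *args):
    """Submit work, tolerating a pool that was already shut down.

    ThreadPoolExecutor.submit() raises RuntimeError after shutdown();
    during job teardown that race is expected, so log at debug and skip
    instead of letting the traceback propagate.
    """
    try:
        executor.submit(fn, *args)
        return True
    except RuntimeError:
        logger.debug("executor already shut down; work item dropped")
        return False
```

Combined with the `if self.stopped: return` early exit in `process_frame_task()`, frames arriving after shutdown are dropped quietly instead of producing noisy tracebacks.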
chesterxgchen added a commit to chesterxgchen/NVFlare that referenced this pull request on Mar 29, 2026