
Fix swarm ext process bugs 27 #4263

Closed
GeorgeWang-nv wants to merge 5 commits into NVIDIA:2.7 from GeorgeWang-nv:fix_swarm_ext_process_bugs_27

Conversation

@GeorgeWang-nv
Contributor

Fixes # .

Description

A few sentences describing the changes proposed in this pull request.

Types of changes

  • Non-breaking change (fix or new feature that would not break existing functionality).
  • Breaking change (fix or new feature that would cause existing functionality to change).
  • New tests added to cover the changes.
  • Quick tests passed locally by running ./runtest.sh.
  • In-line docstrings updated.
  • Documentation updated.

GeorgeWang-nv and others added 5 commits March 4, 2026 10:25
Defer download transaction deletion in _delete_download_tx_on_msg_root()
by 30 seconds using a daemon Timer thread. When a blob envelope is
delivered, blob_cb is queued asynchronously on callback_thread_pool.
The message root can be deleted before blob_cb runs, causing
delete_transaction() to remove refs from _ref_table prematurely.
When blob_cb later initiates secondary tensor downloads,
_handle_download() returns "no ref found" and the transfer fails.

The 30s deferral gives blob_cb time to complete secondary downloads.
The _monitor_tx() daemon thread still cleans up finished transactions
naturally, so the timer is just a safety net.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
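The deferral described in this commit can be sketched as follows. The class and method names here are illustrative stand-ins, not NVFlare's actual code; the real method hangs off the ViaDownloader decomposer.

```python
import threading


class DeferredTxCleanup:
    """Sketch of deferring download-transaction deletion on msg-root removal.

    Hypothetical class for illustration; defer_secs mirrors the 30s deferral
    described in the commit message.
    """

    def __init__(self, download_service, defer_secs=30.0):
        self.download_service = download_service
        self.defer_secs = defer_secs

    def delete_download_tx_on_msg_root(self, tx_id):
        # Do not delete immediately: blob_cb runs asynchronously on the
        # callback thread pool and may still need the refs in _ref_table
        # for secondary tensor downloads.
        timer = threading.Timer(
            self.defer_secs,
            self.download_service.delete_transaction,
            args=(tx_id,),
        )
        timer.daemon = True  # safety net only; never blocks process exit
        timer.start()
        return timer
```

Because the timer thread is a daemon, a job that finishes before the 30s elapse still exits cleanly, and `_monitor_tx()` remains the primary cleanup path.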
Add a persistor-based model retrieval path before falling back to
submit_model_executor.execute(). With external process executors
(e.g. ClientAPILauncherExecutor), execute() launches a fresh
subprocess that has no trained model state, so it cannot return the
locally trained model. The best model is already saved by
client_controller_executor._process_final_result() via persistor.save(),
so loading it from the persistor is both correct and consistent with
how _prepare_global_model() works.

The persistor path tries the exact model_name first, then falls back
to iterating the persistor inventory in case model_name does not
match the stored key (e.g. "best_model" vs "FL_global_model.pt").

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
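A sketch of that retrieval order, under an assumed persistor interface (`get_model` / `get_model_inventory`); NVFlare's actual persistor API and selection logic differ in detail, and `run_executor` stands in for `submit_model_executor.execute()`:

```python
def retrieve_local_model(persistor, model_name, run_executor):
    """Persistor-first model lookup with an inventory-scan fallback (sketch)."""
    if persistor is not None:
        try:
            model = persistor.get_model(model_name)  # exact key first
            if model is not None:
                return model
        except Exception:
            pass  # fall through to the inventory scan
        # model_name may not match the stored key
        # (e.g. "best_model" vs "FL_global_model.pt"): scan the inventory.
        for key in persistor.get_model_inventory():
            model = persistor.get_model(key)
            if model is not None:
                return model
    # Last resort: in-process executors can return the model directly.
    return run_executor()
```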
When pass_through_on_send is enabled, FlareAgent._do_submit_result()
unconditionally waits up to 1800s for DOWNLOAD_COMPLETE_CB after
sending any result. For validate results (metrics only, no tensors),
ViaDownloader never creates a download transaction, so the callback
is never fired. The subprocess blocks until timeout, missing
subsequent task assignments and causing CSE round 2+ to hang.

Add a DOWNLOAD_STARTED_CB mechanism: ViaDownloader._finalize_download_tx()
invokes this callback when a download transaction is actually created.
FlareAgent registers a threading.Event via this callback before sending
the result. After the send completes, if download_started is not set,
the result contained no large objects — proceed immediately instead of
blocking on download_done.wait().

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
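The gating logic can be sketched like this; the callback registry and names are simplified stand-ins for the FlareAgent internals:

```python
import threading


def submit_result(send_to_peer, register_cb, download_complete_timeout=1800.0):
    """Sketch of the DOWNLOAD_STARTED_CB gating described above."""
    download_started = threading.Event()
    download_done = threading.Event()
    # Register before sending, so the downloader can fire during serialization.
    register_cb("DOWNLOAD_STARTED_CB", download_started.set)
    register_cb("DOWNLOAD_COMPLETE_CB", lambda *args: download_done.set())
    sent = send_to_peer()
    if not sent:
        return False
    if not download_started.is_set():
        # No download transaction was created (metrics-only result):
        # nothing to wait for, proceed immediately.
        return True
    # A transaction exists (train path): wait for the peer's downloads.
    return download_done.wait(timeout=download_complete_timeout)
```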
New test files (20 tests):
- test_via_downloader_deferred_deletion.py: 6 tests for Bug 1 deferred
  deletion (timer scheduling, daemon flag, target function)
- test_cse_persistor_fallback.py: 6 tests for Bug 2 persistor-based
  model retrieval (direct get, inventory fallback, exception fallback,
  no persistor, no executor, empty inventory)
- test_download_started_gating.py: 8 tests for Bug 3 DOWNLOAD_STARTED_CB
  (registration, skip-wait for validate, wait for train, cleanup) and
  ViaDownloader callback invocation

Updated existing tests:
- test_download_complete_gating.py: Fix 3 tests that broke due to Bug 3
  changes — now fire DOWNLOAD_STARTED_CB in send_to_peer side_effect so
  the code reaches the download_done.wait() path instead of returning
  early on the new "no download transaction" fast path.

Source files reformatted with black (no logic changes).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…Recipe

Expose learn_task_ack_timeout, learn_task_timeout, and final_result_ack_timeout
in SimpleSwarmLearningRecipe so callers can configure P2P broadcast timeouts for
large models (e.g. Llama 1B ~2.8GB). The defaults (10s) are too short for tensor
streaming transfers that take 14-20s per site.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
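With these parameters exposed, a recipe for a large model might be configured as below. This is illustrative only: the other required constructor arguments are elided, and the timeout values are examples, not recommendations.

```python
recipe = SimpleSwarmLearningRecipe(
    ...,  # other required arguments elided
    learn_task_ack_timeout=120,   # tensor streaming takes 14-20s per site
    learn_task_timeout=None,      # leave per-round training unbounded
    final_result_ack_timeout=120,
)
```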
@greptile-apps
Contributor

greptile-apps Bot commented Mar 4, 2026

Greptile Summary

This PR fixes three bugs in the swarm/external-process training flow in NVFlare:

  • Bug 1 (via_downloader.py): A race condition in P2P broadcast where the message root deletion could fire delete_transaction() before asynchronous blob_cb callbacks completed their secondary tensor downloads. Fixed by deferring the deletion by 30 seconds via a daemon threading.Timer.
  • Bug 2 (cse_client_ctl.py): CrossSiteEvalClientController._prepare_local_model() called submit_model_executor.execute() which spawns a fresh subprocess with no retained training state when using ClientAPILauncherExecutor. Fixed by adding a persistor-based retrieval path (matching _prepare_global_model()) before falling back to the executor.
  • Bug 3 (flare_agent.py): _do_submit_result() unconditionally blocked for download_complete_timeout (up to 1800 s) even for validate results that carry no tensors, because ViaDownloader never creates a download transaction for metric-only results. Fixed by introducing DOWNLOAD_STARTED_CB that ViaDownloader fires when a download transaction is actually created; if the event is not set after send_to_peer() returns, the subprocess proceeds immediately.

Key issues:

  • learn_task_timeout: float = None in swarm.py (line 168) uses an incorrect type annotation — None is not a float. It should be Optional[float] = None (the import is already present).

All three fixes come with well-written unit tests covering the specified contracts.

Confidence Score: 4/5

  • Safe to merge after correcting the type annotation in swarm.py. The three bug fixes are well-tested and logically sound.
  • The three underlying bug fixes are well-designed and thoroughly tested. The only concrete issue remaining is an incorrect type annotation for learn_task_timeout at line 168 of swarm.py, which must be corrected before merge. Once fixed, the PR is safe to merge.
  • nvflare/app_opt/pt/recipes/swarm.py (type annotation fix required)

Sequence Diagram

sequenceDiagram
    participant FA as FlareAgent (_do_submit_result)
    participant CP as CellPipe (send_to_peer)
    participant VD as ViaDownloaderDecomposer (_finalize_download_tx)
    participant CJ as ClientJob (peer ACK)
    participant SRV as FL Server (tensor pull)

    FA->>FA: register DOWNLOAD_STARTED_CB & DOWNLOAD_COMPLETE_CB
    FA->>CP: send_to_peer(reply, timeout)
    CP->>VD: serialize → _finalize_download_tx()
    alt result has large tensors (train path)
        VD->>FA: DOWNLOAD_STARTED_CB() — sets download_started event
        VD->>VD: subscribe_to_msg_root (→ deferred delete after 30s)
    end
    CP->>CJ: send message
    CJ-->>CP: ACK
    CP-->>FA: sent=True
    alt download_started NOT set (validate metrics path)
        FA->>FA: return True immediately
    else download_started IS set (train path)
        FA->>FA: wait(download_done, timeout=download_complete_timeout)
        SRV->>VD: pull tensors via DownloadService
        VD-->>FA: DOWNLOAD_COMPLETE_CB(tid, FINISHED, objs)
        FA->>FA: return True
    end
    FA->>FA: clear DOWNLOAD_STARTED_CB & DOWNLOAD_COMPLETE_CB (finally)

Last reviewed commit: 67e8de6

progress_timeout: float = 3600,
max_status_report_interval: float = 300,
learn_task_ack_timeout: float = 10,
learn_task_timeout: float = None,

Incorrect type annotation for learn_task_timeout

learn_task_timeout: float = None is invalid — the default value None is not a float. This will fail strict type-checkers (mypy, pyright) and can mislead users. Optional is already imported in this file (line 15).

Suggested change
learn_task_timeout: float = None,
learn_task_timeout: Optional[float] = None,

chesterxgchen added a commit that referenced this pull request Mar 6, 2026
… validate hang, round_timeout (#4270)

## Summary

Fixes six bugs in NVFlare 2.7.2 RC12 affecting Swarm Learning with
`launch_external_process=True`. Implementation reference: @GeorgeWang-nv's
original PR #4263.

- Bug 1 -- different implementation from PR #4263
- Bug 2 -- accepts the PR #4263 approach with small changes
- Bug 3 -- uses a different approach
- Bug 4 -- adds a timeout to the swarm API, consolidating the timeouts from 4
  to 2 and exposing them on the Swarm Recipes
- Bug 5 -- restores `self._max_resends` (private convention); subprocess
  logging fixed (see below)
- Bug 6 -- guards ConnManager against RuntimeError after shutdown; 4
  regression tests added

**Additional (review feedback):**
- Rename `SimpleSwarmLearningRecipe` → `SwarmLearningRecipe` everywhere;
backward-compat alias kept
- Fix docs: add missing required `min_clients` arg to all
`SwarmLearningRecipe` examples (TypeError for users copying doc
examples)
- CSE inventory scan: take LAST "best" key, not first — prevents initial
checkpoint named "best_*" from shadowing trained best model
- Add `pipe_type` / `pipe_root_path` to `SwarmLearningRecipe` so users
can select `FilePipe` without dropping to low-level API



### Bug 1 — Download ref deleted before tensor receivers finish
(`via_downloader.py`)

**Root cause**: `_create_downloader()` subscribed to msg_root deletion
to clean up download transactions. The msg_root is deleted as soon as
blob envelopes are delivered, but `blob_cb` fires asynchronously —
secondary tensor downloads were still in flight when
`delete_transaction()` removed the ref from `_ref_table`, causing `"no
ref found" FATAL_SYSTEM_ERROR` on large models (≥1B parameters).

**Fix**: Remove `subscribe_to_msg_root` from `_create_downloader()`
entirely. Transaction lifecycle is now managed solely by `_monitor_tx()`
in `download_service.py`, which polls `is_finished()` every 5s and
cleans up only after all chunk downloads complete.
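The polling lifecycle that replaces the msg-root subscription can be sketched as below, with an assumed transaction interface (`is_finished` / `delete`); NVFlare's `_monitor_tx()` differs in detail:

```python
import threading


def monitor_tx(tx, poll_interval=5.0, stop_event=None):
    """Poll a download transaction until all chunk downloads complete,
    then clean it up (sketch of the lifecycle described above)."""
    stop_event = stop_event or threading.Event()
    while not stop_event.is_set():
        if tx.is_finished():
            tx.delete()  # safe now: no in-flight secondary downloads
            return True
        stop_event.wait(poll_interval)  # wake early if asked to stop
    return False
```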

### Bug 2 — CSE fails to load local model after ext-process training
(`cse_client_ctl.py`)

**Root cause**: `_prepare_local_model()` called
`submit_model_executor.execute()` which, for
`ClientAPILauncherExecutor`, launches a fresh subprocess with no trained
model state. The training subprocess had already exited; the model was
already saved to disk by `PTFileModelPersistor`.

**Fix**: Try the persistor first. Inventory key scan prefers the LAST
key containing `"best"` when `model_name` contains `"best"` — taking the
last "best" key avoids mistaking an initial checkpoint named `"best_*"`
(added first by `PTFileModelPersistor`) for the trained best model.
`isinstance` guard replaces `assert isinstance` (safe under `python
-O`). Falls back to executor for in-process mode compatibility.
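The last-"best"-key preference in isolation, with illustrative names (the real inventory scan lives inside `_prepare_local_model()`):

```python
def pick_best_key(inventory_keys, model_name):
    """Sketch: PTFileModelPersistor registers the initial checkpoint first,
    so the LAST key containing 'best' is the trained best model."""
    if "best" in model_name:
        matches = [k for k in inventory_keys if "best" in k]
        if matches:
            return matches[-1]  # last one wins, not the initial checkpoint
    return None  # caller falls back to other retrieval paths
```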

### Bug 3 — Validate results hang 1800s on CSE round 2+
(`flare_agent.py` + `via_downloader.py`)

**Root cause**: `FlareAgent._do_submit_result()` unconditionally waited
1800s for `DOWNLOAD_COMPLETE_CB` after every result send when
`pass_through_on_send=True`. For validate results (metrics only, no
tensors), `_finalize_download_tx()` creates no download transaction and
the callback never fires — subprocess blocks indefinitely.

**Fix**: Thread-local `was_download_initiated()` flag, set by
`_finalize_download_tx()` only when downloadable objects exist. Agent
returns immediately if `False` (validate result). Thread-local is
required because task pipe and metric pipe share the same `CoreCell`
(same `site_name + token + mode` → same FQCN → same `_CellInfo` cache
entry → same `fobs_ctx`); a plain flag would be clobbered by concurrent
metric serialisation from a different thread.
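The thread-local pattern, in miniature (function names are illustrative); each pipe's serialisation thread sees only its own flag even though both share one module-level object:

```python
import threading

_tls = threading.local()


def mark_download_initiated():
    # Called by the serializer only when downloadable objects exist.
    _tls.download_initiated = True


def was_download_initiated():
    return getattr(_tls, "download_initiated", False)


def clear_download_initiated():
    _tls.download_initiated = False
```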

### Bug 4 — P2P model ACK timeout too short for large models
(`swarm.py`)

**Root cause**: `SwarmClientConfig` hardcodes
`learn_task_ack_timeout=10s` and `final_result_ack_timeout=10s`. For
large models (≥2 GB), P2P tensor streaming takes minutes — the ACK times
out before download completes.

**Fix**: Add `round_timeout: float = 3600` to `SwarmLearningRecipe`.
Wires both `learn_task_ack_timeout` and `final_result_ack_timeout`;
`learn_task_timeout` is intentionally left `None` (unbounded) to avoid
capping per-round training time on slow hardware or 70B+ models.

### Bug 5 — `_max_resends` attribute shadowing
(`client_api_launcher_executor.py`)

**Root cause**: `ClientAPILauncherExecutor.__init__()` stored
`max_resends` as `self._max_resends` but `TaskExchanger` (the parent)
uses `self.max_resends`. `PipeHandler` reads `self.max_resends` and saw
`None` (the parent default) instead of the configured value.

**Fix**: Restore `self._max_resends` as the private attribute throughout
`ClientAPILauncherExecutor` (consistent with private-member convention);
the executor builds its own config dict explicitly so it does not rely
on the inherited attribute.
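The shadowing pattern described above, in miniature; the class names are stand-ins for `TaskExchanger` / `ClientAPILauncherExecutor`:

```python
class Parent:
    """Stand-in for TaskExchanger."""
    def __init__(self):
        self.max_resends = None  # parent default, read by PipeHandler


class Child(Parent):
    """Stand-in for ClientAPILauncherExecutor."""
    def __init__(self, max_resends=5):
        super().__init__()
        # Storing under a different (private) name leaves the parent's
        # attribute at its default -- anything reading self.max_resends
        # sees None instead of the configured value.
        self._max_resends = max_resends
```

The fix keeps the private name but builds the executor's own config dict from it explicitly, so nothing depends on the inherited public attribute.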

### Subprocess logging fix (`subprocess_launcher.py` +
`ex_process/api.py`)

**Problem**: The subprocess had no logging configuration —
`logger.info()` calls were silently dropped. Wrapping all stdout via
`logger.info()` in the parent caused double-prefixed entries in
`log.txt` for NVFlare-formatted log lines.

**Fix**: `_configure_subprocess_logging()` in `ex_process/api.py` loads
the site's `log_config.json` unchanged, giving the subprocess identical
loggers to the parent (both consoleHandler + file handlers). The
parent's `log_subprocess_output()` now calls `_route_subprocess_line()`
which strips ANSI codes and checks for a `YYYY-MM-DD HH:MM:SS`
timestamp:
- Formatted NVFlare log line → `print()` to terminal only (file handler
already wrote it to `log.txt`)
- Raw `print()` from user training script → `logger.info()` so it
reaches `log.txt`

Regression tests:
`test_log_subprocess_output_formatted_lines_not_double_logged` + 5
`TestRouteSubprocessLine` tests.
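The routing rule can be sketched as below; the regexes are assumptions about the ANSI and NVFlare timestamp formats, not the exact patterns in `subprocess_launcher.py`:

```python
import re

ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")                    # colour escape codes
TS_RE = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}")  # YYYY-MM-DD HH:MM:SS


def route_subprocess_line(line, logger_info, print_fn=print):
    """Sketch: timestamped lines were already written to log.txt by the
    subprocess's own file handler, so echo them to the terminal only;
    raw print() output goes through the logger to reach log.txt."""
    clean = ANSI_RE.sub("", line)
    if TS_RE.match(clean):
        print_fn(clean)      # formatted NVFlare line -> terminal only
    else:
        logger_info(clean)   # raw line from user script -> log.txt
```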

### Bug 6 — ConnManager crashes on frame arrival after shutdown
(`conn_manager.py`)

**Root cause**: `process_frame_task()` and `start_connector()` could
raise unhandled `RuntimeError` when called after the executor was shut
down, causing noisy tracebacks during job teardown.

**Fix**: Guard `conn_mgr_executor.submit()` with try/except
`RuntimeError` (log debug, skip). Add `if self.stopped: return` early
exit in `process_frame_task()`.

**Regression tests**: 4 new tests in
`test_conn_manager_shutdown_race.py`.
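The guard itself is a small pattern around `ThreadPoolExecutor.submit()`, which raises `RuntimeError` once `shutdown()` has been called; a sketch (function name illustrative):

```python
import logging
from concurrent.futures import ThreadPoolExecutor

log = logging.getLogger(__name__)


def safe_submit(executor, fn, *args):
    """Submit work, tolerating a post-shutdown executor: during job
    teardown a late frame is expected, so log at debug and drop it."""
    try:
        return executor.submit(fn, *args)
    except RuntimeError:
        log.debug("executor already shut down; frame dropped")
        return None
```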

### `pipe_type` / `pipe_root_path` for `SwarmLearningRecipe`

`SwarmLearningRecipe` always created a `CellPipe`; users needing
`FilePipe` (restricted networks, third-party integrations) had to drop
to the low-level `CCWFJob` + `SwarmClientConfig` API.

**Change**: Add `pipe_type: str = "cell_pipe"` and `pipe_root_path:
Optional[str] = None`:
- `"cell_pipe"` (default): unchanged — zero-copy PASS_THROUGH
forwarding, ~1 GB RAM overhead
- `"file_pipe"`: `FilePipe` created with `root_path` defaulting to
`{WORKSPACE}/pipe` (resolved at runtime); custom absolute path validated
to exist at recipe construction time
- Warnings for `pipe_root_path` ignored with `cell_pipe`, and
`file_pipe` with `launch_external_process=False`
- `ScriptRunner.__init__` gains `task_pipe` parameter forwarded to
`BaseScriptRunner`

9 new tests in `TestSwarmLearningRecipePipeType`.

## Files Changed

| File | Bug | Change |
|------|-----|--------|
| `nvflare/fuel/utils/fobs/decomposers/via_downloader.py` | 1 + 3 | Remove `subscribe_to_msg_root`; add thread-local `_tls`, `was_download_initiated()`, `clear_download_initiated()` |
| `nvflare/client/flare_agent.py` | 3 | Check `was_download_initiated()` after `send_to_peer()`; return immediately for validate results |
| `nvflare/app_common/ccwf/cse_client_ctl.py` | 2 | Persistor-first `_prepare_local_model()` with last-"best"-key inventory scan and isinstance guard |
| `nvflare/app_opt/pt/recipes/swarm.py` | 4 + rename + pipe | Add `round_timeout`, `pipe_type`, `pipe_root_path`; rename to `SwarmLearningRecipe`; backward-compat alias kept |
| `nvflare/job_config/script_runner.py` | pipe | Add `task_pipe` param to `ScriptRunner`, forwarded to `BaseScriptRunner` |
| `nvflare/app_common/ccwf/recipes/swarm.py` | rename | Export `SwarmLearningRecipe` alongside `SimpleSwarmLearningRecipe` |
| `nvflare/app_opt/pt/recipes/__init__.py` | rename | Add `SwarmLearningRecipe` to lazy imports and `__all__` |
| `nvflare/app_common/executors/client_api_launcher_executor.py` | 5 | Restore `self._max_resends` (private convention) |
| `nvflare/app_common/launchers/subprocess_launcher.py` | logging | `_route_subprocess_line()`: formatted lines → `print()`, raw lines → `logger.info()` |
| `nvflare/client/ex_process/api.py` | logging | `_configure_subprocess_logging()`: apply site log config unchanged (same handlers as parent) |
| `nvflare/fuel/f3/sfm/conn_manager.py` | 6 | Guard executor submit + frame processing against post-shutdown RuntimeError |
| `docs/programming_guide/memory_management.rst` | docs | Add missing `min_clients` to example |
| `docs/user_guide/data_scientist_guide/available_recipes.rst` | docs | Add missing `min_clients`; rename to `SwarmLearningRecipe` |
| `docs/programming_guide/controllers/client_controlled_workflows.rst` | docs | Add missing `min_clients` (x2) |
| `docs/programming_guide/timeouts.rst` | docs | Rename to `SwarmLearningRecipe` |
| `examples/advanced/swarm_learning/swarm_pt/README.md` | 4 + rename | Add `round_timeout` to config table; rename to `SwarmLearningRecipe` |
| `examples/advanced/swarm_learning/swarm_pt/job.py` | rename | Update import to `SwarmLearningRecipe` |
| `tests/unit_test/fuel/utils/fobs/test_via_downloader_msg_root.py` | 1 | 4 new tests |
| `tests/unit_test/client/test_download_initiated_gating.py` | 3 | 7 new tests |
| `tests/unit_test/app_common/ccwf/test_cse_persistor_fallback.py` | 2 | 10 new tests (incl. initial-ckpt-shadowing regression) |
| `tests/unit_test/fuel/f3/sfm/test_conn_manager_shutdown_race.py` | 6 | 4 new regression tests |
| `tests/unit_test/app_common/launchers/subprocess_launcher_test.py` | logging | 6 new tests for `_route_subprocess_line` and double-logging prevention |
| `tests/unit_test/recipe/swarm_recipe_test.py` | rename + pipe | Updated to `SwarmLearningRecipe`; 9 new `TestSwarmLearningRecipePipeType` tests |
| `tests/unit_test/app_common/executors/client_api_launcher_executor_test.py` | 5 | Updated: `_max_resends` assertions |
| `tests/unit_test/recipe/component_config_verification_test.py` | rename | Updated to `SwarmLearningRecipe` |

## Test Plan

- [x] 250+ unit tests pass across all affected modules
- [x] Bug 1: `test_via_downloader_msg_root.py` — verifies
`subscribe_to_msg_root` never called, method removed,
`DOWNLOAD_COMPLETE_CB` unaffected
- [x] Bug 2: `test_cse_persistor_fallback.py` — verifies persistor-first
path, last-"best"-key preference, initial-ckpt-shadowing regression,
isinstance guard, all fallback paths
- [x] Bug 3: `test_download_initiated_gating.py` — verifies thread
isolation, validate returns immediately (<1s), train waits,
clear-before-send
- [x] Bug 4: covered by existing recipe and ccwf tests
- [x] Bug 5: `client_api_launcher_executor_test.py` updated
(`_max_resends` assertions)
- [x] Bug 6: `test_conn_manager_shutdown_race.py` — 4 tests:
executor-shutdown no-raise, stop() no-raise, stopped early-return,
Prefix.from_bytes not called
- [x] Logging: `subprocess_launcher_test.py` — double-logging
prevention, ANSI stripping, plain-line capture, partial-timestamp
detection
- [x] Pipe type: `swarm_recipe_test.py` — 9 tests: default cell_pipe,
file_pipe instance, workspace template, custom path, invalid type,
warnings, path validation
- [x] Rename: backward-compat `SimpleSwarmLearningRecipe` import test
passes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
@pcnudde pcnudde closed this Mar 6, 2026
YuanTingHsieh pushed a commit to YuanTingHsieh/NVFlare that referenced this pull request Mar 17, 2026
… validate hang, round_timeout (NVIDIA#4270)

(Commit message identical to the #4270 description above.)
chesterxgchen added a commit to chesterxgchen/NVFlare that referenced this pull request Mar 29, 2026
… validate hang, round_timeout (NVIDIA#4270)

Fixes four bugs in NVFlare 2.7.2 RC12 affecting Swarm Learning with
`launch_external_process=True`
Implementation reference: @GeorgeWang-nv's original PR NVIDIA#4263.

- Bug 1 -- different implementation from PR NVIDIA#4263
- Bug 2 -- accepts the implementation approach from PR NVIDIA#4263 with
small changes
- Bug 3 -- uses a different approach
- Bug 4 -- adds a timeout to the swarm API, consolidating the timeouts
from 4 to 2 and exposing them to the swarm recipes
- Bug 5 -- restores `self._max_resends` (private convention); subprocess
logging fixed (see below)
- Bug 6 -- guards ConnManager against RuntimeError after shutdown; 4
regression tests added

**Additional (review feedback):**
- Rename `SimpleSwarmLearningRecipe` → `SwarmLearningRecipe` everywhere;
backward-compat alias kept
- Fix docs: add missing required `min_clients` arg to all
`SwarmLearningRecipe` examples (TypeError for users copying doc
examples)
- CSE inventory scan: take LAST "best" key, not first — prevents initial
checkpoint named "best_*" from shadowing trained best model
- Add `pipe_type` / `pipe_root_path` to `SwarmLearningRecipe` so users
can select `FilePipe` without dropping to low-level API

**Bug 1** (`via_downloader.py`)

**Root cause**: `_create_downloader()` subscribed to msg_root deletion
to clean up download transactions. The msg_root is deleted as soon as
blob envelopes are delivered, but `blob_cb` fires asynchronously —
secondary tensor downloads were still in flight when
`delete_transaction()` removed the ref from `_ref_table`, causing `"no
ref found" FATAL_SYSTEM_ERROR` on large models (≥1B parameters).

**Fix**: Remove `subscribe_to_msg_root` from `_create_downloader()`
entirely. Transaction lifecycle is now managed solely by `_monitor_tx()`
in `download_service.py`, which polls `is_finished()` every 5s and
cleans up only after all chunk downloads complete.
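The single-owner lifecycle described above can be sketched as follows. This is a simplified illustration, not NVFlare's actual `_monitor_tx()` (which polls every 5s rather than the short interval used here); the `Transaction` class and function names are hypothetical:

```python
import threading
import time

# Hypothetical sketch: the monitor thread is the ONLY code path that
# deletes a transaction, and it does so only once is_finished() reports
# that all chunk downloads completed -- so no deletion can race with
# in-flight secondary downloads.
class Transaction:
    def __init__(self, num_chunks):
        self._pending_chunks = num_chunks
        self._lock = threading.Lock()

    def chunk_done(self):
        with self._lock:
            self._pending_chunks -= 1

    def is_finished(self):
        with self._lock:
            return self._pending_chunks == 0

def monitor_tx(tx, delete, poll_interval=0.01):
    # The real monitor uses a 5s poll interval; shortened for the sketch.
    while not tx.is_finished():
        time.sleep(poll_interval)
    delete(tx)

deleted = []
tx = Transaction(num_chunks=2)
t = threading.Thread(target=monitor_tx, args=(tx, deleted.append), daemon=True)
t.start()
tx.chunk_done()  # secondary downloads finish asynchronously...
tx.chunk_done()  # ...and only then is the transaction deleted
t.join(timeout=2)
print(len(deleted))  # 1
```

The key design point is that subscribing to msg_root deletion gave the transaction two independent owners with different lifetimes; collapsing cleanup into one polling owner removes the race by construction.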

**Bug 2** (`cse_client_ctl.py`)

**Root cause**: `_prepare_local_model()` called
`submit_model_executor.execute()` which, for
`ClientAPILauncherExecutor`, launches a fresh subprocess with no trained
model state. The training subprocess had already exited; the model was
already saved to disk by `PTFileModelPersistor`.

**Fix**: Try the persistor first. Inventory key scan prefers the LAST
key containing `"best"` when `model_name` contains `"best"` — taking the
last "best" key avoids mistaking an initial checkpoint named `"best_*"`
(added first by `PTFileModelPersistor`) for the trained best model.
`isinstance` guard replaces `assert isinstance` (safe under `python
-O`). Falls back to executor for in-process mode compatibility.
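The last-"best"-key scan can be sketched like this. The function and key names are illustrative, not NVFlare's actual inventory format:

```python
# Hypothetical sketch of the inventory scan described above. Keys are
# listed in insertion order: the initial checkpoint (saved first by the
# persistor) may itself be named "best_*", so taking the FIRST match
# would shadow the trained best model; taking the LAST match avoids that.
def pick_model_key(inventory_keys, model_name):
    if "best" in model_name:
        best_keys = [k for k in inventory_keys if "best" in k]
        if best_keys:
            return best_keys[-1]
    return inventory_keys[-1] if inventory_keys else None

keys = ["best_FL_global_model.pt",  # initial checkpoint, saved first
        "FL_global_model.pt",
        "best_local_model.pt"]      # trained best model, saved last
print(pick_model_key(keys, "best_model"))  # best_local_model.pt
```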

**Bug 3** (`flare_agent.py` + `via_downloader.py`)

**Root cause**: `FlareAgent._do_submit_result()` unconditionally waited
1800s for `DOWNLOAD_COMPLETE_CB` after every result send when
`pass_through_on_send=True`. For validate results (metrics only, no
tensors), `_finalize_download_tx()` creates no download transaction and
the callback never fires — subprocess blocks indefinitely.

**Fix**: Thread-local `was_download_initiated()` flag, set by
`_finalize_download_tx()` only when downloadable objects exist. Agent
returns immediately if `False` (validate result). Thread-local is
required because task pipe and metric pipe share the same `CoreCell`
(same `site_name + token + mode` → same FQCN → same `_CellInfo` cache
entry → same `fobs_ctx`); a plain flag would be clobbered by concurrent
metric serialisation from a different thread.
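The thread-local gating can be illustrated with a minimal sketch. Function names follow the description above but are hypothetical, not NVFlare's exact API:

```python
import threading

# Hypothetical sketch of the thread-local flag. Because the task pipe and
# metric pipe share one CoreCell (and hence one fobs_ctx), a plain module
# flag would be clobbered across threads; threading.local() gives each
# thread its own copy.
_tls = threading.local()

def mark_download_initiated():
    # Set by the finalize step only when downloadable objects exist.
    _tls.download_initiated = True

def was_download_initiated() -> bool:
    return getattr(_tls, "download_initiated", False)

def clear_download_initiated():
    _tls.download_initiated = False

# A metric-pipe thread that never initiates a download cannot clobber
# the task-pipe thread's flag:
results = {}

def worker(name, initiate):
    clear_download_initiated()
    if initiate:
        mark_download_initiated()
    results[name] = was_download_initiated()

t1 = threading.Thread(target=worker, args=("task", True))
t2 = threading.Thread(target=worker, args=("metric", False))
t1.start(); t2.start(); t1.join(); t2.join()
print(results)  # {'task': True, 'metric': False}
```

With a plain module-level boolean, the `"metric"` worker's `clear_download_initiated()` could run between the `"task"` worker's set and read, producing the hang described above.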

**Bug 4** (`swarm.py`)

**Root cause**: `SwarmClientConfig` hardcodes
`learn_task_ack_timeout=10s` and `final_result_ack_timeout=10s`. For
large models (≥2 GB), P2P tensor streaming takes minutes — the ACK times
out before download completes.

**Fix**: Add `round_timeout: float = 3600` to `SwarmLearningRecipe`.
Wires both `learn_task_ack_timeout` and `final_result_ack_timeout`;
`learn_task_timeout` is intentionally left `None` (unbounded) to avoid
capping per-round training time on slow hardware or 70B+ models.
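The wiring can be sketched as a plain dict builder. This is an assumed shape for illustration only; the actual `SwarmLearningRecipe` signature and `SwarmClientConfig` fields may differ:

```python
# Hypothetical sketch: one recipe-level round_timeout feeds both ACK
# timeouts, while learn_task_timeout stays None (unbounded) so per-round
# training time is never capped.
def build_swarm_client_config(round_timeout: float = 3600):
    return {
        "learn_task_ack_timeout": round_timeout,
        "final_result_ack_timeout": round_timeout,
        "learn_task_timeout": None,  # intentionally unbounded
    }

cfg = build_swarm_client_config(round_timeout=7200)
print(cfg["learn_task_ack_timeout"])  # 7200
```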

**Bug 5** (`client_api_launcher_executor.py`)

**Root cause**: `ClientAPILauncherExecutor.__init__()` stored
`max_resends` as `self._max_resends` but `TaskExchanger` (the parent)
uses `self.max_resends`. `PipeHandler` reads `self.max_resends` and saw
`None` (the parent default) instead of the configured value.

**Fix**: Restore `self._max_resends` as the private attribute throughout
`ClientAPILauncherExecutor` (consistent with private-member convention);
the executor builds its own config dict explicitly so it does not rely
on the inherited attribute.
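The attribute-name pitfall and the fix can be shown in miniature. Class names mirror the description above but the bodies are illustrative, not NVFlare's actual implementations:

```python
# Hypothetical sketch: the parent exposes the value under a public name
# that collaborators (PipeHandler) read, while the buggy child stores it
# under a private name the parent never sees.
class TaskExchanger:
    def __init__(self, max_resends=None):
        self.max_resends = max_resends  # what PipeHandler reads

class BuggyExecutor(TaskExchanger):
    def __init__(self, max_resends=5):
        super().__init__()               # parent default: None
        self._max_resends = max_resends  # invisible to PipeHandler

class FixedExecutor(TaskExchanger):
    def __init__(self, max_resends=5):
        super().__init__()
        self._max_resends = max_resends
        # Build the handler config explicitly from the private attribute
        # instead of relying on the inherited public one.
        self.pipe_handler_config = {"max_resends": self._max_resends}

print(BuggyExecutor().max_resends)                         # None
print(FixedExecutor().pipe_handler_config["max_resends"])  # 5
```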

**Subprocess logging** (`subprocess_launcher.py` + `ex_process/api.py`)

**Problem**: The subprocess had no logging configuration —
`logger.info()` calls were silently dropped. Wrapping all stdout via
`logger.info()` in the parent caused double-prefixed entries in
`log.txt` for NVFlare-formatted log lines.

**Fix**: `_configure_subprocess_logging()` in `ex_process/api.py` loads
the site's `log_config.json` unchanged, giving the subprocess identical
loggers to the parent (both consoleHandler + file handlers). The
parent's `log_subprocess_output()` now calls `_route_subprocess_line()`
which strips ANSI codes and checks for a `YYYY-MM-DD HH:MM:SS`
timestamp:
- Formatted NVFlare log line → `print()` to terminal only (file handler
already wrote it to `log.txt`)
- Raw `print()` from user training script → `logger.info()` so it
reaches `log.txt`
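The routing rule can be sketched as below. This is a simplified stand-in for `_route_subprocess_line()`, with a minimal SGR-only ANSI stripper; the real implementation may handle more escape sequences:

```python
import re

# Minimal sketch of the line router described above. Assumptions: only
# SGR color codes (ESC[...m) need stripping, and a leading
# "YYYY-MM-DD HH:MM:SS" marks an NVFlare-formatted log line.
ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")
TS_RE = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}")

def route_line(line, logger_info, terminal_print):
    clean = ANSI_RE.sub("", line).rstrip("\n")
    if TS_RE.match(clean):
        # Formatted NVFlare log line: the subprocess file handler already
        # wrote it to log.txt, so only echo it to the terminal.
        terminal_print(clean)
    else:
        # Raw print() from the user script: log it so it reaches log.txt.
        logger_info(clean)

term, logged = [], []
route_line("\x1b[32m2026-03-29 10:00:00 - INFO - round 1 done\x1b[0m",
           logged.append, term.append)
route_line("epoch 1 loss 0.5", logged.append, term.append)
print(term)    # ['2026-03-29 10:00:00 - INFO - round 1 done']
print(logged)  # ['epoch 1 loss 0.5']
```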

Regression tests:
`test_log_subprocess_output_formatted_lines_not_double_logged` + 5
`TestRouteSubprocessLine` tests.

**Bug 6** (`conn_manager.py`)

**Root cause**: `process_frame_task()` and `start_connector()` could
raise unhandled `RuntimeError` when called after the executor was shut
down, causing noisy tracebacks during job teardown.

**Fix**: Guard `conn_mgr_executor.submit()` with try/except
`RuntimeError` (log debug, skip). Add `if self.stopped: return` early
exit in `process_frame_task()`.
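The guard relies on the fact that `ThreadPoolExecutor.submit()` raises `RuntimeError` after `shutdown()`. A minimal sketch (the wrapper name is hypothetical):

```python
import logging
from concurrent.futures import ThreadPoolExecutor

log = logging.getLogger("conn_manager_sketch")

# Hypothetical sketch of the post-shutdown guard: submitting to a
# shut-down executor raises RuntimeError, which is caught, logged at
# debug level, and skipped instead of producing a teardown traceback.
def safe_submit(executor, fn, *args):
    try:
        return executor.submit(fn, *args)
    except RuntimeError:
        log.debug("executor already shut down; frame dropped")
        return None

ex = ThreadPoolExecutor(max_workers=1)
ex.shutdown()
print(safe_submit(ex, print, "hello"))  # None -- no traceback
```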

**Regression tests**: 4 new tests in
`test_conn_manager_shutdown_race.py`.

**Problem**: `SwarmLearningRecipe` always created a `CellPipe`; users
needing `FilePipe` (restricted networks, third-party integrations) had to
drop to the low-level `CCWFJob` + `SwarmClientConfig` API.

**Change**: Add `pipe_type: str = "cell_pipe"` and `pipe_root_path:
Optional[str] = None`:
- `"cell_pipe"` (default): unchanged — zero-copy PASS_THROUGH
forwarding, ~1 GB RAM overhead
- `"file_pipe"`: `FilePipe` created with `root_path` defaulting to
`{WORKSPACE}/pipe` (resolved at runtime); custom absolute path validated
to exist at recipe construction time
- Warnings for `pipe_root_path` ignored with `cell_pipe`, and
`file_pipe` with `launch_external_process=False`
- `ScriptRunner.__init__` gains `task_pipe` parameter forwarded to
`BaseScriptRunner`
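The validation and warning rules above can be sketched as a standalone checker. The function is hypothetical, condensing the recipe's behavior as described, not its actual code:

```python
import os
import warnings

# Hypothetical sketch of the pipe_type argument handling described above.
def validate_pipe_args(pipe_type, pipe_root_path, launch_external_process):
    if pipe_type not in ("cell_pipe", "file_pipe"):
        raise ValueError(f"invalid pipe_type: {pipe_type}")
    if pipe_type == "cell_pipe" and pipe_root_path is not None:
        warnings.warn("pipe_root_path is ignored with cell_pipe")
    if pipe_type == "file_pipe":
        if not launch_external_process:
            warnings.warn(
                "file_pipe has no effect with launch_external_process=False")
        # A custom absolute path must exist at recipe construction time;
        # the {WORKSPACE}/pipe default is resolved later, at runtime.
        if pipe_root_path is not None and not os.path.isdir(pipe_root_path):
            raise ValueError(f"pipe_root_path does not exist: {pipe_root_path}")
```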

9 new tests in `TestSwarmLearningRecipePipeType`.

| File | Bug | Change |
|------|-----|--------|
| `nvflare/fuel/utils/fobs/decomposers/via_downloader.py` | 1 + 3 |
Remove `subscribe_to_msg_root`; add thread-local `_tls`,
`was_download_initiated()`, `clear_download_initiated()` |
| `nvflare/client/flare_agent.py` | 3 | Check `was_download_initiated()`
after `send_to_peer()`; return immediately for validate results |
| `nvflare/app_common/ccwf/cse_client_ctl.py` | 2 | Persistor-first
`_prepare_local_model()` with last-"best"-key inventory scan and
isinstance guard |
| `nvflare/app_opt/pt/recipes/swarm.py` | 4 + rename + pipe | Add
`round_timeout`, `pipe_type`, `pipe_root_path`; rename to
`SwarmLearningRecipe`; backward-compat alias kept |
| `nvflare/job_config/script_runner.py` | pipe | Add `task_pipe` param
to `ScriptRunner`, forwarded to `BaseScriptRunner` |
| `nvflare/app_common/ccwf/recipes/swarm.py` | rename | Export
`SwarmLearningRecipe` alongside `SimpleSwarmLearningRecipe` |
| `nvflare/app_opt/pt/recipes/__init__.py` | rename | Add
`SwarmLearningRecipe` to lazy imports and `__all__` |
| `nvflare/app_common/executors/client_api_launcher_executor.py` | 5 |
Restore `self._max_resends` (private convention) |
| `nvflare/app_common/launchers/subprocess_launcher.py` | logging |
`_route_subprocess_line()`: formatted lines → `print()`, raw lines →
`logger.info()` |
| `nvflare/client/ex_process/api.py` | logging |
`_configure_subprocess_logging()`: apply site log config unchanged (same
handlers as parent) |
| `nvflare/fuel/f3/sfm/conn_manager.py` | 6 | Guard executor submit +
frame processing against post-shutdown RuntimeError |
| `docs/programming_guide/memory_management.rst` | docs | Add missing
`min_clients` to example |
| `docs/user_guide/data_scientist_guide/available_recipes.rst` | docs |
Add missing `min_clients`; rename to `SwarmLearningRecipe` |
| `docs/programming_guide/controllers/client_controlled_workflows.rst` |
docs | Add missing `min_clients` (x2) |
| `docs/programming_guide/timeouts.rst` | docs | Rename to
`SwarmLearningRecipe` |
| `examples/advanced/swarm_learning/swarm_pt/README.md` | 4 + rename |
Add `round_timeout` to config table; rename to `SwarmLearningRecipe` |
| `examples/advanced/swarm_learning/swarm_pt/job.py` | rename | Update
import to `SwarmLearningRecipe` |
| `tests/unit_test/fuel/utils/fobs/test_via_downloader_msg_root.py` | 1
| 4 new tests |
| `tests/unit_test/client/test_download_initiated_gating.py` | 3 | 7 new
tests |
| `tests/unit_test/app_common/ccwf/test_cse_persistor_fallback.py` | 2 |
10 new tests (incl. initial-ckpt-shadowing regression) |
| `tests/unit_test/fuel/f3/sfm/test_conn_manager_shutdown_race.py` | 6 |
4 new regression tests |
| `tests/unit_test/app_common/launchers/subprocess_launcher_test.py` |
logging | 6 new tests for `_route_subprocess_line` and double-logging
prevention |
| `tests/unit_test/recipe/swarm_recipe_test.py` | rename + pipe |
Updated to `SwarmLearningRecipe`; 9 new
`TestSwarmLearningRecipePipeType` tests |
|
`tests/unit_test/app_common/executors/client_api_launcher_executor_test.py`
| 5 | Updated: `_max_resends` assertions |
| `tests/unit_test/recipe/component_config_verification_test.py` |
rename | Updated to `SwarmLearningRecipe` |

- [x] 250+ unit tests pass across all affected modules
- [x] Bug 1: `test_via_downloader_msg_root.py` — verifies
`subscribe_to_msg_root` never called, method removed,
`DOWNLOAD_COMPLETE_CB` unaffected
- [x] Bug 2: `test_cse_persistor_fallback.py` — verifies persistor-first
path, last-"best"-key preference, initial-ckpt-shadowing regression,
isinstance guard, all fallback paths
- [x] Bug 3: `test_download_initiated_gating.py` — verifies thread
isolation, validate returns immediately (<1s), train waits,
clear-before-send
- [x] Bug 4: covered by existing recipe and ccwf tests
- [x] Bug 5: `client_api_launcher_executor_test.py` updated
(`_max_resends` assertions)
- [x] Bug 6: `test_conn_manager_shutdown_race.py` — 4 tests:
executor-shutdown no-raise, stop() no-raise, stopped early-return,
Prefix.from_bytes not called
- [x] Logging: `subprocess_launcher_test.py` — double-logging
prevention, ANSI stripping, plain-line capture, partial-timestamp
detection
- [x] Pipe type: `swarm_recipe_test.py` — 9 tests: default cell_pipe,
file_pipe instance, workspace template, custom path, invalid type,
warnings, path validation
- [x] Rename: backward-compat `SimpleSwarmLearningRecipe` import test
passes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
chesterxgchen added a commit to chesterxgchen/NVFlare that referenced this pull request Mar 29, 2026
… validate hang, round_timeout (NVIDIA#4270)
