Add RCCL and training warmup for HYBRID_SHARD stability #83
Conversation
oyazdanb commented on Feb 3, 2026
- Add warmup.py with RCCL communicator warmup, manual param sync, and training collectives warmup functions
- Integrate RCCL warmup in build_fsdp_model before FSDP init
- Add training warmup in main() before training loop
- Add TORCH_DIST_INIT_TIMEOUT support for dist.init_process_group and dist.new_group calls
- Add cuda.synchronize() before gradient clipping to prevent race
- Improve exception handling in finally block
- Update set_env_variables.sh with DOCKER_ENV_VARS array for automatic env propagation to Docker containers
- Remove duplicate logging in local_launch.sh (logs now only in logs/ directory via master_launch.sh)
- Add warmup config params to shampoo_opt_multi_node.yaml
Co-authored-by: Cursor <cursoragent@cursor.com>
Pull request overview
This pull request adds RCCL communicator and training warmup functionality to improve stability of HYBRID_SHARD distributed training, particularly addressing race conditions in inter-node RDMA setup during FSDP initialization. The PR also enhances timeout handling, exception management in cleanup code, and improves environment variable management in shell scripts.
Changes:
- Adds warmup.py module with functions for RCCL communicator warmup, manual parameter synchronization, and training collectives warmup
- Integrates RCCL warmup before FSDP initialization and training warmup before the main training loop in fsdp_trainer.py
- Adds configurable timeout support using the TORCH_DIST_INIT_TIMEOUT environment variable for distributed initialization
- Refactors shell scripts to centralize environment variable configuration with automatic Docker environment propagation
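The warmup.py diff itself is not reproduced in this excerpt. As a rough illustration only, a communicator warmup of the kind described usually boils down to issuing a few tiny collectives so RCCL/NCCL establishes its inter-node connections eagerly rather than lazily on the first training step (the function name and defaults below are assumptions, not the PR's actual code):

```python
import torch
import torch.distributed as dist

def warmup_rccl_communicators_sketch(device: torch.device, iterations: int = 5) -> None:
    """Issue small all-reduces so communicators (and inter-node RDMA paths)
    are created before FSDP wrapping, instead of lazily on the first step."""
    if not (dist.is_available() and dist.is_initialized()):
        return
    t = torch.ones(1, device=device)
    for _ in range(iterations):
        dist.all_reduce(t)       # creates/warms the default (world) communicator
    torch.cuda.synchronize()     # make sure the collectives actually completed
    dist.barrier()               # rendezvous: every rank is warmed up
```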
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 13 comments.
| File | Description |
|---|---|
| src/aorta/utils/warmup.py | New module providing RCCL communicator warmup, manual parameter sync, and training collectives warmup functions |
| src/aorta/utils/__init__.py | Exports the new warmup functions to the public API |
| src/aorta/training/fsdp_trainer.py | Integrates warmup functions, adds timeout configuration, improves exception handling, and adds cuda.synchronize() before gradient clipping |
| scripts/multi_node/set_env_variables.sh | Centralizes NCCL/RCCL environment variables with DOCKER_ENV_VARS array and build_docker_env_flags helper function |
| scripts/multi_node/local_launch.sh | Updates to use centralized environment variables and removes duplicate logging |
| config/multi_node/shampoo_opt_multi_node.yaml | Adds warmup configuration parameters (rccl_warmup_iterations, training_warmup_steps, skip flags) |
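The YAML keys map onto config fields referenced later in this review (fsdp_cfg.skip_training_warmup, fsdp_cfg.training_warmup_steps). A hypothetical sketch of the corresponding config fields with assumed defaults (the actual FSDPConfig definition is not shown in this excerpt):

```python
from dataclasses import dataclass

@dataclass
class FSDPConfigSketch:
    # Warmup-related fields only; names mirror the YAML keys listed above.
    rccl_warmup_iterations: int = 5   # collectives issued during communicator warmup
    training_warmup_steps: int = 2    # full fwd/bwd/optimizer steps before training
    skip_rccl_warmup: bool = False
    skip_training_warmup: bool = False
```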
```python
# Ensure GPU operations are complete before FSDP wrapping
# This helps prevent race conditions with inter-node communicators
torch.cuda.synchronize()
dist.barrier()
```
The dist.barrier() call on line 303 is executed unconditionally for all sharding strategies, not just HYBRID_SHARD. While this is likely safe, it adds unnecessary synchronization overhead for non-HYBRID_SHARD configurations. Consider moving this barrier inside the HYBRID_SHARD if-block (after line 298) to avoid unnecessary synchronization when using other sharding strategies.
Suggested change:
```diff
-# Ensure GPU operations are complete before FSDP wrapping
-# This helps prevent race conditions with inter-node communicators
-torch.cuda.synchronize()
-dist.barrier()
+# Ensure all HYBRID_SHARD ranks reach this point before FSDP wrapping
+dist.barrier()
+# Ensure GPU operations are complete before FSDP wrapping
+# This helps prevent race conditions with inter-node communicators
+torch.cuda.synchronize()
```
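To make the reviewer's intent concrete, a minimal sketch of gating the barrier on the sharding strategy (the surrounding variable name is an assumption, not code from this PR):

```python
import torch
import torch.distributed as dist
from torch.distributed.fsdp import ShardingStrategy

# Hypothetical sketch: only HYBRID_SHARD pays for the extra rendezvous.
torch.cuda.synchronize()
if sharding_strategy == ShardingStrategy.HYBRID_SHARD:  # `sharding_strategy` is assumed
    # Inter-node replicate groups exist only under HYBRID_SHARD,
    # so only then is the pre-wrap barrier needed.
    dist.barrier()
```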
```bash
    local flags=""
    for var in "${DOCKER_ENV_VARS[@]}"; do
        local value="${!var}"
        flags+=" -e ${var}=${value}"
```
The build_docker_env_flags function does not quote the environment variable values when building the flags string. If any environment variable contains spaces or special characters (e.g., NCCL_SOCKET_IFNAME=enp49s0f0np0,fenic0), this could cause issues with shell word splitting when the flags are used in the docker exec command. Consider escaping the values, for example with printf '%q' as in the suggested change below.
Suggested change:
```diff
-        flags+=" -e ${var}=${value}"
+        flags+=" -e ${var}=$(printf '%q' "$value")"
```
```bash
    HSA_ENABLE_SDMA
    # Protocol/Channels
    NCCL_PROTO
    NCCL_MIN_NCHANNELS
```
The DOCKER_ENV_VARS array includes NCCL_MIN_NCHANNELS (line 91), but the actual export statement for NCCL_MIN_NCHANNELS is commented out (line 44). This means the build_docker_env_flags function will pass an undefined or empty value for NCCL_MIN_NCHANNELS to the Docker container. Either uncomment the export on line 44 or remove NCCL_MIN_NCHANNELS from the DOCKER_ENV_VARS array.
Suggested change:
```diff
-    NCCL_MIN_NCHANNELS
```
```python
    Args:
        model: The FSDP-wrapped model
        replicate_group: Inter-node replicate process group for broadcasting
    """
```
The function calls dist.get_rank() without first checking if distributed is initialized. If distributed training is not initialized, this will raise an exception. Consider adding a check with dist.is_initialized() at the beginning of the function, or document that this function must only be called when distributed is initialized.
| """ | |
| """ | |
| if not dist.is_available() or not dist.is_initialized(): | |
| log.warning( | |
| "manual_sync_params called but torch.distributed is not initialized; " | |
| "skipping parameter synchronization." | |
| ) | |
| return |
```python
        dist.barrier()

        log.debug("Warmup step %d complete (rank=%d, loss=%.4f)", warmup_step, rank, loss.item())

    # Reset optimizer state after warmup to not affect actual training
    optimizer.zero_grad(set_to_none=True)
    torch.cuda.synchronize()
    dist.barrier()
```
The function calls dist.barrier() on lines 241 and 248 without checking if distributed is initialized. While line 198 conditionally gets the rank, the barrier calls will fail if distributed is not initialized. Consider adding checks before these barrier calls or documenting that this function requires distributed training to be initialized.
Suggested change:
```diff
-        dist.barrier()
+        if dist.is_available() and dist.is_initialized():
+            dist.barrier()

         log.debug("Warmup step %d complete (rank=%d, loss=%.4f)", warmup_step, rank, loss.item())

     # Reset optimizer state after warmup to not affect actual training
     optimizer.zero_grad(set_to_none=True)
     torch.cuda.synchronize()
-    dist.barrier()
+    if dist.is_available() and dist.is_initialized():
+        dist.barrier()
```
```python
# Training warmup: run a few forward/backward/optimizer steps to warm up collectives
if not fsdp_cfg.skip_training_warmup and fsdp_cfg.training_warmup_steps > 0:
```
The training warmup logic uses fsdp_cfg.skip_training_warmup and fsdp_cfg.training_warmup_steps even when dist_mode is "ddp" (lines 909-910). This means the warmup will be skipped or use default values for DDP mode, which may not be the desired behavior. Consider either: (1) adding warmup configuration to DDPConfig, (2) checking dist_mode before applying warmup, or (3) documenting that warmup is only intended for FSDP mode.
Suggested change:
```diff
-# Training warmup: run a few forward/backward/optimizer steps to warm up collectives
-if not fsdp_cfg.skip_training_warmup and fsdp_cfg.training_warmup_steps > 0:
+# Training warmup: run a few forward/backward/optimizer steps to warm up collectives.
+# This warmup is currently configured via FSDPConfig and is only applied in non-DDP modes.
+if dist_mode != "ddp" and not fsdp_cfg.skip_training_warmup and fsdp_cfg.training_warmup_steps > 0:
```
@oyazdanb please take a look at this review comment. I'm not sure if the code design is as you intended.
```diff
@@ -136,18 +155,16 @@ if [ "${ENABLE_ROCPROF}" = "true" ]; then
     log "Running with rocprofv3 kernel tracing inside Docker"
     ${DOCKER_EXEC} bash -c "rocprofv3 ${ROCPROF_ARGS} -d ${ROCPROF_DIR} -- \
         ${BASE_CMD} ${BASE_OVERRIDES} \
-        --override training.output_dir=${OUTPUT_DIR_DOCKER}" \
-        2>&1 | tee -a "${LOG_FILE}"
+        --override training.output_dir=${OUTPUT_DIR_DOCKER}" 2>&1
     fi
 else
     log "Running inside Docker container"
     log "Command: ${BASE_CMD} ${BASE_OVERRIDES} --override training.output_dir=${OUTPUT_DIR_DOCKER}"
     ${DOCKER_EXEC} bash -c "${BASE_CMD} ${BASE_OVERRIDES} \
-        --override training.output_dir=${OUTPUT_DIR_DOCKER}" \
-        2>&1 | tee -a "${LOG_FILE}"
+        --override training.output_dir=${OUTPUT_DIR_DOCKER}" 2>&1
```
The expansion of DOCKER_EXEC here is vulnerable to shell command injection because it is built from untrusted inputs (the DOCKER_CONTAINER CLI argument and environment-derived values in DOCKER_ENV_FLAGS) and then expanded unquoted in a command context. An attacker who can influence DOCKER_CONTAINER or one of the environment variables (e.g., by including characters like ;, && or $(...)) can break out of the intended docker exec invocation and execute arbitrary additional commands on the host. To mitigate this, avoid constructing a full command string in DOCKER_EXEC and instead pass docker exec and its arguments as separate, properly quoted words (or an array), and ensure DOCKER_CONTAINER is strictly validated/whitelisted to be a simple container name.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
| """ | ||
| rank = dist.get_rank() | ||
| # Use a larger tensor for more thorough warmup | ||
| warmup_tensor = torch.ones(8192, device=device, dtype=torch.float32) |
Instead of magic numbers, use something like:
```python
# At module level
WARMUP_TENSOR_SIZE = 8192  # 32 KB per GPU (float32)
# Chosen to exercise memory channels without excessive overhead

# In the function:
warmup_tensor = torch.ones(WARMUP_TENSOR_SIZE, device=device, dtype=torch.float32)
```
```bash
# -----------------------------------------------------------------------------
export NCCL_TIMEOUT_MS=12000        # 12 second timeout (legacy, not used by PyTorch)
export NCCL_TIMEOUT=100             # 100 second timeout - first backward can be slow due to JIT/init
export TORCH_DIST_INIT_TIMEOUT=150  # Match collective timeout for consistency
```
NCCL_TIMEOUT and TORCH_DIST_INIT_TIMEOUT are set to different values. Is there a reason we have different values and different environment variables for the same thing? Which one is actually effective in our code?
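For reference, a plausible sketch of how the PR's TORCH_DIST_INIT_TIMEOUT could be consumed on the Python side; the actual wiring in fsdp_trainer.py is not shown in this excerpt, so treat the pattern below as an assumption:

```python
import os
from datetime import timedelta

import torch.distributed as dist

# Assumed pattern: the repo-specific TORCH_DIST_INIT_TIMEOUT feeds the
# `timeout` argument of init_process_group / new_group. NCCL_TIMEOUT would
# only matter if some other tooling reads it.
init_timeout = timedelta(seconds=int(os.environ.get("TORCH_DIST_INIT_TIMEOUT", "150")))
dist.init_process_group(backend="nccl", timeout=init_timeout)
```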
```python
    # Final synchronization with extra delay
    torch.cuda.synchronize()
    dist.barrier()
    torch.cuda.synchronize()
```
There are duplicate sync/barrier calls here; one set should be removed.
```python
        log.debug("Warmup step %d complete (rank=%d, loss=%.4f)", warmup_step, rank, loss.item())

    # Reset optimizer state after warmup to not affect actual training
    optimizer.zero_grad(set_to_none=True)
```
Issue: zero_grad() only clears gradients, not optimizer state (momentum, learning rate schedule, etc.). Optimizers like Adam/Shampoo maintain state that is updated during warmup steps.
Problems:
- Warmup steps consume learning-rate schedule steps
- Momentum buffers are initialized with warmup gradients
- The Shampoo preconditioner is updated with warmup data
There are three options:
1. Save the optimizer state before warmup and restore it afterwards:
```python
import copy

# Snapshot optimizer state before warmup (deep copy so warmup cannot mutate it)
optimizer_state_backup = copy.deepcopy(optimizer.state_dict())
# ... run warmup ...
# Restore the initial state (but this is tricky with FSDP-sharded optimizer state)
optimizer.load_state_dict(optimizer_state_backup)
```
2. Create a fresh optimizer after warmup.
3. Document that warmup affects optimizer state.
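A minimal, self-contained sketch of option 2 with a toy model (AdamW stands in for whatever optimizer the trainer actually builds; this is not code from the PR):

```python
import torch
import torch.nn as nn

model = nn.Linear(8, 8)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)

# ... run warmup steps with `optimizer` ...

# Option 2: recreate the optimizer so momentum/second-moment state accumulated
# during the warmup steps is discarded before real training begins.
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
```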
```python
def warmup_training_collectives(
    model: nn.Module,
    optimizer: torch.optim.Optimizer,
    dataloader,
```
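The rest of the function is truncated in this excerpt. For orientation, a minimal sketch of what such a warmup loop typically looks like; parameter names beyond the visible signature, the loss function, and the batch layout are all assumptions:

```python
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F

def warmup_training_collectives_sketch(
    model: nn.Module,
    optimizer: torch.optim.Optimizer,
    dataloader,
    warmup_steps: int = 2,
) -> None:
    """Run a few real forward/backward/step iterations so FSDP all-gather /
    reduce-scatter paths and optimizer collectives are exercised before the
    measured training loop starts."""
    device = next(model.parameters()).device
    data_iter = iter(dataloader)
    for warmup_step in range(warmup_steps):
        inputs, targets = next(data_iter)           # assumed (inputs, targets) batches
        inputs, targets = inputs.to(device), targets.to(device)
        loss = F.mse_loss(model(inputs), targets)   # placeholder loss
        loss.backward()                             # triggers gradient reduce-scatter
        optimizer.step()
        optimizer.zero_grad(set_to_none=True)
    torch.cuda.synchronize()
    if dist.is_available() and dist.is_initialized():
        dist.barrier()                              # all ranks finish warmup together
```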