
feat: Distributed workers mode#2430

Open
thomhurst wants to merge 55 commits into main from 001-distributed-workers

Conversation

@thomhurst
Owner

Summary

  • Adds a distributed execution mode where pipeline instances coordinate across multiple runners (e.g., GitHub Actions matrix) — instance 0 acts as master, additional instances as workers
  • Introduces IDistributedCoordinator and IDistributedArtifactStore abstractions with Redis and S3-compatible (Cloudflare R2, AWS S3, MinIO) backends
  • Attribute-driven artifact sharing ([ProducesArtifact], [ConsumesArtifact]) with automatic deduplication of downloads to the same path
  • Matrix module expansion, capability-based routing, worker health monitoring, and cancellation propagation

New packages

  • ModularPipelines.Distributed — Core orchestration (master/worker executors, lifecycle manager, InMemory coordinator)
  • ModularPipelines.Distributed.Redis — Redis coordinator + artifact store backend
  • ModularPipelines.Distributed.Artifacts.S3 — S3-compatible artifact store (Cloudflare R2, AWS S3, Backblaze B2, MinIO)

Core abstractions (in ModularPipelines)

  • IDistributedCoordinator / IDistributedCoordinatorFactory — cross-instance orchestration
  • IDistributedArtifactStore / IDistributedArtifactStoreFactory — file artifact storage
  • IArtifactContext — module-facing API (context.Artifacts().PublishFileAsync/DownloadAsync)
  • [ProducesArtifact], [ConsumesArtifact], [PinToMaster], [RequiresCapability], [MatrixTarget] attributes

Build pipeline integration

  • GitHub Actions workflow updated with matrix instance indices for distributed execution
  • Redis (Upstash) for coordination, Cloudflare R2 for artifact storage
  • Secret masking for all new credentials

Test plan

  • 44 distributed unit tests pass (coordinator, executor, serialization, capabilities, artifacts)
  • 36 Redis unit tests pass (coordinator, key builder, run identifier resolver)
  • 6 S3 artifact store unit tests pass (upload, download, delete, list)
  • Artifact download deduplication verified (same path → single download, different paths → separate downloads, concurrent access → single download)
  • End-to-end validation on GitHub Actions with real Redis + R2 backends

Introduces a distributed execution mode where pipeline instances coordinate
across multiple runners (e.g., GitHub Actions matrix). Instance 0 becomes the
master orchestrating work, while additional instances act as workers.

Key components:
- Core abstractions: IDistributedCoordinator, IDistributedArtifactStore
- Redis coordinator backend for cross-instance orchestration
- S3-compatible artifact store (supports Cloudflare R2, AWS S3, MinIO)
- Attribute-driven artifact sharing ([ProducesArtifact], [ConsumesArtifact])
- Automatic deduplication of artifact downloads for same paths
- Matrix module expansion and capability-based routing
- Worker health monitoring and cancellation propagation

Includes 86+ unit tests across distributed, Redis, and S3 test projects.
The artifacts/ gitignore rule was case-insensitively matching our
source Artifacts/ directories. Added exceptions for src/**/Artifacts/
and test/**/Artifacts/ to allow these to be tracked.

@claude claude bot left a comment


Code Review: Distributed Workers Mode

This is a substantial and well-structured PR. The architecture (pluggable coordinators, attribute-driven artifact routing, capability matching) is thoughtfully designed. Below are the issues I found, roughly ordered by severity.


Critical: Reflection to access internal CompletionSource

WorkerModuleExecutor.cs — The worker executor accesses an internal CompletionSource via private reflection to await module execution:

var completionSourceProp = module.GetType().GetProperty(
    "CompletionSource",
    System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic)!;

This is a fragile coupling to implementation internals. It breaks silently if the property is renamed or removed, the null-forgiving ! turns a missing property into a NullReferenceException at runtime, and the reflection is incompatible with AOT/Native AOT trimming. The worker should execute the module through the existing public IModuleRunner / ModuleExecutionPipeline contract rather than bypassing it. If the module result isn't accessible through the public API, that's the missing piece to add — not a reflection hack.


Major: BuildServiceProvider() called inside ConfigureServices

DistributedPipelinePlugin.cs — The plugin calls services.BuildServiceProvider() (twice — once for options, once for the warning logger):

var sp = services.BuildServiceProvider();             // first call
// ...
var tempSp = services.BuildServiceProvider();         // second call

Calling BuildServiceProvider() inside ConfigureServices is explicitly flagged as an anti-pattern by the .NET team (docs). It creates a separate, orphaned container that instantiates singletons a second time, may access services before they're fully configured, and creates a memory leak. The correct approach is to use a factory-pattern registration (AddSingleton(sp => ...)) or resolve options through IOptions<T> inside the factory delegates that are already present.
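A minimal sketch of the recommended factory-pattern registration. The types here (DistributedOptions, InMemoryCoordinator) are hypothetical stand-ins for the PR's actual classes; the point is that options are resolved inside the factory delegate, not during ConfigureServices:

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;

// Hypothetical stand-in types for the PR's real options/coordinator.
public sealed class DistributedOptions { public bool UseInMemory { get; set; } = true; }
public interface IDistributedCoordinator { }
public sealed class InMemoryCoordinator : IDistributedCoordinator { }

public static class DistributedRegistration
{
    public static IServiceCollection AddDistributed(this IServiceCollection services)
    {
        services.AddOptions<DistributedOptions>();

        // Resolve options lazily inside the factory delegate, at first resolution,
        // instead of calling services.BuildServiceProvider() during registration.
        services.AddSingleton<IDistributedCoordinator>(sp =>
        {
            var options = sp.GetRequiredService<IOptions<DistributedOptions>>().Value;
            return options.UseInMemory
                ? new InMemoryCoordinator()
                : throw new NotSupportedException("other backends would be registered elsewhere");
        });

        return services;
    }
}
```

No second container is ever built, so singletons are constructed exactly once, in the real provider.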


Major: WorkerHealthMonitor is a non-functional stub

WorkerHealthMonitor.cs — The core of the monitor loop is empty:

foreach (var worker in workers)
{
    if (worker.Status == WorkerStatus.TimedOut || worker.Status == WorkerStatus.Disconnected)
        continue;

    // Check if worker has timed out based on registration time
    // In a real implementation, we'd track last heartbeat time
    // For now, rely on the coordinator's heartbeat tracking
}

Heartbeat data is never read and no timeout is enforced. The InMemoryDistributedCoordinator.SendHeartbeatAsync updates worker status but doesn't store a timestamp. There is no path where a worker is ever marked as TimedOut. The PR description marks worker failure resilience (US6) as complete, but the implementation is a no-op. Either the feature should be implemented or the PR description should be updated and this registered as a known gap.


Major: Capability dequeue creates a livelock in InMemoryDistributedCoordinator

InMemoryDistributedCoordinator.DequeueModuleAsync — When a worker dequeues an assignment it can't handle (wrong capabilities), it re-enqueues it and spins:

// Re-enqueue if this worker can't handle it
_workQueue.Writer.TryWrite(assignment);
await Task.Delay(50, cancellationToken);

With N workers and a module requiring a capability no currently-connected worker possesses, all N workers will spin in a 50ms loop indefinitely — reading, re-enqueuing, and re-reading the same assignment. This wastes CPU and produces misleading log noise. A better design is for the coordinator to hold capability-mismatched assignments in a side buffer until a compatible worker registers, then move them to the main queue. Alternatively, the dequeue should filter server-side so workers never receive assignments they cannot handle.
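A minimal in-memory sketch of the side-buffer idea, with illustrative types rather than the PR's coordinator API: assignments whose required capability no connected worker advertises are parked, and only move to the live queue when a compatible worker registers.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Illustrative sketch, not the PR's coordinator: park capability-mismatched
// work instead of letting workers re-enqueue and spin on it.
public sealed class CapabilityAwareQueue
{
    private readonly ConcurrentQueue<(string Module, string? RequiredCapability)> _ready = new();
    private readonly ConcurrentBag<(string Module, string RequiredCapability)> _parked = new();
    private readonly ConcurrentDictionary<string, byte> _knownCapabilities = new();

    public void Enqueue(string module, string? requiredCapability)
    {
        if (requiredCapability is null || _knownCapabilities.ContainsKey(requiredCapability))
            _ready.Enqueue((module, requiredCapability));
        else
            _parked.Add((module, requiredCapability)); // no connected worker can run it yet
    }

    public void RegisterWorker(IEnumerable<string> capabilities)
    {
        foreach (var cap in capabilities)
            _knownCapabilities.TryAdd(cap, 0);

        // Re-check parked work now that new capabilities are available.
        var stillParked = new List<(string, string)>();
        while (_parked.TryTake(out var item))
        {
            if (_knownCapabilities.ContainsKey(item.RequiredCapability))
                _ready.Enqueue(item);
            else
                stillParked.Add(item);
        }
        foreach (var item in stillParked)
            _parked.Add(item);
    }

    public bool TryDequeue(out (string Module, string? RequiredCapability) assignment) =>
        _ready.TryDequeue(out assignment);
}
```

Workers only ever see assignments they can run, so the 50ms spin loop disappears entirely.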


Major: CancellationToken.None throughout worker execution loop

WorkerModuleExecutor.cs — Nearly all coordinator calls use CancellationToken.None:

await _coordinator.RegisterWorkerAsync(registration, CancellationToken.None);
var assignment = await _coordinator.DequeueModuleAsync(capabilities, CancellationToken.None);
await _coordinator.PublishResultAsync(serialized, CancellationToken.None);

This means clean shutdown via the host's cancellation token is not possible. The worker execution loop should accept and propagate the cancellation token that the IHostedService (or IModuleExecutor) lifecycle provides. The DequeueModuleAsync in particular should be cancellable so the worker can stop blocking when the host shuts down.


Significant: ConfigurePipeline is empty — matrix expansion not implemented

DistributedPipelinePlugin.cs:

public void ConfigurePipeline(PipelineBuilder pipelineBuilder)
{
    // Pipeline-level configuration (e.g., matrix expansion) will be added in later phases
}

The PR description marks tasks T044–T048 (matrix module expansion) as complete (- [x]), and MatrixModuleExpander.cs exists in the diff, but it is never wired up. The expansion hook that should call MatrixModuleExpander in ConfigurePipeline is missing. Either the expansion should be connected, or the task checkboxes should reflect the actual state.


Significant: Redis dequeue uses polling instead of BLPOP

RedisDistributedCoordinator.DequeueModuleAsync — Uses Task.Delay polling:

await Task.Delay(_dequeuePollDelay, cancellationToken);

Redis natively supports blocking pop (BLPOP/BRPOP) which blocks until an item is available. Using BLPOP with a timeout eliminates polling latency entirely (immediate wake-up when work arrives), reduces Redis command volume, and simplifies the loop. The capability re-enqueue/delay pattern from InMemoryDistributedCoordinator is replicated here with the same livelock risk.
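For reference, the blocking form looks like this at the CLI level (key name illustrative). One caveat: StackExchange.Redis deliberately does not expose blocking commands such as BLPOP because of its multiplexed connection model, so adopting this may require a dedicated connection or a different client.

```shell
# Blocks up to 5 seconds waiting for an item on the (illustrative) work-queue key,
# returning immediately when one is pushed; a timeout of 0 blocks indefinitely.
redis-cli BLPOP pipeline:work-queue 5
```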


Significant: S3 artifact download buffers entire file into MemoryStream

S3DistributedArtifactStore.DownloadAsync:

var ms = new MemoryStream();
await response.ResponseStream.CopyToAsync(ms, cancellationToken);
ms.Position = 0;
return ms;

The comment says this is to avoid holding the S3 response stream open, but for large artifacts (e.g., build outputs, test results) this will OOM the worker. A better approach is to return the response stream directly (the caller should dispose it), or implement the download as a write-to-path overload so the content streams directly to disk without intermediate buffering.
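One shape for the "caller disposes" option: a thin Stream wrapper that ties the lifetime of the owning response object to the returned stream. The types are illustrative, not the AWS SDK's:

```csharp
using System;
using System.IO;

// Sketch: disposing the returned stream also disposes the owner
// (e.g. the S3 GetObjectResponse), so no MemoryStream buffering is needed.
public sealed class OwnedStream : Stream
{
    private readonly Stream _inner;
    private readonly IDisposable _owner;

    public OwnedStream(Stream inner, IDisposable owner) { _inner = inner; _owner = owner; }

    public override bool CanRead => _inner.CanRead;
    public override bool CanSeek => _inner.CanSeek;
    public override bool CanWrite => false;
    public override long Length => _inner.Length;
    public override long Position { get => _inner.Position; set => _inner.Position = value; }

    public override int Read(byte[] buffer, int offset, int count) => _inner.Read(buffer, offset, count);
    public override long Seek(long offset, SeekOrigin origin) => _inner.Seek(offset, origin);
    public override void Flush() => _inner.Flush();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    protected override void Dispose(bool disposing)
    {
        if (disposing) { _inner.Dispose(); _owner.Dispose(); }
        base.Dispose(disposing);
    }
}
```

With something like this, DownloadAsync could return the response stream wrapped with its response object as the owner, and the content would flow from S3 to disk without ever being held in heap memory.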


Moderate: TOTAL_INSTANCES: 3 hardcoded in GitHub Actions workflow

.github/workflows/dotnet.yml:

env:
  TOTAL_INSTANCES: 3

The matrix has exactly 3 entries (ubuntu, windows, macos), so 3 is correct today — but it's a manual sync point. If an OS is added or removed from the matrix, TOTAL_INSTANCES must be updated separately or the distributed coordinator will wait forever for a worker that never arrives. Consider deriving this from ${{ strategy.job-total }} which GitHub Actions provides automatically.


Moderate: ArtifactLifecycleManager.ResolvePathPattern has surprising multi-match behavior

ArtifactLifecycleManager.cs — When a glob matches multiple files, it returns the common parent directory:

if (matches.Length > 1)
{
    return GetCommonParentDirectory(matches) ?? baseDir;
}

A user specifying "**/bin/Release/*.nupkg" expecting to upload multiple NuGet packages would instead get the common ancestor directory uploaded as a zip. The expected behavior should be to upload each matched file individually as separate artifact entries, not collapse them to a directory.
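A sketch of the per-file behavior the review suggests, using Microsoft.Extensions.FileSystemGlobbing; the ArtifactGlobs helper itself is hypothetical:

```csharp
using System.Collections.Generic;
using System.IO;
using Microsoft.Extensions.FileSystemGlobbing;
using Microsoft.Extensions.FileSystemGlobbing.Abstractions;

// Sketch: each glob match becomes its own artifact path instead of
// collapsing multiple matches to their common parent directory.
public static class ArtifactGlobs
{
    public static IReadOnlyList<string> ResolveAll(string baseDir, string pattern)
    {
        var matcher = new Matcher();
        matcher.AddInclude(pattern);
        var result = matcher.Execute(new DirectoryInfoWrapper(new DirectoryInfo(baseDir)));

        var paths = new List<string>();
        foreach (var file in result.Files)
            paths.Add(Path.Combine(baseDir, file.Path)); // one entry per matched file
        return paths;
    }
}
```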


Minor: docs/distributed-runners-proposal.md in repo root

This design document (docs/distributed-runners-proposal.md) appears to be a pre-implementation exploration doc committed to the repo root. Now that the implementation exists and proper documentation is in docs/docs/distributed/, this file should either be moved to specs/ or removed to avoid confusion about which document represents the current state.


Minor: specs/ folder with raw planning artifacts

The specs/001-distributed-workers/ folder containing task checklists, data models, and implementation plans is valuable context for contributors but unusual in a library repo. Consider whether this should live in a separate dev-docs branch, the GitHub wiki, or a docs/contributing/ subfolder instead of the main source tree.


Summary

The overall architecture is solid and the abstractions (IDistributedCoordinator, IDistributedArtifactStore, attribute-driven routing) are well-designed and extensible. The main concerns are:

  1. The reflection-based CompletionSource access should be replaced with a proper public execution path.
  2. BuildServiceProvider() in ConfigureServices is a correctness issue that can cause surprising DI behavior.
  3. WorkerHealthMonitor and matrix expansion integration are marked done but aren't functional yet.
  4. CancellationToken.None throughout the worker makes clean shutdown impossible.
  5. The capability dequeue busy-loop pattern appears in both coordinator implementations and should be rethought.

The Redis pub/sub result notification (using check-then-subscribe to close the race) is a nice pattern and well-implemented. The artifact deduplication via ConcurrentDictionary<string, Lazy<Task>> is also clean.
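For readers unfamiliar with the pattern, the deduplication idea reduces to roughly this (illustrative class, not the PR's implementation): concurrent requests for the same destination path share one Lazy-wrapped Task, so the download delegate runs at most once per path.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Sketch of download deduplication keyed by destination path.
public sealed class DownloadDeduplicator
{
    private readonly ConcurrentDictionary<string, Lazy<Task>> _inFlight = new();

    public Task DownloadOnceAsync(string destinationPath, Func<Task> download) =>
        _inFlight.GetOrAdd(
            destinationPath,
            // ExecutionAndPublication guarantees the delegate runs once even if
            // GetOrAdd races and discards a losing Lazy instance.
            _ => new Lazy<Task>(download, LazyThreadSafetyMode.ExecutionAndPublication)).Value;
}
```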


…aring

Replace per-instance build step with a single BuildSolutionsModule that
runs on the master Linux instance, stages bin/Release/ output into
_build-staging/, and shares it to workers via the distributed artifact
store. Workers now only restore (for project.assets.json) and receive
compiled output via ConsumesArtifact before running tests.

@claude claude bot left a comment


Follow-up Review: Distributed Workers Mode

I reviewed this against my previous comments to check which have been addressed. Here's the current status.


Improvements since last review

WorkerHeartbeatService.cs — The heartbeat service now correctly propagates stoppingToken to the coordinator call and the delay, enabling clean shutdown via host cancellation. This was one of the cancellation concerns from before.

Matrix updated to 5 instances — TOTAL_INSTANCES is still hardcoded (see below), but the count now at least matches the matrix size.


Still Open: Critical

Reflection on CompletionSource in WorkerModuleExecutor.cs (from previous review, unchanged)

The worker still accesses the module's internal CompletionSource via private reflection. This remains a fragile coupling that breaks under AOT/NativeAOT, silently breaks on rename, and throws NullReferenceException with ! if not found. The correct approach is to execute the module through the existing IModuleRunner contract. If the runner doesn't expose a way to await a specific module result, that surface area needs to be added to the public API.


Still Open: Major

CancellationToken.None in WorkerModuleExecutor (from previous review, partially addressed)

WorkerHeartbeatService was fixed, but WorkerModuleExecutor's coordinator calls — RegisterWorkerAsync, DequeueModuleAsync, and PublishResultAsync — appear to still use CancellationToken.None. The dequeue loop in particular needs to receive and propagate the cancellation token so the worker actually stops blocking when the host shuts down.

WorkerHealthMonitor.cs is still a stub (from previous review, unchanged)

The monitor loop body is still empty — HeartbeatTimeoutSeconds is computed and discarded, and no worker is ever marked TimedOut. The PR description marks US6 (worker failure resilience) as complete, which remains inaccurate. Either implement the eviction or remove it from the "done" checklist.

Capability dequeue busy-loop (from previous review, unchanged)

Both InMemoryDistributedCoordinator and RedisDistributedCoordinator still re-enqueue and spin with Task.Delay(50ms) when a worker pulls an assignment it can't handle. With N workers and an incompatible assignment, all N workers cycle through the same item endlessly. The fix is to filter assignments server-side before delivery, or move capability-mismatched items to a per-capability side-queue until a matching worker connects.


Still Open: Significant

Redis dequeue polling instead of BLPOP (from previous review, unchanged)

RedisDistributedCoordinator.DequeueModuleAsync still polls with Task.Delay. Redis's BLPOP/BRPOP would eliminate the polling latency (items are delivered immediately rather than after up to one poll interval) and remove the delay loop entirely. This is worth fixing before the feature ships.

S3 downloads buffer entire file to MemoryStream (from previous review, unchanged)

S3DistributedArtifactStore.DownloadAsync still reads the full S3 response into a MemoryStream before returning. For large build artifacts (binaries, test results, coverage data) this will OOM workers. The response stream should be returned directly or the download should stream straight to disk.

ArtifactLifecycleManager.ResolvePathPattern multi-match collapse (from previous review, unchanged)

When a glob matches multiple files, the resolved "path" is the common parent directory, not the individual files. A user declaring [ProducesArtifact("*.nupkg")] expecting to upload multiple packages will silently upload the parent directory as a zip instead. Each matched file should be uploaded as a separate artifact entry.

TOTAL_INSTANCES still hardcoded (from previous review, still present)

Updated from 3 to 5 but still manually maintained. ${{ strategy.job-total }} is set automatically by GitHub Actions and equals the number of matrix entries. Using it removes the manual sync point entirely:

env:
  TOTAL_INSTANCES: ${{ strategy.job-total }}

MatrixModuleExpander still not wired into ConfigurePipeline

MatrixModuleExpander was added and has its own tests, but DistributedPipelinePlugin.ConfigurePipeline still has a placeholder comment — the expander is never called. The PR description marks matrix expansion (T044–T048) as [x] done, but the integration step is missing.


Still Open: Minor

docs/distributed-runners-proposal.md in repo root — Still present alongside the docs/docs/distributed/ documentation. The exploration doc and the live docs describe different states of the design and will cause confusion for future contributors. Move or remove it.

specs/001-distributed-workers/ — Still committed as source artifacts. No change here, but it's worth a decision before merging to main.


New: DistributedPipelinePlugin service resolution

The BuildServiceProvider() anti-pattern raised in the previous review was a concern about coordinator/store factory resolution inside ConfigureServices. Based on the current structure (coordinator resolved via DI ordering: explicit > factory > in-memory fallback), I can't confirm whether BuildServiceProvider() was removed without seeing the actual factory delegation code. If it was removed and replaced with AddSingleton(sp => ...) factory delegates, please call that out — it would close one of the previous major issues.


Summary

The heartbeat service cancellation fix is a genuine improvement. The rest of the issues from the previous review remain open. The highest-priority items before merging:

  1. Replace the CompletionSource reflection with a supported execution path
  2. Propagate cancellation tokens through WorkerModuleExecutor
  3. Implement or explicitly stub out the WorkerHealthMonitor
  4. Wire MatrixModuleExpander into ConfigurePipeline (or uncheck the task in the PR description)
  5. Fix the capability dequeue livelock

- Replace reflection-based CompletionSource access with IModule.ResultTask
  property, eliminating fragile reflection in WorkerModuleExecutor
- Remove BuildServiceProvider() anti-pattern in DistributedPipelinePlugin;
  resolve options directly from service descriptors instead
- Implement WorkerHealthMonitor heartbeat tracking with actual timeout
  detection via GetLastHeartbeatAsync/UpdateWorkerStatusAsync on coordinator
- Fix capability dequeue livelock: InMemoryDistributedCoordinator now uses
  List+SemaphoreSlim with selective dequeue instead of Channel re-enqueue
- Redis dequeue uses LRANGE+LREM for capability scanning and BRPOP for
  blocking wait, eliminating polling loops
- Propagate CancellationToken via IHostApplicationLifetime in worker
  executor instead of using CancellationToken.None everywhere
- Stream S3 artifact downloads to temp files instead of MemoryStream to
  avoid OOM on large artifacts
- Use strategy.job-total for TOTAL_INSTANCES in GitHub Actions workflow
- Fix ResolvePathPattern to return all matched paths instead of collapsing
  to common parent directory
- Remove superseded docs/specs planning artifacts

@claude claude bot left a comment


Follow-up Review: Distributed Workers Mode (Round 3)

I reviewed this against my two previous comment threads to track which issues have been addressed and which remain open.


Resolved since last review

Reflection on CompletionSource — FIXED

The CompletionSource reflection hack has been replaced with a proper public API. IModule now exposes a ResultTask property, and Module<T> implements it via a ContinueWith projection of the internal CompletionSource.Task. WorkerModuleExecutor now calls await module.ResultTask through this public interface. This was the most significant correctness issue from previous reviews, and the fix is clean.

BuildServiceProvider() anti-pattern — FIXED

DistributedPipelinePlugin.ConfigureServices no longer calls BuildServiceProvider(). Options are now extracted by scanning the IServiceCollection descriptors directly via ResolveOptionsFromDescriptors. The warning logger is now a lazy singleton (InMemoryCoordinatorWarning) that logs at resolution time. Both fixes are correct.

WorkerHealthMonitor stub — FIXED

The monitor now retrieves heartbeat timestamps via GetLastHeartbeatAsync and calls UpdateWorkerStatusAsync when a worker exceeds HeartbeatTimeoutSeconds. This is a real implementation with actual timeout enforcement.
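The timeout rule being enforced can be sketched in isolation (illustrative helper, not the PR's monitor, which goes through the coordinator's GetLastHeartbeatAsync/UpdateWorkerStatusAsync):

```csharp
using System;
using System.Collections.Generic;

// Sketch: a worker whose last heartbeat is older than the configured
// timeout is reported as timed out.
public static class HeartbeatPolicy
{
    public static IReadOnlyList<string> FindTimedOut(
        IReadOnlyDictionary<string, DateTimeOffset> lastHeartbeats,
        DateTimeOffset now,
        TimeSpan timeout)
    {
        var timedOut = new List<string>();
        foreach (var (workerId, last) in lastHeartbeats)
            if (now - last > timeout)
                timedOut.Add(workerId);
        return timedOut;
    }
}
```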

IDistributedCoordinator interface expansion

The interface was extended with GetLastHeartbeatAsync and UpdateWorkerStatusAsync, which was necessary to support the health monitor fix above.

InMemory dequeue livelock — FIXED

InMemoryDistributedCoordinator.DequeueModuleAsync now uses a SemaphoreSlim with a proper capability-scan-within-lock pattern. It no longer re-enqueues and busy-spins; instead it scans the queue under the lock, finds the first matching assignment, and releases the semaphore back if a non-matching assignment was the trigger. This is correct and eliminates the previous livelock.


Still Open: Significant

Redis dequeue: LRANGE+LREM scan instead of BLPOP, with livelock when capabilities mismatch

RedisDistributedCoordinator.DequeueModuleAsync was improved — it now scans the list with LRANGE first, then atomically removes with LREM, and falls back to BRPOP only when no matching item is found. The BRPOP path then re-pushes a mismatched item to the front with LPUSH.

The remaining problem is the BRPOP + LPUSH fallback: when a capability-mismatched item is at the head of the queue and multiple workers are blocking with BRPOP, each worker will pop the item, check capabilities, re-push it, and the cycle repeats. With N workers all missing the required capability, the item bounces between workers in a continuous pop/re-push cycle, wasting Redis round-trips. This is the Redis analogue of the InMemory livelock that was fixed.

A cleaner approach is to abandon the BRPOP fallback entirely and instead use LRANGE polling with a configurable delay (already in the options), or use Redis Streams (XADD/XREADGROUP) which support consumer group acknowledgment and avoid the push-pop contention.
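At the CLI level, the consumer-group flow looks roughly like this (key, group, consumer names, and entry ID are all illustrative):

```shell
# Create the group (MKSTREAM creates the stream if absent).
redis-cli XGROUP CREATE pipeline:work workers '$' MKSTREAM

# Master enqueues an assignment as a stream entry.
redis-cli XADD pipeline:work '*' module BuildModule capability docker

# A worker blocks up to 5s for a new entry assigned to it alone.
redis-cli XREADGROUP GROUP workers worker-1 COUNT 1 BLOCK 5000 STREAMS pipeline:work '>'

# Acknowledge once executed; unacked entries can be reclaimed (XAUTOCLAIM) if the worker dies.
redis-cli XACK pipeline:work workers 1700000000000-0
```

Because each entry is delivered to exactly one consumer until acknowledged, the pop/re-push contention disappears.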

WorkerModuleExecutor ResultTask semantics mismatch

WorkerModuleExecutor.ExecuteAsync calls await module.ResultTask to get the module result. However, ResultTask is backed by CompletionSource.Task, which is only completed when the module's ExecuteAsync returns and the framework sets the result on the CompletionSource. In the worker path, the module framework is not executing the module — the worker receives an assignment by name and is expected to run it remotely. Awaiting ResultTask on the worker side will deadlock unless the framework is also running the module's ExecuteAsync locally.

Concretely: if the worker receives a ModuleAssignment for MyBuildModule, finds the registered IModule instance, and then calls await module.ResultTask, it will wait forever because nothing is calling MyBuildModule.ExecuteAsync. The worker should be executing the module through IModuleRunner, not awaiting a completion source that only the framework can set.

The original test stub (WorkerModuleExecutorTests) has a comment saying "detailed testing requires mocking the full DI and execution pipeline" and its only test is await Assert.That(true).IsTrue(). This gap in test coverage means this fundamental flow — does the worker actually execute and get a result? — has no verification.

MatrixModuleExpander still not wired into ConfigurePipeline

DistributedPipelinePlugin.ConfigurePipeline still contains only a comment:

public void ConfigurePipeline(PipelineBuilder pipelineBuilder)
{
    // Pipeline-level configuration (e.g., matrix expansion) will be added in later phases
}

MatrixModuleExpander exists and has unit tests for its scan logic, but is never called from the plugin. The PR description marks matrix expansion tasks (T044–T048) as [x] complete. The scanner and metadata types are in place, but the pipeline integration — the part that actually creates expanded module instances — is absent. The checklist items should be updated to reflect that the scanner is implemented but the integration is deferred.

S3 downloads still buffer to MemoryStream

S3DistributedArtifactStore.DownloadAsync still copies the full S3 response into a MemoryStream before returning:

var ms = new MemoryStream();
await response.ResponseStream.CopyToAsync(ms, cancellationToken);
ms.Position = 0;
return ms;

For build artifacts of any significant size (binaries, coverage reports, packages) this holds the full content in heap memory. The comment says this avoids holding the S3 connection open — but the caller receives a Stream that it then CopyToAsyncs to a file anyway. The better approach is to return the response.ResponseStream directly (wrapped in a stream that disposes the GetObjectResponse when closed), letting the content stream from S3 to disk without intermediate buffering.

TOTAL_INSTANCES still hardcoded to 5 in the workflow

The matrix now has 5 entries (ubuntu/0, windows/1, macos/2, ubuntu/3, ubuntu/4), and TOTAL_INSTANCES is hardcoded to 5. GitHub Actions provides ${{ strategy.job-total }} which automatically equals the number of matrix jobs. This removes the manual sync point and prevents the coordinator from waiting for a ghost worker if the matrix is ever modified:

TOTAL_INSTANCES: ${{ strategy.job-total }}

Still Open: Minor

docs/distributed-runners-proposal.md in repo root

The pre-implementation design document in the repo root is still present alongside the published docs/docs/distributed/ documentation. It describes a different design state from what was implemented. It should be removed or moved to specs/ to avoid confusion.

DistributedPipelineBuilderExtensions._pluginRegistered is a static field

DistributedPipelineBuilderExtensions.EnsurePluginRegistered uses a static boolean to prevent double-registration:

private static bool _pluginRegistered;

This is process-wide state that survives across test runs in the same process (e.g., xUnit with parallel test collections). If multiple test cases configure different distributed setups, the first AddDistributedMode call will register the plugin for all subsequent calls. Use a thread-safe check (e.g., Interlocked.CompareExchange) and also consider whether PluginRegistry itself has idempotency guarantees.
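A minimal sketch of the thread-safe check; the plugin-registration call itself is hypothetical:

```csharp
private static int _pluginRegistered; // 0 = not yet, 1 = registered

private static void EnsurePluginRegistered(PipelineBuilder pipelineBuilder)
{
    // Exactly one caller wins the 0 -> 1 transition; all others are no-ops.
    if (Interlocked.CompareExchange(ref _pluginRegistered, 1, 0) == 0)
    {
        pipelineBuilder.AddPlugin<DistributedPipelinePlugin>(); // hypothetical registration API
    }
}
```

Note this only fixes thread safety; the state is still process-wide. Tracking registration per builder instance (e.g. via a ConditionalWeakTable keyed on PipelineBuilder) would also address the test-isolation problem.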

Duplicate AddDistributedMode in PipelineBuilderExtensions

There are now two AddDistributedMode extension methods on PipelineBuilder: one in ModularPipelines.Distributed.Extensions.DistributedPipelineBuilderExtensions and one in ModularPipelines.Extensions.PipelineBuilderExtensions. The latter does not call EnsurePluginRegistered, so calling the core version won't register the plugin. This is a latent API confusion issue — callers of the core-package version of AddDistributedMode will get options registered but no plugin wired up.


New observation: IArtifactContext not registered per-module scope

ArtifactContextImpl is registered as a singleton in DistributedPipelinePlugin.ConfigureServices:

services.AddSingleton<IArtifactContext>(sp2 =>
    new ArtifactContextImpl(
        sp2.GetRequiredService<IDistributedArtifactStore>(),
        sp2.GetRequiredService<IOptions<ArtifactOptions>>(),
        string.Empty));  // empty moduleTypeName

The moduleTypeName parameter is hardcoded to string.Empty. The ArtifactContextImpl presumably uses this to tag artifacts with the producing module's name. Every module calling context.Artifacts().PublishFileAsync(...) will publish with an empty module type name, defeating the [ConsumesArtifact(producerModule: typeof(X))] lookup that uses the producer's type name as the key.

If IArtifactContext is meant to be per-module, it should be registered as a scoped service and initialized with the current module's type name. If it's meant to be shared, the PublishFileAsync API should accept the module type as a parameter rather than baking it into the constructor.
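If the per-module route is taken, the registration could look roughly like this. IModuleContextAccessor is a hypothetical accessor for the currently executing module's scope; the other types are from the PR:

```csharp
// Sketch: resolve the producing module's type name from the module
// scope instead of baking in string.Empty.
services.AddScoped<IArtifactContext>(sp =>
    new ArtifactContextImpl(
        sp.GetRequiredService<IDistributedArtifactStore>(),
        sp.GetRequiredService<IOptions<ArtifactOptions>>(),
        sp.GetRequiredService<IModuleContextAccessor>() // hypothetical
            .CurrentModuleType.Name));
```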


Summary

Solid progress since the last review. The three most important fixes (reflection removal, BuildServiceProvider(), and WorkerHealthMonitor) are all correct. The InMemory dequeue livelock is fixed. The heartbeat timestamp tracking is now real.

The items that need attention before this is production-ready:

  1. WorkerModuleExecutor.ResultTask semantics — awaiting a CompletionSource that nothing sets on the worker side will deadlock in real execution. This needs either a real execution call through IModuleRunner, or a clear explanation of how the module gets executed and sets its completion source on the worker.
  2. Redis dequeue BRPOP bounce — the InMemory livelock was fixed but the Redis analogue remains.
  3. IArtifactContext singleton with empty moduleTypeName — breaks artifact attribution in practice.
  4. MatrixModuleExpander integration — the checklist items should be unchecked until the ConfigurePipeline wiring exists.
  5. S3 MemoryStream buffering — lower priority but a real OOM risk for large artifacts.

…ptional

Move ModularPipelines.Analyzers.sln into the restore loop and
BuildSolutionsModule instead of a separate workflow build step.
Make DependsOn<BuildSolutionsModule> optional since it has
[RunOnLinuxOnly] and isn't registered on Mac/Windows instances.
@claude claude bot left a comment

Code Review: Distributed Workers Mode

This is an ambitious and well-structured addition. The architecture is sound — clear separation of concerns with coordinator abstractions, capability routing, and artifact lifecycle management. The documentation is thorough and the test coverage is solid. Below are architectural concerns and design patterns worth revisiting before merge.


1. IDistributedCoordinator is a Role-Confused Interface (ISP Violation)

The single IDistributedCoordinator interface is implemented differently for master vs. worker, and both SignalRMasterCoordinator and SignalRWorkerCoordinator throw NotSupportedException on several methods:

// SignalRWorkerCoordinator
public Task EnqueueModuleAsync(...) => throw new NotSupportedException("Workers do not enqueue work.");
public Task<SerializedModuleResult> WaitForResultAsync(...) => throw new NotSupportedException("Workers do not wait for results.");
public Task<IReadOnlyList<WorkerRegistration>> GetRegisteredWorkersAsync(...) => throw new NotSupportedException("Workers do not query registered workers.");

This is an Interface Segregation Principle violation, and the NotSupportedException throws also break Liskov substitutability: callers accepting IDistributedCoordinator cannot trust the contract at compile time. It also makes the interface harder to mock correctly in tests — you have to know which methods are valid for which role.

Why it matters: Any new coordinator implementation must carry the cognitive burden of knowing which half of the interface to implement, and runtime failures from wrong usage are invisible until execution.

Suggested direction: Split into two focused interfaces:

// The part the master needs
public interface IMasterCoordinator
{
    Task EnqueueModuleAsync(ModuleAssignment assignment, CancellationToken ct);
    Task<SerializedModuleResult> WaitForResultAsync(string moduleTypeName, CancellationToken ct);
    Task<IReadOnlyList<WorkerRegistration>> GetRegisteredWorkersAsync(CancellationToken ct);
    Task SignalCompletionAsync(CancellationToken ct);
}

// The part a worker needs
public interface IWorkerCoordinator
{
    Task<ModuleAssignment?> DequeueModuleAsync(IReadOnlySet<string> capabilities, CancellationToken ct);
    Task PublishResultAsync(SerializedModuleResult result, CancellationToken ct);
    Task RegisterWorkerAsync(WorkerRegistration registration, CancellationToken ct);
}

The Redis and InMemory implementations can implement both (they share state), while SignalR master/worker coordinators only implement the relevant one.


2. Significant Code Duplication Between Master and Worker Executors

DistributedModuleExecutor and WorkerModuleExecutor share nearly identical implementations for:

  • ApplyDependencyResults(...) — both decompress GZip-prefixed JSON and call ModuleCompletionSourceApplicator.TryApply
  • Artifact upload/download logic with the same error handling patterns
  • PublishResolutionFailureAsync(...) — identical sentinel-value failure publishing

This has already created an inconsistency: DistributedModuleExecutor references DistributedWorkPublisher.GzipPrefix directly, while WorkerModuleExecutor references it as Master.DistributedWorkPublisher.GzipPrefix — a leaky internal namespace dependency.

Why it matters: When one copy is updated (e.g., a bug fix to the GZip decompression path), the other diverges silently.

Suggested direction: Extract shared execution logic into an AssignmentExecutor helper used by both master and worker.
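The shared surface might be as small as the following sketch; member names are illustrative, drawn from the duplicated logic listed above:

```csharp
// Hypothetical shared helper capturing the logic both executors duplicate.
internal interface IAssignmentExecutor
{
    // GZip-aware deserialization followed by ModuleCompletionSourceApplicator.TryApply
    Task ApplyDependencyResultsAsync(ModuleAssignment assignment, CancellationToken ct);

    // [ProducesArtifact]/[ConsumesArtifact] upload and download with shared error handling
    Task TransferArtifactsAsync(ModuleAssignment assignment, CancellationToken ct);

    // Sentinel-value failure publishing
    Task PublishResolutionFailureAsync(string moduleTypeName, Exception error, CancellationToken ct);
}
```

Both DistributedModuleExecutor and WorkerModuleExecutor would take this via DI, and GzipPrefix would move into the helper, which also eliminates the Master.DistributedWorkPublisher namespace leak.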


3. Dependency Results Embedded in Work Queue Payloads Will Not Scale

DistributedWorkPublisher.GatherDependencyResults() serializes and embeds all upstream dependency results into each ModuleAssignment. This has compounding problems:

  1. Payload fan-out: A diamond dependency pattern causes earlier results to be re-transmitted in every downstream assignment.
  2. Redis transport limits: The comment says "10 MB request cap" but this is not enforced — it will fail at the transport layer, not gracefully.
  3. The InMemory coordinator never compresses: GZip is only exercised in the Redis path, so unit tests using InMemory may pass while the Redis path fails under load.

Why it matters: For large build outputs, or pipelines with deep dependency chains, this causes silent data loss, transport errors, or OOM conditions.

Suggested direction: Workers should pull dependency results from the coordinator on demand (the same pattern Redis pub/sub already uses for final results):

var depResult = await coordinator.GetResultAsync(depTypeName, cancellationToken);

4. [MatrixTarget] is Public API with Silent Wrong Behavior

The MatrixModuleExpander.ScanForExpansions() is implemented, but the TODO comment in DistributedModuleExecutor shows it is explicitly not wired in:

// TODO(matrix): MatrixModuleExpander.ScanForExpansions not yet connected.
// Modules with [MatrixTarget] will run once, not N times.

The [MatrixTarget] attribute is public API and appears in documentation examples. Users who apply it will observe silent wrong behavior — their module runs once, not N times, with no error or warning.

Suggested direction: Either wire ScanForExpansions into DistributedModuleExecutor before merge, or add a runtime warning log when the attribute is detected, so the failure mode is visible rather than silent.
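If wiring it in is deferred, a guard along these lines would at least make the gap visible. This is a sketch; the exact ScanForExpansions signature and the registeredModuleTypes input are assumptions:

```csharp
// Sketch: surface the unimplemented expansion instead of failing silently.
var expansions = MatrixModuleExpander.ScanForExpansions(registeredModuleTypes); // hypothetical signature
if (expansions.Count > 0)
{
    _logger.LogWarning(
        "[MatrixTarget] found on {Modules}, but matrix expansion is not yet wired into " +
        "the distributed executor; these modules will run once, not once per matrix entry.",
        string.Join(", ", expansions.Select(e => e.ModuleType.Name)));
}
```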


5. RunIdentifierResolver is Duplicated Across Three Packages

RunIdentifierResolver is copied verbatim into:

  • src/ModularPipelines.Distributed.Artifacts.S3/Configuration/RunIdentifierResolver.cs
  • src/ModularPipelines.Distributed.Redis/Configuration/RunIdentifierResolver.cs
  • src/ModularPipelines.Distributed.Discovery.Redis/RunIdentifierResolver.cs

Why it matters: If a new CI system needs support, all three must be updated independently. Any drift between them is a latent correctness bug.

Suggested direction: Move it into the core ModularPipelines.Distributed package as an internal static utility shared by all.


6. ArtifactLifecycleManager Has a Stuck-Download Risk

In ArtifactLifecycleManager.DownloadConsumedArtifactsForPathAsync, the shared download uses CancellationToken.None intentionally to prevent one caller's cancellation from aborting other waiters. But if the download hangs (network failure, S3 timeout), every subsequent caller for that key also hangs indefinitely — the failed-entry removal only runs if an exception is thrown, not on timeout.

Why it matters: A stuck download will deadlock the entire pipeline on all workers waiting for that artifact, with no escape valve.

Suggested direction: Apply a configurable download timeout independent of the caller's token, so the shared task itself has a bounded lifetime even when callers are patient.
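One shape this could take, assuming a new DownloadTimeout option and a DownloadCoreAsync inner method (both hypothetical), mirroring the existing failed-entry removal path:

```csharp
// Sketch: bound the shared download's lifetime independently of callers.
using var timeoutCts = new CancellationTokenSource(_options.DownloadTimeout); // hypothetical option

try
{
    // The shared task uses only the timeout token, so one caller's
    // cancellation still cannot abort other waiters, but a hung
    // transfer now fails after a bounded interval.
    await DownloadCoreAsync(reference, destinationPath, timeoutCts.Token);
}
catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested)
{
    // Remove the failed entry so later callers can retry,
    // as the existing exception path already does.
    _completedRestores.TryRemove(reference.ArtifactId, out _);
    throw new TimeoutException($"Artifact download for '{reference.ArtifactId}' timed out.");
}
```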


7. Redis Lua Script Does a Linear Scan of the Entire Work Queue

ScanAndClaimScript does LRANGE key 0 -1 — fetching the entire list — then iterates to find a matching assignment. For the common case (no capability filtering), this is O(N) on every dequeue attempt by every worker.

Why it matters: With many workers polling simultaneously and a large queue, this creates O(N x W) Redis operations per scheduling cycle. Because Lua scripts run inside Redis's single-threaded command loop, each scan blocks the server for the entire iteration.

Suggested direction: Add a fast path for the no-capability case:

-- O(1) fast path when no capability filtering needed
if required == nil or #required == 0 then
    return redis.call('RPOP', KEYS[1])
end
-- fall through to O(N) capability-matching scan

Minor Observations

  • Heartbeat fields are documented but absent: HeartbeatIntervalSeconds and HeartbeatTimeoutSeconds appear in the docs configuration table but are not present in DistributedOptions in the diff. The heartbeat background services referenced in the architecture docs do not appear to be wired up.
  • Cloudflared regex is hardcoded to trycloudflare.com: This will fail silently if Cloudflare changes their quick-tunnel domain or if a user has a named tunnel. The regex should match any HTTPS URL from cloudflared output rather than a specific hostname suffix.

Summary

The overall design — coordinator abstraction, capability routing, artifact lifecycle management, and the Redis pub/sub approach for results — is well-thought-out and the test coverage is comprehensive. The main concerns to address before merge are:

  1. IDistributedCoordinator violates ISP — split by role
  2. Master/worker executor duplication should be extracted
  3. Embedding dependency results in assignments will not scale — prefer pull-on-demand
  4. [MatrixTarget] ships with documented-but-broken behavior — add a runtime warning at minimum
  5. RunIdentifierResolver triplication is a maintenance hazard
  6. ArtifactLifecycleManager needs a timeout escape valve for stuck downloads
  7. Redis Lua scan is O(N) where the common case could be O(1)

Server's JsonHubProtocol defaults to camelCase naming, but the client
uses default STJ options (PascalCase). This causes "Error binding
arguments" when deserializing WorkerRegistration records.

Configure server to use PascalCase (PropertyNamingPolicy = null) and
case-insensitive matching to align with the client.
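Concretely, one way to align the server with the client's defaults on the hub registration (standard ASP.NET Core SignalR configuration):

```csharp
services.AddSignalR().AddJsonProtocol(options =>
{
    // Match the client's default System.Text.Json behaviour (PascalCase).
    options.PayloadSerializerOptions.PropertyNamingPolicy = null;
    // Tolerate casing drift in either direction.
    options.PayloadSerializerOptions.PropertyNameCaseInsensitive = true;
});
```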
@claude claude bot left a comment

Distributed Workers Mode — Round 12 Code Review

Reviewing against all previous rounds to track resolution status.


Resolved Since Last Review

Redis upload OOM — FIXED
RedisDistributedArtifactStore.UploadAsync now streams correctly using ReadFullBufferAsync and makes the single-vs-chunked decision within the streaming loop, without ever buffering the entire artifact in memory. The previous critical concern is resolved.

Reflection-based result application — FIXED
IModule now exposes TrySetDistributedResult(IModuleResult result) and ModuleCompletionSourceApplicator.TryApply calls it cleanly. No private reflection.

GetModule<T>() cross-process hang — Resolved by inline dependency results
The DependencyResults field in ModuleAssignment carries serialized results for all direct [DependsOn<T>] dependencies. Both master and worker apply these before execution, so GetModule<T>() works correctly across process boundaries for declared dependencies.

ArtifactLifecycleManager cached-failure in Lazy<Task> — FIXED
Failed entries are removed from _completedRestores in the catch block, allowing retry on subsequent calls.


Still Open

1. [MatrixTarget] is dead public API — ships with no effect

MatrixModuleExpander.ScanForExpansions is not called anywhere in DistributedModuleExecutor. The [MatrixTarget] attribute is public, documented, and mentioned in the PR summary, but silently has no effect. Modules decorated with it run once, not N times.

This was raised in the previous round and remains unaddressed. Shipping public attributes that appear to control behaviour but don't is misleading and hard to un-ship once users adopt it. Either wire up the expansion before this merges, or remove the attribute and expander from this PR entirely and add them in a follow-up.

2. architecture.md describes the wrong interface

The docs still describe an IDistributedCoordinator with 9 methods across four concerns including SendHeartbeatAsync, BroadcastCancellationAsync, and IsCancellationRequestedAsync. The actual interface has 7 methods; those three don't exist.

The architecture doc also says:

"Two background services start: WorkerHeartbeatService (periodic heartbeats) and WorkerCancellationMonitor (polls for cancellation)."

Neither of these services exists in the current codebase.

This creates a misleading contract description for implementors of custom coordinators. The docs need to be updated to reflect the current interface.

3. Redis chunked download — silent truncation on chunk TTL expiry

RedisDistributedArtifactStore.DownloadAsync reconstructs chunked artifacts by iterating until a chunk key is missing:

while (true)
{
    var chunkKey = _keys.ArtifactChunk(reference.ArtifactId, chunkIndex);
    var chunk = await _database.StringGetAsync(chunkKey);
    if (chunk.IsNull) { break; }
    var bytes = (byte[])chunk!;
    ms.Write(bytes, 0, bytes.Length);
    chunkIndex++;
}

If any chunk key expires before all others (Redis evicts under memory pressure with non-uniform TTL enforcement), the result is silently truncated data — the download succeeds but the reconstructed artifact is corrupt.

The fix: store the chunk count in the artifact metadata during upload (ArtifactReference already has SizeBytes, but chunk count is not there). On download, validate that the number of chunks read matches the expected count and throw if they differ. The metadata JSON is small and won't add meaningful overhead.
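With a ChunkCount recorded at upload time (the proposed new field, shown here as if it already existed on ArtifactReference), the download loop can fail loudly instead:

```csharp
// Sketch: reference.ChunkCount is the proposed new metadata field.
var ms = new MemoryStream();
for (var chunkIndex = 0; chunkIndex < reference.ChunkCount; chunkIndex++)
{
    var chunkKey = _keys.ArtifactChunk(reference.ArtifactId, chunkIndex);
    var chunk = await _database.StringGetAsync(chunkKey);
    if (chunk.IsNull)
    {
        // A missing chunk is now corruption, not end-of-data.
        throw new InvalidOperationException(
            $"Artifact '{reference.ArtifactId}' is missing chunk {chunkIndex} of " +
            $"{reference.ChunkCount}; it may have been evicted by Redis TTL or memory pressure.");
    }
    var bytes = (byte[])chunk!;
    ms.Write(bytes, 0, bytes.Length);
}
ms.Position = 0;
```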

4. WorkerModuleScheduler.ReadyModules throws NotSupportedException

public ChannelReader<ModuleState> ReadyModules =>
    throw new NotSupportedException("Worker does not use the ready-modules channel.");

If any framework code (hooks, health monitors, diagnostics) enumerates IModuleScheduler.ReadyModules without checking the concrete type, this propagates an unexpected exception. The null-object pattern is safer here:

private static readonly ChannelReader<ModuleState> _neverCompletingReader =
    Channel.CreateUnbounded<ModuleState>().Reader;

public ChannelReader<ModuleState> ReadyModules => _neverCompletingReader;

A never-completing reader behaves correctly: WaitToReadAsync blocks without returning items, and ReadAllAsync never yields. No caller is broken and no exception leaks.

5. RunIdentifierResolver is duplicated across three packages

There are three near-identical implementations of RunIdentifierResolver:

  • src/ModularPipelines.Distributed.Redis/Configuration/RunIdentifierResolver.cs
  • src/ModularPipelines.Distributed.Artifacts.S3/Configuration/RunIdentifierResolver.cs
  • src/ModularPipelines.Distributed.Discovery.Redis/RunIdentifierResolver.cs

All three read the same CI environment variables (GITHUB_RUN_ID, BUILD_BUILDID, etc.) and fall back to git SHA. Any bug fix or new CI system needs to be applied in three places. Move this to the core ModularPipelines.Distributed package (as RunIdentifierResolver in a Configuration namespace) and have the backend packages reference it.


New Findings

6. PortableFilePathJsonConverter — case-insensitive comparison on case-sensitive file systems

if (normalizedAbsolute.StartsWith(normalizedRoot, StringComparison.OrdinalIgnoreCase))

Linux file systems are case-sensitive. A path like /home/runner/Work/Project/src/output.dll will correctly serialise using OrdinalIgnoreCase, but if the git root is /home/runner/work/Project (note different casing), this will match and produce a malformed relative path. The comparison should use StringComparison.Ordinal on Linux and OrdinalIgnoreCase on Windows/macOS:

var comparison = RuntimeInformation.IsOSPlatform(OSPlatform.Linux)
    ? StringComparison.Ordinal
    : StringComparison.OrdinalIgnoreCase;

if (normalizedAbsolute.StartsWith(normalizedRoot, comparison))

This is particularly important for cross-platform distributed runs (e.g., a Windows master serialising paths for a Linux worker).

7. ArtifactLifecycleManager.ResolvePathPattern — ** glob not supported

var wildcardIndex = pathPattern.IndexOfAny(['*', '?']);
// Split at first wildcard only

For a pattern like **/bin/*.dll (common in .NET projects), this splits at index 0, takes baseDir = cwd, and searchPattern = Path.GetFileName("**/bin/*.dll") = "*.dll". The result is all .dll files anywhere under cwd, not just those in bin/ subdirectories. The ** directory wildcard is not handled.

While fully implementing glob support from scratch is not trivial, using Microsoft.Extensions.FileSystemGlobbing (already available in the .NET SDK) would handle this correctly:

var matcher = new Microsoft.Extensions.FileSystemGlobbing.Matcher();
matcher.AddInclude(pathPattern);
var result = matcher.Execute(new DirectoryInfoWrapper(new DirectoryInfo(baseDir)));
return result.Files.Select(f => Path.Combine(baseDir, f.Path)).ToList();

Summary

The PR has improved significantly across 12 rounds. The upload OOM, reflection coupling, and cached-failure deduplication issues are properly resolved. The remaining blockers before merge are:

  • [MatrixTarget] unimplemented public API — remove or wire up
  • architecture.md interface description — outdated by 2 methods + 2 background services
  • Redis chunked download truncation — correctness risk, needs chunk count validation

The other issues (duplicate RunIdentifierResolver, WorkerModuleScheduler.ReadyModules, path converter case sensitivity, glob handling) are improvements worth addressing but aren't blocking.

@claude claude bot left a comment

Code Review — Round 4

This round reviews the current state of the feat: Distributed workers mode branch after three prior review rounds. The SignalR backend is new since last round; several previously raised issues remain unaddressed.


Previously Raised Issues — Status

Issue Prior Rounds Status
Reflection-based CompletionSource access Round 1 ✅ Fixed — TrySetDistributedResult interface method
BuildServiceProvider() in ConfigureServices Round 1 ✅ Fixed — descriptor scanning
CancellationToken.None in worker loop Round 1 ✅ Fixed — ApplicationStopping token threaded through
S3 MemoryStream OOM risk Round 1 ✅ Fixed — temp file with DeleteOnClose
Sync-over-async in factory init Round 3 ✅ Fixed — DeferredCoordinator wrapper
Redis LRANGE/LREM race Round 1 ✅ Fixed — atomic Lua scan-and-claim
[MatrixTarget] public but non-functional All rounds ❌ Unchanged — TODO comment still present
ResultTask cancellation-to-fault promotion Round 3 ❌ Unchanged
Three divergent RunIdentifierResolver impls All rounds ❌ Unchanged
WorkerModuleScheduler.ReadyModules throws Round 1 ❌ Unchanged
S3._ttlSeconds stored but never applied Prior rounds ❌ Unchanged
ResolveDistributedOptions misses env-var config Round 1 ❌ Partially addressed

Critical Issues

1. [MatrixTarget] ships as public API with docs but silently does nothing

MatrixModuleExpander, [MatrixTargetAttribute], [MatrixExpansionAttribute], and the full docs under docs/docs/distributed/ are all committed. The TODO comment in DistributedModuleExecutor is unchanged:

// TODO(matrix): MatrixModuleExpander.ScanForExpansions not yet connected.
// Modules with [MatrixTarget] will run once, not N times.

A user who reads the docs and uses [MatrixTarget] gets silent wrong behavior — the module runs once instead of N times with no warning, no exception, no log message. This is worse than not shipping the feature at all.

Required action: Either wire up MatrixModuleExpander.ScanForExpansions in DistributedModuleExecutor before the scheduler loop, or remove [MatrixTarget], MatrixModuleExpander, and the matrix documentation from this PR. Shipping undocumented stubs as public API with live documentation is a correctness trap.


2. ResultTask cancellation-to-fault promotion (unresolved from Round 3)

In Module.cs:

Task<IModuleResult> IModule.ResultTask => CompletionSource.Task.ContinueWith(
    static t => (IModuleResult)t.Result, TaskContinuationOptions.ExecuteSynchronously);

When CompletionSource is cancelled, accessing t.Result throws, which faults the continuation: the cancellation surfaces as a faulted (not cancelled) task, so task.IsCanceled is false when it should be true. In the distributed path, CollectDistributedResultAsync uses cts.IsCancellationRequested to distinguish timeout from pipeline cancellation, and the incorrect task state makes these checks unreliable.

Fix:

Task<IModuleResult> IModule.ResultTask => CompletionSource.Task.ContinueWith(
    static t => t.IsCanceled
        ? Task.FromCanceled<IModuleResult>(new CancellationToken(true))
        : t.IsFaulted
            ? Task.FromException<IModuleResult>(t.Exception!.InnerException ?? t.Exception)
            : Task.FromResult<IModuleResult>(t.Result),
    TaskContinuationOptions.ExecuteSynchronously).Unwrap();

Or use an async/await wrapper which handles this correctly by default.


3. SignalR OnDisconnectedAsync does not re-enqueue in-flight work — worker crash is a hang

In DistributedPipelineHub.cs:

public override Task OnDisconnectedAsync(Exception? exception)
{
    if (_masterState.Workers.TryRemove(Context.ConnectionId, out var workerState))
    {
        _logger.LogWarning("Worker {Index} disconnected...");
        // TODO: Re-enqueue in-flight work for the disconnected worker
    }
    return Task.CompletedTask;
}

If a worker crashes after receiving an assignment but before publishing a result, CollectDistributedResultAsync on the master blocks until ModuleResultTimeoutSeconds expires, then the whole pipeline fails with a timeout. The TODO acknowledges this but it needs to be resolved before the SignalR backend is usable in production.

The fix requires tracking the current assignment per worker (keyed by ConnectionId) and re-enqueuing to _masterState.PendingWork on disconnect.
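A sketch of that tracking; CurrentAssignment and InstanceIndex are hypothetical members recorded on the worker state when an assignment is dispatched to a connection:

```csharp
public override async Task OnDisconnectedAsync(Exception? exception)
{
    if (_masterState.Workers.TryRemove(Context.ConnectionId, out var workerState))
    {
        _logger.LogWarning("Worker {Index} disconnected", workerState.InstanceIndex);

        // Recorded at dispatch time, cleared when the result is published.
        if (workerState.CurrentAssignment is { } assignment)
        {
            // Put the orphaned assignment back for another worker to claim.
            _masterState.PendingWork.Enqueue(assignment);
        }
    }
    await base.OnDisconnectedAsync(exception);
}
```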


4. cloudflared tunnel URL regex misses multi-segment subdomains

In CloudflaredTunnel.cs:

[GeneratedRegex(@"https://[a-zA-Z0-9\-]+\.trycloudflare\.com")]

Cloudflare's free tunnel URLs typically have multi-segment subdomains like https://word-word-word-word.trycloudflare.com. The pattern [a-zA-Z0-9\-]+ does not match dots, so multi-segment hostnames never match. The URL task never resolves, the timeout fires, and the master fails with a TimeoutException that looks unrelated to the regex.

Fix:

[GeneratedRegex(@"https://[a-zA-Z0-9][a-zA-Z0-9\-\.]*\.trycloudflare\.com")]

This is a functional defect that will prevent the SignalR backend from working in most real cases.


Significant Issues

5. SignalRWorkerCoordinator.DequeueModuleAsync floods master with redundant RequestWork calls

The worker loop calls DequeueModuleAsync in a while loop, and each call sends a RequestWork message to the master. If the master has no work, it queues the request — but the next loop iteration immediately sends another RequestWork before the previous one is resolved. A worker could accumulate dozens of pending requests in the master's queue while idle.

The worker should use a persistent subscription (listen on a long-lived channel) rather than polling via request/response. A single RequestWork call followed by indefinite waiting on the channel reader matches SignalR's push semantics much better.

6. Three divergent RunIdentifierResolver implementations — coordination failure on fallback

Three separate implementations exist with different fallback strategies:

  • ModularPipelines.Distributed.Redis → GUID fallback
  • ModularPipelines.Distributed.Artifacts.S3 → GUID fallback (identical copy)
  • ModularPipelines.Distributed.Discovery.Redis → CWD SHA256 hash fallback

If Discovery.Redis and Distributed.Redis are used together and neither CI env vars nor git SHA resolves, the Redis coordinator uses a random GUID (different per process) while the Discovery coordinator uses a CWD hash (same per process). The run identifier is the Redis key namespace for all coordination — a mismatch means master and workers never communicate.

This was raised in all previous rounds. Move to a single shared RunIdentifierResolver in ModularPipelines.Distributed core.
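One possible shape for that shared resolver, as a sketch — the env var names are illustrative, and the deterministic CWD-hash fallback from Discovery.Redis is kept so that every package agrees on the run ID even outside CI:

```csharp
// Sketch of a single resolver in ModularPipelines.Distributed core.
public static class RunIdentifierResolver
{
    public static string Resolve()
    {
        return Environment.GetEnvironmentVariable("GITHUB_RUN_ID")
            ?? TryGetGitSha()
            ?? HashCurrentDirectory(); // deterministic: same per machine + path
    }

    private static string? TryGetGitSha()
    {
        // e.g. shell out to `git rev-parse HEAD`, returning null on failure.
        return null;
    }

    private static string HashCurrentDirectory()
    {
        var bytes = System.Security.Cryptography.SHA256.HashData(
            System.Text.Encoding.UTF8.GetBytes(Environment.CurrentDirectory));
        return Convert.ToHexString(bytes)[..16];
    }
}
```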

7. ResolveDistributedOptions misses environment-variable-bound configuration

The scanner only handles IOptions<T> direct instances and IConfigureOptions<T> with ImplementationInstance. It misses:

  • builder.Configuration.Bind("Distributed", opts)
  • IConfigureOptions<T> with ImplementationFactory
  • Standard .NET env vars like MODULARPIPELINES__DISTRIBUTED__INSTANCEINDEX

A user who uses standard .NET configuration binding will have every instance default to InstanceIndex = 0 (master), causing all workers to also try to act as master. The RoleDetector reads MODULAR_PIPELINES_INSTANCE directly from Environment.GetEnvironmentVariable as a workaround, but this is not the standard options pattern and isn't obvious to users.
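For illustration, the standard options pattern that users are likely to reach for — the section name "ModularPipelines:Distributed" is an assumption, but with this registration the double-underscore env var MODULARPIPELINES__DISTRIBUTED__INSTANCEINDEX binds onto InstanceIndex automatically:

```csharp
// Hypothetical registration; the scanner would need to honor this path too.
services.AddOptions<DistributedOptions>()
    .BindConfiguration("ModularPipelines:Distributed");
```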


Moderate Issues

8. WorkerModuleScheduler.ReadyModules throws NotSupportedException

Any framework code (hooks, diagnostics, progress monitoring) that accesses IModuleScheduler.ReadyModules without being aware of worker-mode context gets an unhandled exception. Return a completed channel reader instead:

private static readonly ChannelReader<ModuleState> _empty = CreateCompletedReader();

private static ChannelReader<ModuleState> CreateCompletedReader()
{
    var ch = Channel.CreateUnbounded<ModuleState>();
    ch.Writer.Complete();
    return ch.Reader;
}

public ChannelReader<ModuleState> ReadyModules => _empty;

9. Dual-container IHubContext<T> ownership in SignalR backend

MasterServerHost starts a WebApplication with its own DI container. SignalRMasterCoordinator is registered in the pipeline's DI container but holds IHubContext<T> resolved from the WebApp's container. These are two separate containers. SignalRMasterState is shared via reference, which works but makes ownership non-obvious.

If the WebApplication is disposed before the coordinator finishes (e.g., on pipeline cancellation), IHubContext<T> becomes invalid mid-flight. Making the coordinator an IHostedService would tie its lifetime to the main host and avoid this ambiguity.

10. S3._ttlSeconds stored but never applied (unresolved from prior rounds)

The _ttlSeconds field is assigned in the constructor but not passed to any S3 API call. S3 does not support per-object TTL in the standard API — expiration requires bucket lifecycle rules. The field creates a false expectation that artifacts auto-expire. Either document that TTL is managed via S3 bucket lifecycle policy and remove the field, or implement it via object tagging + a lifecycle rule.
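If the tagging route is chosen, a sketch using AWSSDK.S3 — the tag key and value here are invented conventions, not an existing contract, and the matching bucket lifecycle rule must be configured out-of-band:

```csharp
// Sketch: tag transient artifacts so a documented lifecycle rule can expire them.
var request = new Amazon.S3.Model.PutObjectRequest
{
    BucketName = bucketName,
    Key = key,
    FilePath = filePath,
    TagSet = new List<Amazon.S3.Model.Tag>
    {
        // "modularpipelines-ttl" is a hypothetical tag convention.
        new() { Key = "modularpipelines-ttl", Value = "transient" },
    },
};
await s3Client.PutObjectAsync(request);

// A bucket lifecycle rule (configured separately) then expires objects
// tagged modularpipelines-ttl == "transient" after N days.
```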

11. ResolvePathPattern is not a real glob engine

The implementation uses Path.GetFileName to extract the search pattern from a glob like **/bin/Release/*.nupkg, which discards intermediate path segments. Brace expansion (*.{zip,tar.gz}) is not supported at all. Use Microsoft.Extensions.FileSystemGlobbing.Matcher instead:

var matcher = new Matcher();
matcher.AddInclude(pathPattern);
var result = matcher.Execute(new DirectoryInfoWrapper(new DirectoryInfo(baseDirectory)));
return result.Files.Select(f => Path.Combine(baseDirectory, f.Path)).ToList();

Summary

The core distributed execution mechanics (Redis coordination, dependency serialization, fault propagation) are much improved from prior rounds. The main blockers before this can merge are:

  1. [MatrixTarget] — either wire it up or remove it + its docs from this PR
  2. ResultTask fault promotion — affects cancellation semantics in both distributed and non-distributed paths
  3. SignalR OnDisconnectedAsync — worker crash causes master hang; needs in-flight re-enqueue
  4. Cloudflared regex — functional defect preventing SignalR tunnel from working on real URLs

The RunIdentifierResolver duplication and ResolveDistributedOptions config-binding gap are also worth resolving before shipping given the coordination failure modes they can cause.

…serialization

CreateSlimBuilder in .NET 10 disables reflection-based JSON serialization,
which prevents IReadOnlySet<string> deserialization in SignalR hub method
binding (RegisterWorker). CreateBuilder restores full JSON support.
@claude claude bot left a comment
Code Review — Round 5

Reviewed against all prior rounds. The most recent review (Round 4) was submitted ~6 hours ago. Code state appears unchanged — all Round 4 TODO comments are still present. One previously-called-out issue (RequestWork flooding) appears to have actually been fixed in the design; the others remain open.


Resolution Status vs Round 4

| Issue | Round 4 Status | Current Status |
| --- | --- | --- |
| Reflection-based CompletionSource | ✅ Fixed | ✅ Still fixed |
| [MatrixTarget] dead public API | ❌ Open | ❌ Unchanged — TODO in 2 files |
| ResultTask cancellation-to-fault | ❌ Open | ❌ Unchanged |
| OnDisconnectedAsync re-enqueue | ❌ Open | ❌ TODO still present |
| Cloudflared regex | ❌ Open | ❌ Unchanged |
| WorkerModuleScheduler.ReadyModules throws | ❌ Open | ❌ Unchanged |
| Three divergent RunIdentifierResolver impls | ❌ Open | ❌ Unchanged |
| S3._ttlSeconds stored, never applied | ❌ Open | ❌ Unchanged |
| ResolvePathPattern not a real glob engine | ❌ Open | ❌ Unchanged |
| RequestWork flooding | ❌ Open in Round 4 | Actually resolved — see below |

Correction from Round 4 — RequestWork is NOT flooding

Round 4 flagged SignalRWorkerCoordinator.DequeueModuleAsync as flooding the master. Looking at the current implementation more carefully, this concern is incorrect:

await _connection.InvokeAsync(HubMethodNames.RequestWork, workerCapabilities, cancellationToken);

if (await _assignmentChannel.Reader.WaitToReadAsync(cancellationToken))
{
    if (_assignmentChannel.Reader.TryRead(out var assignment))
        return assignment;
}

The worker sends one RequestWork, then blocks on WaitToReadAsync until an assignment arrives. The push model in SignalRMasterCoordinator.EnqueueModuleAsync calls TryPushToIdleWorker, which sends ReceiveAssignment directly to idle workers — the channel write unblocks WaitToReadAsync regardless of when the assignment arrives relative to the call. The worker never sends a second RequestWork until the first assignment is processed. This is correct behavior and is not flooding.


Critical Issues (still open)

1. [MatrixTarget] ships as documented public API with no effect

The TODO comment is unchanged in two files:

// DistributedModuleExecutor.cs
// TODO(matrix): MatrixModuleExpander.ScanForExpansions not yet connected.
// Modules with [MatrixTarget] will run once, not N times.

// DistributedWorkPublisher.cs
MatrixTarget: null, // TODO(matrix): Set by MatrixModuleExpander when wired up

The [MatrixTarget] attribute, MatrixModuleExpander, and the full docs/docs/distributed/ docs are all committed. A user following the documentation will get silent wrong behavior (module runs once, not N times) with no exception, no log warning, and no indication that the feature is non-functional.

Required action before merge: Either wire up MatrixModuleExpander.ScanForExpansions before the scheduler loop in DistributedModuleExecutor, or remove [MatrixTargetAttribute], MatrixModuleExpander, and the matrix documentation from this PR entirely.


2. ResultTask cancellation-to-fault promotion

Unchanged from all prior rounds:

// Module.cs
Task<IModuleResult> IModule.ResultTask => CompletionSource.Task.ContinueWith(
    static t => (IModuleResult)t.Result, TaskContinuationOptions.ExecuteSynchronously);

When CompletionSource is cancelled, t.Result throws TaskCanceledException, and the continuation wraps it as a faulted task — not cancelled. task.IsCanceled returns false when it should be true. The distributed result collector checks cts.IsCancellationRequested to distinguish cancellation from timeout — this mismatch makes those checks unreliable.

Fix:

Task<IModuleResult> IModule.ResultTask => CompletionSource.Task.ContinueWith(
    static t =>
    {
        if (t.IsCanceled) return Task.FromCanceled<IModuleResult>(new CancellationToken(true));
        if (t.IsFaulted) return Task.FromException<IModuleResult>(t.Exception!.InnerException ?? t.Exception);
        return Task.FromResult<IModuleResult>(t.Result);
    },
    TaskContinuationOptions.ExecuteSynchronously).Unwrap();

3. OnDisconnectedAsync — worker crash causes master hang (SignalR)

Unchanged:

public override Task OnDisconnectedAsync(Exception? exception)
{
    if (_masterState.Workers.TryRemove(Context.ConnectionId, out var workerState))
    {
        _logger.LogWarning("Worker {Index} disconnected (connection {ConnectionId})", ...);
        // TODO: Re-enqueue in-flight work for the disconnected worker
    }
    return Task.CompletedTask;
}

If a worker crashes after receiving an assignment but before calling PublishResult, CollectDistributedResultAsync on the master blocks until the timeout expires and then fails the entire pipeline. For the SignalR backend to be production-usable, the assignment in-flight on disconnect must be re-enqueued to _masterState.PendingAssignments. Track the current assignment per ConnectionId in WorkerState and re-enqueue on disconnect.


4. Cloudflared regex does not match typical tunnel URLs

Unchanged:

[GeneratedRegex(@"https://[a-zA-Z0-9\-]+\.trycloudflare\.com")]
private static partial Regex TunnelUrlRegex();

Cloudflare free tunnel hostnames are a single DNS label of hyphen-separated words: clever-tooth-piano-mango.trycloudflare.com. Since [a-zA-Z0-9\-]+ includes hyphens, this regex does match those typical URLs. It does not, however, match URLs with an explicit port (e.g., https://abc.trycloudflare.com:443), and it is fragile if the output format changes slightly.

The safer pattern anchors to the full URL more precisely:

[GeneratedRegex(@"https://[\w\-]+(\.[\w\-]+)*\.trycloudflare\.com(?::\d+)?/?")]

This issue is lower severity than previously flagged — typical URLs should match — but the regex is still brittle.


Significant Issues (still open)

5. Three divergent RunIdentifierResolver implementations

Three separate implementations exist with different fallback strategies:

| Package | Fallback when no CI env var / git SHA |
| --- | --- |
| Distributed.Redis | Random GUID (different per process) |
| Distributed.Artifacts.S3 | Random GUID (identical copy) |
| Distributed.Discovery.Redis | SHA256 of Environment.CurrentDirectory (same per process) |

If a user combines Distributed.Redis (coordination) with Distributed.Discovery.Redis (master discovery), and neither CI env vars nor git resolve, the run IDs differ between packages — master and workers never communicate.

The fix is a single RunIdentifierResolver in ModularPipelines.Distributed core that all packages call. This has been raised in every prior round and is a real coordination failure scenario for local/non-CI runs.


6. WorkerModuleScheduler.ReadyModules throws NotSupportedException

Unchanged — any framework code (hooks, diagnostics, progress tracking) that accesses IModuleScheduler.ReadyModules without knowing about worker mode gets an unhandled exception. Return a completed ChannelReader<ModuleState> instead:

public ChannelReader<ModuleState> ReadyModules => _emptyReader;

private static readonly ChannelReader<ModuleState> _emptyReader = CreateCompletedReader();

private static ChannelReader<ModuleState> CreateCompletedReader()
{
    var ch = Channel.CreateUnbounded<ModuleState>();
    ch.Writer.Complete();
    return ch.Reader;
}

Moderate Issues (still open)

7. S3._ttlSeconds stored but never applied

private readonly int _ttlSeconds;  // constructor-assigned

_ttlSeconds is not used in PutObjectRequest or anywhere else. S3 does not support per-object TTL — expiration requires bucket lifecycle rules. Either document that TTL is managed via bucket lifecycle configuration (and remove the field and constructor parameter), or implement TTL via object tagging + documented lifecycle rule convention. The current state creates a false expectation.

8. ArtifactLifecycleManager.ResolvePathPattern is not a real glob engine

The current implementation:

var wildcardIndex = pathPattern.IndexOfAny(['*', '?']);
var baseDir = Path.GetDirectoryName(pathPattern[..wildcardIndex]);
// then: Directory.GetFiles(baseDir, Path.GetFileName(pathPattern))

For a pattern like **/bin/Release/*.nupkg:

  • wildcardIndex = 0 (the ** starts at index 0)
  • baseDir = GetDirectoryName("") → current directory
  • Path.GetFileName("**/bin/Release/*.nupkg") → "*.nupkg"
  • Result: all .nupkg files in the current directory only — the bin/Release/ path constraint is silently discarded

Use Microsoft.Extensions.FileSystemGlobbing.Matcher (available as the standard Microsoft.Extensions.FileSystemGlobbing NuGet package):

var matcher = new Matcher();
matcher.AddInclude(pathPattern);
var result = matcher.Execute(new DirectoryInfoWrapper(new DirectoryInfo(baseDirectory)));
return result.Files.Select(f => Path.Combine(baseDirectory, f.Path)).ToList();

Summary

The core distributed mechanics are solid and the architecture is well-designed. The blockers before this can merge remain:

  1. [MatrixTarget] — wire it up or remove it + its docs
  2. ResultTask fault promotion — 4-line fix, affects all cancellation paths
  3. SignalR OnDisconnectedAsync — re-enqueue in-flight work on worker crash
  4. RunIdentifierResolver divergence — coordination failure on fallback for local runs

The ResolvePathPattern glob issue is also worth fixing before shipping since users will hit it as soon as they use glob patterns for artifact paths.

…tocol

Two fixes:
1. Set RunIdentifier to GITHUB_RUN_ID so overlapping CI runs don't
   share stale master URLs in Redis.
2. Add matching AddJsonProtocol config to the client HubConnectionBuilder
   (PascalCase + case-insensitive) to match the server-side settings.
@claude

claude bot commented Feb 24, 2026

Code Review: Distributed Workers Mode

This is an impressive, ambitious feature that adds a fully-fledged distributed execution system to ModularPipelines. The architecture is well-thought-out, the documentation is thorough, and the test coverage for a feature this size is commendable. That said, there are several design issues and correctness concerns worth addressing before merging.


1. IDistributedCoordinator Mixes Master and Worker Concerns (Architectural Issue)

The IDistributedCoordinator interface is a single abstraction used by both master and worker roles, but several methods are only valid for one role. The SignalRWorkerCoordinator throws NotSupportedException for EnqueueModuleAsync, WaitForResultAsync, and GetRegisteredWorkersAsync. The SignalRMasterCoordinator would throw similarly if workers tried to call coordinator methods that belong to the master.

This violates the Interface Segregation Principle. When a class implementing an interface must throw NotSupportedException for half its methods, the interface is doing too much. A worker will never enqueue work or wait for results; a worker-facing coordinator should not expose those methods.

Suggested approach: Split into two interfaces — IMasterCoordinator and IWorkerCoordinator — where IWorkerCoordinator only exposes DequeueModuleAsync, PublishResultAsync, and RegisterWorkerAsync. The Redis implementation can implement both (since it is a shared backend), while SignalR can have genuinely separate master and worker implementations with no NotSupportedException needed. The DistributedModuleExecutor and WorkerModuleExecutor would each depend only on the interface they actually use.


2. ResolveDistributedOptions Crawls the IServiceCollection at Build Time (Fragile Design)

PipelineBuilder.ResolveDistributedOptions manually iterates over the IServiceCollection, casting ImplementationInstance to IConfigureOptions<DistributedOptions> and invoking Configure by hand. This reimplements part of the Options framework and will silently miss configurations registered through lambdas that are not already-evaluated singleton instances. Any Configure<T> call that uses a factory delegate or that arrives through IConfiguration.Bind will be ignored.

The reason this workaround exists — to avoid calling BuildServiceProvider() in the middle of service registration — is legitimate. But the fragility trades one problem for another.

Better approach: Accept a DistributedOptions parameter directly on AddDistributedMode and store it as a concrete instance on PipelineBuilder, bypassing the options pipeline entirely for this one decision. Or read from a well-known environment variable (MODULAR_PIPELINES_INSTANCE) earlier in the startup chain so the role is known before ConfigureServices runs. The RoleDetector already supports reading from MODULAR_PIPELINES_INSTANCE, so this could replace the complicated service-collection crawl.


3. DeferredCoordinator / DeferredArtifactStore Live in PipelineBuilder (Separation of Concerns)

Two non-trivial distributed infrastructure classes (DeferredCoordinator and DeferredArtifactStore) are private nested classes inside PipelineBuilder. PipelineBuilder is the top-level builder type; it shouldn't own runtime coordination infrastructure. These deferred wrappers are meaningful distributed components with their own correctness requirements (the double-checked locking pattern around _inner), and placing them here makes them invisible to tests and hard to find when debugging distributed connectivity issues.

Suggestion: Move DeferredCoordinator and DeferredArtifactStore to the ModularPipelines.Distributed namespace alongside the other coordinator infrastructure. They are proper implementations of IDistributedCoordinator and IDistributedArtifactStore and belong there.


4. Module Lookup by FullName Is O(n) Per Assignment (Performance)

Both WorkerModuleExecutor and DistributedModuleExecutor find the module instance using:

var module = modules.FirstOrDefault(m => m.GetType().FullName == assignment.ModuleTypeName);

This is a linear scan executed for every module assignment. In a pipeline with many modules this adds up. The same scan is repeated for every dependency result in ApplyDependencyResults. The modules list is also scanned inside ApplyDependencyResults for each serialized dependency.

Suggestion: Build a Dictionary<string, IModule> from modules once at the start of ExecuteAsync in both executors. The cost drops from O(n) per lookup to O(1). Given that distributed pipelines are likely to be larger (otherwise why distribute?), this matters.
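A minimal sketch of that change, assuming the modules and assignment names from the snippet above:

```csharp
// Build the lookup once at the start of ExecuteAsync.
var moduleByName = modules.ToDictionary(
    m => m.GetType().FullName!,
    m => m);

// O(1) per assignment instead of a linear scan per lookup.
if (!moduleByName.TryGetValue(assignment.ModuleTypeName, out var module))
{
    throw new InvalidOperationException(
        $"No module registered for '{assignment.ModuleTypeName}'");
}
```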


5. ApplyDependencyResults Is Duplicated Between Master and Worker Executors

The ApplyDependencyResults method — including the GZip decompression prefix detection logic — is copy-pasted identically (with trivial namespace differences) in both DistributedModuleExecutor and WorkerModuleExecutor. Similarly, PublishResolutionFailureAsync is duplicated.

This creates a maintenance hazard. If the compression strategy changes, both copies must be updated.

Suggestion: Extract a shared DependencyResultApplicator static class (or move it to DistributedWorkPublisher which already owns the compression logic) so the decompression and application path exists in one place.


6. MatrixTarget Feature Is Incomplete but Publicly Documented

The [MatrixTarget] attribute is documented in both the public docs (docs/docs/distributed/capabilities.md) and advertised in the PR summary as a feature. However, the code contains two prominent TODO(matrix) comments stating it is not yet wired into DistributedModuleExecutor:

// TODO(matrix): MatrixModuleExpander.ScanForExpansions not yet connected.
// Modules with [MatrixTarget] will run once, not N times.

The MatrixModuleInstance class similarly has a TODO(matrix) note. Publishing documentation for behavior that silently does not work is a user experience issue: users who read the docs and add [MatrixTarget] will observe their module running only once with no warning.

Suggestion: Either wire up the expander before merging, or remove the [MatrixTarget] attribute and its documentation from this PR and merge it separately when the implementation is complete. If keeping it as an experimental preview, at minimum add an [Obsolete("MatrixTarget expansion is not yet implemented")] marker or throw a NotImplementedException in the DistributedModuleExecutor path when a matrix module is encountered, so users get a clear error instead of silent single execution.


7. Redis Lua Script Scans the Entire Work Queue (Scalability)

The ScanAndClaimScript Lua script in RedisDistributedCoordinator does an LRANGE of the entire work queue (0, -1) and iterates all items to find a capability match. This is O(n) on the queue length executed inside a Redis script (which blocks the Redis event loop for its duration).

For small pipelines this is fine. But for pipelines with many queued modules, this will block Redis for all clients while scanning. Redis scripts run atomically on a single thread.

Longer-term improvement: Use separate queues per capability set (e.g., work:queue:linux, work:queue:windows, work:queue:any) so a worker pops from queues matching its own capabilities using RPOPLPUSH or LMOVE, avoiding the full scan. This is a known pattern for Redis work queues with routing. It does require more coordination at enqueue time but is far more scalable.
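A sketch of the worker-side pop with StackExchange.Redis — the key layout, runId, workerId, and Deserialize helper are all assumptions for illustration; LMOVE/RPOPLPUSH atomically moves the item into a per-worker in-flight list so a crashed worker's claim stays recoverable:

```csharp
// db is a StackExchange.Redis IDatabase; key names are illustrative.
foreach (var capability in workerCapabilities)
{
    var item = await db.ListRightPopLeftPushAsync(
        $"{runId}:work:queue:{capability}",
        $"{runId}:work:inflight:{workerId}");

    if (!item.IsNullOrEmpty)
    {
        // Deserialize is a placeholder for the assignment deserializer.
        return Deserialize(item!);
    }
}
```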


8. SignalR Transport Has External Dependency on Cloudflared

The SignalRDistributedCoordinator requires cloudflared to be installed on the master runner to expose the SignalR hub publicly. The GitHub Actions workflow installs it by downloading a binary directly from GitHub releases over curl:

curl -fsSL https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64 -o /usr/local/bin/cloudflared

Using latest without pinning a version means the CI workflow will silently pick up new cloudflared releases, which could break the tunnel format or startup behavior. The CloudflaredTunnel class parses a URL from stderr with a regex — if cloudflared changes its output format in a future release, the tunnel URL will not be detected and startup will time out.

Suggestion: Pin the cloudflared version in the workflow (e.g., a specific tag), and add it to Directory.Packages.props or a separate version variable so it's visible and easy to update intentionally.
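For illustration, a pinned workflow step — the version value is a placeholder to be chosen and updated deliberately, not a real pin from this PR:

```yaml
# Hypothetical workflow fragment; replace <pinned-version> with a chosen tag.
env:
  CLOUDFLARED_VERSION: "<pinned-version>"
steps:
  - name: Install cloudflared (pinned)
    run: |
      curl -fsSL "https://github.com/cloudflare/cloudflared/releases/download/${CLOUDFLARED_VERSION}/cloudflared-linux-amd64" \
        -o /usr/local/bin/cloudflared
      chmod +x /usr/local/bin/cloudflared
```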


9. DeferredCoordinator Has a Double-Checked Locking Race

The DeferredCoordinator.GetAsync method uses double-checked locking:

if (_inner is not null) return _inner;
await _lock.WaitAsync(ct);
try { return _inner ??= await factory.CreateAsync(ct); }
finally { _lock.Release(); }

The outer if (_inner is not null) check reads _inner without the lock. While _inner is a reference type (reads/writes to reference fields are atomic on .NET), the compiler or JIT could reorder the read before initialization is fully visible to another thread without a memory barrier. The correct pattern for async double-checked locking uses volatile on the field or reads through Volatile.Read.

Fix: Declare private volatile IDistributedCoordinator? _inner; (same change in DeferredArtifactStore) to ensure the visibility guarantee.
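Putting the pieces together as a sketch — field and factory names are assumed from the snippet above:

```csharp
// volatile gives the outer read acquire semantics, so a non-null _inner
// is always a fully published instance.
private volatile IDistributedCoordinator? _inner;

public async Task<IDistributedCoordinator> GetAsync(CancellationToken ct)
{
    var existing = _inner; // volatile read outside the lock
    if (existing is not null) return existing;

    await _lock.WaitAsync(ct);
    try
    {
        return _inner ??= await _factory.CreateAsync(ct);
    }
    finally
    {
        _lock.Release();
    }
}
```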


Minor Issue: DistributedOptions.HeartbeatIntervalSeconds / HeartbeatTimeoutSeconds Documented but Not Used

The configuration docs describe HeartbeatIntervalSeconds and HeartbeatTimeoutSeconds on DistributedOptions, but the WorkerHeartbeatService and WorkerHealthMonitor mentioned in the architecture doc do not appear anywhere in the PR diff — they seem to exist only in the PR summary and documentation, not in the implementation. If that is the case, the configuration options and documentation should flag these as placeholders, similar to the matrix TODO.


What Works Well

  • The core IDistributedCoordinator abstraction is clean and extensible; multiple backends can be plugged in.
  • The Redis implementation's check-subscribe-recheck pattern in WaitForResultAsync correctly handles the race condition between checking for an existing result and subscribing.
  • The ArtifactLifecycleManager deduplication via ConcurrentDictionary<string, Lazy<Task>> is a nice pattern that prevents redundant downloads under concurrent access.
  • The run identifier isolation strategy (using the commit SHA as a Redis key namespace) elegantly prevents concurrent pipeline runs from colliding without any explicit cleanup.
  • The DistributedWorkPublisher compression threshold and GZip handling for large dependency results show good operational awareness of Redis payload limits.
  • Documentation is thorough and the architecture doc is genuinely helpful.

The feature is architecturally sound. Addressing the incomplete [MatrixTarget] documentation alignment, the interface segregation concern, and the service-collection crawl fragility would make this ready for production use.

🤖 Generated with Claude Code

…or SignalR serialization

System.Text.Json cannot reliably deserialize IReadOnlySet<string> as a
record constructor parameter in SignalR's argument binding pipeline.
Using the concrete HashSet<string> type eliminates the binding error.
@claude

claude bot commented Feb 24, 2026

Code Review: Distributed Workers Mode (Latest Pass)

This PR has evolved substantially across many revision rounds. The core architecture is solid: the pluggable IDistributedCoordinator/IDistributedArtifactStore abstractions, real IHubContext<T> usage in SignalRMasterCoordinator, Lua-based atomic dequeue in Redis, Lazy<Task> deduplication in ArtifactLifecycleManager, proper await _app.StartAsync() readiness, and the comprehensive 86+ test suite are all commendable. Most issues from prior rounds have been resolved.

Below is the status of remaining concerns and new findings.


Still Open from Previous Rounds

1. [MatrixTarget] ships as dead public API

DistributedModuleExecutor.cs line 86–87:

// TODO(matrix): MatrixModuleExpander.ScanForExpansions not yet connected.
// Modules with [MatrixTarget] will run once, not N times.

MatrixModuleExpander.cs remarks also explicitly document it as unwired. Yet [MatrixTarget] is:

  • Public API with XML docs
  • Listed in the PR summary as a feature
  • Documented in the docs site under /distributed/

Shipping a decorated attribute that silently has no effect is a footgun. A user applies [MatrixTarget("linux", "windows")] to a module, deploys, and sees it run once — no error, no warning beyond a debug-level log. The MatrixModuleExpander class exists and is ready; the gap is wiring ScanForExpansions into the executor and multiplying work items accordingly.

Recommendation: Either wire it up in this PR (the expander already does the scan; the remaining work is creating N assignments from the expansion list), or make the attribute internal/remove it from documentation and the PR summary until it is functional.

2. IDistributedCoordinator violates Interface Segregation

SignalRWorkerCoordinator throws NotSupportedException for three of seven interface methods:

  • EnqueueModuleAsync — workers don't enqueue
  • WaitForResultAsync — workers don't wait for results
  • GetRegisteredWorkersAsync — workers don't query registrations

When a class must throw NotSupportedException for half its interface, the interface is covering too many roles. The failure mode is that code accepting IDistributedCoordinator cannot safely call any method without first knowing the role — defeating the abstraction. This also complicates testing and future implementations.

Suggested split:

// Core shared contract
public interface IDistributedCoordinator : IDisposable
{
    Task<ModuleAssignment?> DequeueModuleAsync(IReadOnlySet<string> capabilities, CancellationToken ct);
    Task PublishResultAsync(SerializedModuleResult result, CancellationToken ct);
    Task RegisterWorkerAsync(WorkerRegistration registration, CancellationToken ct);
    Task SignalCompletionAsync(CancellationToken ct);
}

// Master-only extensions
public interface IMasterDistributedCoordinator : IDistributedCoordinator
{
    Task EnqueueModuleAsync(ModuleAssignment assignment, CancellationToken ct);
    Task<SerializedModuleResult> WaitForResultAsync(string moduleTypeName, CancellationToken ct);
    Task<IReadOnlyList<WorkerRegistration>> GetRegisteredWorkersAsync(CancellationToken ct);
}

DistributedModuleExecutor and DistributedWorkPublisher inject IMasterDistributedCoordinator; WorkerModuleExecutor injects IDistributedCoordinator. Redis can implement both; InMemory (single-process) implements IMasterDistributedCoordinator as it does today.


New Findings

3. SignalRMasterCoordinator.DequeueModuleAsync busy-polls at 50ms

The master's local worker loop calls DequeueModuleAsync, which spins:

while (!cancellationToken.IsCancellationRequested)
{
    // ... scan PendingAssignments ...
    await Task.Delay(50, cancellationToken);
}

When all work is pushed directly to external workers (common case), PendingAssignments is always empty and the master worker thread wakes every 50ms doing nothing. The InMemoryDistributedCoordinator handles this correctly with a SemaphoreSlim — the same signal can be applied here: EnqueueModuleAsync writes to the queue and releases a semaphore; DequeueModuleAsync waits on that semaphore instead of sleeping on a timer. This eliminates unnecessary wake-ups and improves responsiveness when work does land in the local queue.
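A sketch of the semaphore-based version — _workAvailable is a hypothetical field, and PendingAssignments is assumed to be a ConcurrentQueue-like collection:

```csharp
private readonly SemaphoreSlim _workAvailable = new(0);

public Task EnqueueModuleAsync(ModuleAssignment assignment, CancellationToken ct)
{
    _state.PendingAssignments.Enqueue(assignment);
    _workAvailable.Release(); // wake exactly one waiting dequeuer
    return Task.CompletedTask;
}

public async Task<ModuleAssignment> DequeueModuleAsync(CancellationToken ct)
{
    while (true)
    {
        await _workAvailable.WaitAsync(ct); // no timer wake-ups while idle
        if (_state.PendingAssignments.TryDequeue(out var assignment))
        {
            return assignment;
        }
    }
}
```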

4. SignalCompletionAsync cancels all pending result waiters before confirming receipt

public async Task SignalCompletionAsync(CancellationToken cancellationToken)
{
    _state.IsCompleted = true;
    foreach (var kvp in _state.ResultWaiters)
        kvp.Value.TrySetCanceled();   // ← before broadcast completes
    await _hubContext.Clients.All.SendAsync(HubMethodNames.SignalCompletion, ...);
}

TrySetCanceled is a no-op on already-resolved waiters, so modules that have already published results are unaffected. However, any module that was assigned to a worker but whose PublishResult hub call is in-flight at the moment of shutdown will have its waiter cancelled before the result arrives. The master's CollectDistributedResultAsync will see OperationCanceledException and treat this as a pipeline failure rather than a success. A short drain window (e.g., allow any already-received in-flight results to settle before cancelling) would make shutdown more reliable. At a minimum, this race should be documented.
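One possible drain-window shape, as a sketch — the 5-second deadline is arbitrary, ResultWaiters is assumed to be a ConcurrentDictionary of TaskCompletionSource values, and completed waiters are detected via their Task rather than assuming removal:

```csharp
public async Task SignalCompletionAsync(CancellationToken cancellationToken)
{
    _state.IsCompleted = true;

    // Give in-flight PublishResult hub calls a short window to land.
    var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(5);
    while (DateTime.UtcNow < deadline
           && _state.ResultWaiters.Any(w => !w.Value.Task.IsCompleted))
    {
        await Task.Delay(100, cancellationToken);
    }

    // Only still-unresolved waiters are affected.
    foreach (var kvp in _state.ResultWaiters)
    {
        kvp.Value.TrySetCanceled();
    }

    await _hubContext.Clients.All.SendAsync(
        HubMethodNames.SignalCompletion, cancellationToken);
}
```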

5. CloudflaredTunnel URL regex hardcodes the trycloudflare domain

[GeneratedRegex(@"https://[a-zA-Z0-9\-]+\.trycloudflare\.com")]
private static partial Regex TunnelUrlRegex();

If cloudflared changes its free-tunnel subdomain format, or if a user configures a named tunnel with a custom domain, PublicUrl will remain null and the workers will never connect — with no error. A more general URL regex (e.g., https://[^\s]+) applied only after the known preamble string in the cloudflared output would be more resilient, as would logging a clear warning when the expected URL is not detected within the timeout.

6. GitHub Actions now always runs 5 matrix runners per push

The matrix was extended from 3 OS runners to 5 (adding instance: 3 and instance: 4, both ubuntu-latest). Every PR and every push to main now consumes 5× the runner time. This is by design for testing the distributed mode, but it meaningfully increases CI costs and queuing time. Consider gating the extra runners on a condition (e.g., paths affecting Distributed packages, or a manual workflow input) to avoid paying the overhead on every non-distributed change.


Summary

| # | Issue | Status |
|---|-------|--------|
| 1 | [MatrixTarget] dead public API | Open (TODO in code, raised prior round) |
| 2 | IDistributedCoordinator ISP violation | Open (raised prior round) |
| 3 | 50ms busy-poll in SignalR master dequeue | New |
| 4 | Result waiter cancellation race on shutdown | New |
| 5 | CloudflaredTunnel regex hardcodes trycloudflare.com | New |
| 6 | 5 CI runners on every push | New (informational) |

Issues 1 and 2 have been raised in prior rounds and remain unaddressed. Issue 1 in particular is a correctness/API contract problem that should be resolved before merge. The others are design improvements worth addressing.

Start a real SignalR server and client to validate RegisterWorker,
PublishResult, and ModuleAssignment serialization. Also resolve
AdvertisedUrl from actual bound port (supports port 0 for tests).

@claude claude bot left a comment


Distributed Workers Mode - Code Review

This is an ambitious and well-structured feature. The abstractions are clean, the documentation is thorough, and the test coverage is meaningful. Below are observations across several categories — design concerns, correctness issues, and places where the implementation could be more maintainable.


1. Heartbeat / SendHeartbeat Missing from IDistributedCoordinator

The PR description and architecture docs mention SendHeartbeatAsync and WorkerHeartbeatService / WorkerCancellationMonitor, but the actual IDistributedCoordinator interface only has:

Task EnqueueModuleAsync(...)
Task<ModuleAssignment?> DequeueModuleAsync(...)
Task PublishResultAsync(...)
Task<SerializedModuleResult> WaitForResultAsync(...)
Task RegisterWorkerAsync(...)
Task<IReadOnlyList<WorkerRegistration>> GetRegisteredWorkersAsync(...)
Task SignalCompletionAsync(...)

SendHeartbeatAsync, IsCancellationRequestedAsync, and BroadcastCancellationAsync are documented in docs/docs/distributed/architecture.md but are absent from the interface definition. This means the documented cancellation propagation system and health monitoring via heartbeats are not actually wired up — the interface contract does not match the documentation. If these features exist in a coordinator implementation they are not callable through the abstraction, which defeats the purpose of the interface. Either the interface needs to be updated to include these members, or the documentation needs to be corrected to match what was actually shipped.


2. DeferredCoordinator / DeferredArtifactStore Belong in a Dedicated File, Not Nested in PipelineBuilder

PipelineBuilder.cs now contains two private nested classes (DeferredCoordinator, DeferredArtifactStore) and a 150-line static method ActivateDistributedModeIfConfigured plus ResolveDistributedOptions. This violates single responsibility: PipelineBuilder is now also a distributed-mode activation engine.

The deferred proxy pattern is legitimate, but it should live in its own file under src/ModularPipelines/Distributed/. The activation logic (ActivateDistributedModeIfConfigured + ResolveDistributedOptions) should be extracted to a dedicated class such as DistributedModeActivator. This keeps PipelineBuilder focused on pipeline construction.


3. ResolveDistributedOptions Walks the Service Collection Without BuildServiceProvider() — Fragile

The options resolution in PipelineBuilder iterates IServiceCollection looking for IConfigureOptions<DistributedOptions> descriptors with an ImplementationInstance, then manually calls .Configure() and .PostConfigure() on them:

foreach (var descriptor in services.Where(d =>
    d.ServiceType == typeof(IConfigureOptions<DistributedOptions>) &&
    d.ImplementationInstance is IConfigureOptions<DistributedOptions>))
{
    ((IConfigureOptions<DistributedOptions>)descriptor.ImplementationInstance!).Configure(opts);
}

This approach is brittle. When Configure<TOptions>(Action<TOptions>) is called, it registers an IConfigureOptions<TOptions> using a lambda factory (not an ImplementationInstance), which means ImplementationInstance is null and the descriptor is silently skipped. The approach works in the current code path only because AddDistributedMode wraps the user's Action<DistributedOptions> in a closure passed directly to Configure, which happens to create a concrete ConfigureNamedOptions with a captured action — but this is an implementation detail of Microsoft.Extensions.Options that could change.

A more robust approach is to provide a separate mechanism for the user to signal distributed mode (e.g., a boolean on PipelineBuilder) rather than sniffing the service collection. Alternatively, the distributed-mode bootstrap could be deferred to IHostedService startup, where a proper IOptions<DistributedOptions> is available.


4. ModuleAssignment.RequiredCapabilities is HashSet<string> on the Record — Mutability Leak

public record ModuleAssignment(
    ...
    HashSet<string> RequiredCapabilities,
    ...
);

Using HashSet<string> (a mutable type) on a record undermines value semantics. Records with mutable reference-type properties do not get structural equality on those properties, and a caller can mutate the set after the assignment is created. IReadOnlySet<string> should be used instead. The same applies to WorkerRegistration.Capabilities, which also uses HashSet<string>. Notably, IDistributedCoordinator.DequeueModuleAsync already takes IReadOnlySet<string>, so the gap is in the data types themselves.


5. Redis Lua Script: LRANGE Scans the Entire Queue on Every Dequeue

The ScanAndClaimScript in RedisDistributedCoordinator:

local items = redis.call('LRANGE', KEYS[1], 0, -1)

This loads the entire work queue into Lua memory on every dequeue call. With a small number of short-running modules this is fine, but for a pipeline with many modules (100+) and many workers calling DequeueModuleAsync concurrently, this can become a bottleneck. The script also scans in FIFO order and claims the first matchable item, which means items at the head of the queue that no current worker can satisfy are re-examined on every single dequeue, so the O(n) scan cost is paid repeatedly.

A more scalable approach would be to segregate the queue by capability set (separate lists per capability group), or to accept the O(n) scan with a configurable limit and re-enqueue unmatched items at the tail instead of skipping them.


6. Capability Matching Inconsistency: IReadOnlySet<string> vs HashSet<string> Custom Equality

CapabilityMatcher.CanExecute uses:

return assignment.RequiredCapabilities.All(
    required => workerCapabilities.Contains(required, StringComparer.OrdinalIgnoreCase));

But assignment.RequiredCapabilities is a HashSet<string> constructed with StringComparer.OrdinalIgnoreCase, and workerCapabilities is also built with that comparer. The call .Contains(required, StringComparer.OrdinalIgnoreCase) on an IReadOnlySet<string> is actually the IEnumerable<T>.Contains(T, IEqualityComparer<T>) LINQ overload — it bypasses the set's internal hash structure and falls back to a linear scan. If workerCapabilities is large, this is inefficient.

To correctly leverage the set's O(1) lookup, ensure workerCapabilities is declared as HashSet<string> (with the correct comparer) and call workerCapabilities.Contains(required) directly, or use IReadOnlySet<string> with Contains(T) only — trusting that both sets were constructed with the same comparer.
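A small illustration of the difference (capability names are made up): when both sets share a comparer, the set's own Contains does a case-insensitive hash lookup and the LINQ overload is unnecessary.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Both sets constructed with the same comparer, so HashSet<T>.Contains(T)
// honours case-insensitivity via an O(1) hash lookup — no linear scan.
var required = new HashSet<string>(StringComparer.OrdinalIgnoreCase) { "docker", "linux" };
var workerCapabilities = new HashSet<string>(StringComparer.OrdinalIgnoreCase) { "LINUX", "Docker", "gpu" };

bool canExecute = required.All(workerCapabilities.Contains);
Console.WriteLine(canExecute); // True
```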


7. TrySetDistributedResult Cast Can Throw at Runtime

In Module<T>:

bool IModule.TrySetDistributedResult(IModuleResult result)
{
    return CompletionSource.TrySetResult((ModuleResult<T?>)result);
}

If result is not a ModuleResult<T?>, this throws InvalidCastException without any diagnostic message. Given that this is called from deserialization in a distributed execution context (where type mismatches are possible due to version skew or registry gaps), the cast should be guarded:

if (result is not ModuleResult<T?> typed)
{
    return false;
}
return CompletionSource.TrySetResult(typed);

8. ArtifactLifecycleManager.DownloadConsumedArtifactsForPathAsync Silences Failure Cases

When an artifact is not found:

if (artifact is null)
{
    _logger.LogWarning(...);
    return; // silently succeeds
}

If a module has [ConsumesArtifact] and the artifact doesn't exist, execution continues and the module may fail in a confusing way (e.g., FileNotFoundException during execution rather than a clear "required artifact not available" error). This should throw, not log-and-continue, so that the failure is attributed to the missing artifact rather than to arbitrary module code.
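A sketch of the fail-fast alternative, reusing the null check from the snippet above (`moduleName`/`artifactName` are stand-ins for whatever identifiers the surrounding method has in scope, and the exception type is a placeholder):

```csharp
// Sketch only: attribute the failure to the missing artifact, not to
// whatever the module does later with the absent file.
if (artifact is null)
{
    throw new InvalidOperationException(
        $"Module '{moduleName}' declares [ConsumesArtifact(\"{artifactName}\")] " +
        "but no matching artifact was published. Check that the producing module " +
        "ran in this pipeline run and that the [ProducesArtifact] name matches.");
}
```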


9. GzipPrefix String Constant Couples DistributedWorkPublisher and WorkerModuleExecutor

WorkerModuleExecutor.ApplyDependencyResults accesses Master.DistributedWorkPublisher.GzipPrefix and DistributedWorkPublisher.DecompressJson:

if (serializedDep.SerializedJson.StartsWith(Master.DistributedWorkPublisher.GzipPrefix, StringComparison.Ordinal))
{
    var decompressed = Master.DistributedWorkPublisher.DecompressJson(serializedDep.SerializedJson);

This creates a circular concern: the worker knows about the master's compression implementation. The compression/decompression logic (and its sentinel prefix) should be extracted to a shared utility class (e.g., JsonCompressor) in the Serialization namespace, so both master and worker can use it without cross-namespace coupling.
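One possible shape for the shared utility (the `"GZ:"` sentinel and base64 encoding are assumptions — whatever the existing constants are should be carried over verbatim):

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;

// Sketch only: a neutral home for the compression sentinel and codec so both
// DistributedWorkPublisher and WorkerModuleExecutor depend on this, not on each other.
public static class JsonCompressor
{
    public const string GzipPrefix = "GZ:"; // assumed sentinel; reuse the existing constant

    public static string Compress(string json)
    {
        using var output = new MemoryStream();
        using (var gzip = new GZipStream(output, CompressionLevel.Fastest))
        {
            var bytes = Encoding.UTF8.GetBytes(json);
            gzip.Write(bytes, 0, bytes.Length);
        } // gzip stream must be closed before reading the buffer
        return GzipPrefix + Convert.ToBase64String(output.ToArray());
    }

    public static string Decompress(string payload)
    {
        if (!payload.StartsWith(GzipPrefix, StringComparison.Ordinal))
        {
            return payload; // not compressed — pass through unchanged
        }
        var compressed = Convert.FromBase64String(payload[GzipPrefix.Length..]);
        using var input = new GZipStream(new MemoryStream(compressed), CompressionMode.Decompress);
        using var reader = new StreamReader(input, Encoding.UTF8);
        return reader.ReadToEnd();
    }
}
```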


10. [MatrixTarget] Is Documented and Partially Implemented, but Silently Broken

The feature is visible in the public [MatrixTarget] attribute, documented in docs, and has a corresponding MatrixModuleExpander class — but the expander is explicitly not connected:

// TODO(matrix): MatrixModuleExpander.ScanForExpansions not yet connected.
// Modules with [MatrixTarget] will run once, not N times.

Shipping a public attribute that does not work as documented is misleading to users who discover it. Until the expansion logic is connected, the attribute should be marked [Obsolete("Matrix expansion is not yet implemented")] or [EditorBrowsable(EditorBrowsableState.Never)]; alternatively, the PR should complete the feature or defer adding the public API surface until it is ready.


11. InMemoryDistributedCoordinator Has a Missing SendHeartbeatAsync/BroadcastCancellationAsync Concern

If the interface is eventually updated to include heartbeats/cancellation (see point 1), InMemoryDistributedCoordinator will need to implement them. It is worth noting now that the _completed flag in InMemoryDistributedCoordinator relies on volatile bool, which is fine for the read side, but the write side (_completed = true) followed by _workAvailable.Release() does not guarantee ordering on all architectures without a proper memory barrier. Volatile.Write(ref _completed, true) would be clearer and more explicit about intent.
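A sketch of the explicit pairing (field and member names follow the coordinator described above; this is about intent clarity, not a reproduced bug):

```csharp
using System.Threading;

// Sketch only: explicit Volatile.Write/Volatile.Read make the
// publish/observe pairing on the completion flag unambiguous.
private bool _completed; // no 'volatile' keyword needed with explicit calls

public void SignalCompletion()
{
    Volatile.Write(ref _completed, true); // published before the semaphore release
    _workAvailable.Release();
}

public bool IsCompleted => Volatile.Read(ref _completed);
```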


12. PortableFilePathJsonConverter / PortableFolderPathJsonConverter vs [JsonConverter] on File / Folder

Adding [JsonConverter(typeof(FilePathJsonConverter))] to the File and Folder classes is a global change that affects all serialization of these types — not just distributed module result serialization. If any existing code relies on the default serialization behavior of File or Folder, this is a breaking change. The portable path converters (which are added only to the distributed serializer's JsonSerializerOptions) avoid this problem by being scoped. Consider whether the global [JsonConverter] annotation on File/Folder is intentional and whether it may affect non-distributed serialization paths.


Summary

The architectural foundation is sound: the coordinator abstraction, capability-based routing, artifact lifecycle management, and the deferred initialization pattern are all well-designed. The most actionable issues to address before merging are:

  • Correctness: The IDistributedCoordinator interface is missing members that exist in implementations and are documented (heartbeat, cancellation) — this is a gap between docs and code.
  • Safety: The TrySetDistributedResult cast should be guarded.
  • Correctness: The missing artifact case should throw, not silently continue.
  • API design: [MatrixTarget] should not be a visible public attribute while the expansion logic is not connected.
  • Type design: HashSet<string> on record types should be IReadOnlySet<string>.

Tests validate:
- Capability-based routing across 3 workers
- Result broadcast to non-producing workers (dependency propagation)
- Full workflow: enqueue → assign → execute → publish → collect → complete
macOS CI runners take longer to resolve cloudflared tunnel DNS names.
30 seconds was insufficient — increase to 120s to accommodate slower
DNS resolution across different CI platforms.

@claude claude bot left a comment


Code Review: Distributed Workers Mode (Follow-Up)

This is a follow-up to the previous review. Several of the major issues raised earlier have been addressed — well done. Below is the current state.


Resolved from Previous Review

| Issue | Status |
|-------|--------|
| Reflection to access internal CompletionSource | ✅ Fixed — replaced with IModule.TrySetDistributedResult |
| BuildServiceProvider() inside ConfigureServices | ✅ Fixed — ResolveDistributedOptions avoids this |
| WorkerHealthMonitor non-functional stub | ✅ Removed from scope |
| CancellationToken.None throughout worker loop | ✅ Fixed — uses _lifetime.ApplicationStopping |
| InMemoryDistributedCoordinator busy-spin livelock | ⚠️ Improved but still has a busy-loop characteristic for capability mismatches |
| MatrixModuleExpander not wired up | ⚠️ Still open — TODO(matrix) comment confirms |

Critical: IModule interface polluted with distributed-specific API

IModule.cs now exposes two members that are only relevant in distributed execution:

Task<IModuleResult> ResultTask { get; }
bool TrySetDistributedResult(IModuleResult result);

This is a breaking change for any user who implements IModule directly rather than inheriting from Module<T>. More importantly, TrySetDistributedResult is a distributed-execution implementation detail that has no meaning in the single-process case. Adding it to the core interface violates the Interface Segregation Principle — every module now carries an obligation it never needs unless distributed mode is enabled.

Why this matters: The original reflection hack was (correctly) identified as bad. But the replacement leaks the distributed concern upward into the public API rather than keeping it internal. The better approach is an internal interface:

// Internal to ModularPipelines — never exposed to users
internal interface IDistributedModule
{
    Task<IModuleResult> ResultTask { get; }
    bool TrySetDistributedResult(IModuleResult result);
}

Module<T> can implement IDistributedModule in addition to IModule, and WorkerModuleExecutor can cast to IDistributedModule internally. This keeps the public IModule contract clean and non-breaking.


Significant: ResultTask creates a new Task object on every access

// Module.cs
Task<IModuleResult> IModule.ResultTask => CompletionSource.Task.ContinueWith(
    static t => (IModuleResult)t.Result, TaskContinuationOptions.ExecuteSynchronously);

This is a property, not a field — so every access to ResultTask allocates a new continuation task. If the worker (or anything else) accesses ResultTask more than once, you get multiple independent continuations on the same CompletionSource. This is a silent performance and correctness risk. It should be a lazily-initialized field (noting that ??= is not atomic — if concurrent first access is possible, initialize the field in the constructor or use LazyInitializer.EnsureInitialized):

private Task<IModuleResult>? _resultTask;
Task<IModuleResult> IModule.ResultTask =>
    _resultTask ??= CompletionSource.Task.ContinueWith(
        static t => (IModuleResult)t.Result, TaskContinuationOptions.ExecuteSynchronously);

Significant: ResolveDistributedOptions is fragile

PipelineBuilder.ResolveDistributedOptions manually re-implements the options resolution pipeline by walking IServiceCollection descriptors and invoking IConfigureOptions<T> instances directly. This works today because services.Configure<T>(Action<T>) internally registers ConfigureNamedOptions<T> as a singleton instance — but it silently breaks for any IConfigureOptions<DistributedOptions> registered via a factory delegate (ImplementationFactory rather than ImplementationInstance), because those are filtered out:

// Only processes descriptors with ImplementationInstance — factory-registered options are silently skipped
foreach (var descriptor in services.Where(d =>
    d.ServiceType == typeof(IConfigureOptions<DistributedOptions>) &&
    d.ImplementationInstance is IConfigureOptions<DistributedOptions>))

This is essentially reimplementing BuildServiceProvider() by hand to avoid calling BuildServiceProvider(). A more robust alternative is to defer the distributed activation decision to runtime rather than registration time. Since the executor (IModuleExecutor) is resolved from the container when the pipeline runs, you could use a switching executor that reads IOptions<DistributedOptions> at resolution time:

services.AddSingleton<IModuleExecutor>(sp =>
{
    var opts = sp.GetRequiredService<IOptions<DistributedOptions>>().Value;
    if (!opts.Enabled || opts.TotalInstances <= 1)
        return sp.GetRequiredService<DefaultModuleExecutor>();
    return opts.InstanceIndex == 0
        ? sp.GetRequiredService<DistributedModuleExecutor>()
        : sp.GetRequiredService<WorkerModuleExecutor>();
});

This is simpler, relies on the standard DI container, and handles all registration patterns correctly.


Significant: SignalR coordinator violates Interface Segregation

Both SignalRMasterCoordinator and SignalRWorkerCoordinator implement IDistributedCoordinator but throw NotSupportedException for the methods not applicable to their role:

  • SignalRWorkerCoordinator.EnqueueModuleAsync → NotSupportedException
  • SignalRWorkerCoordinator.WaitForResultAsync → NotSupportedException
  • SignalRWorkerCoordinator.GetRegisteredWorkersAsync → NotSupportedException

A coordinator that throws NotSupportedException on half its interface is not actually implementing the interface — it's using it as an umbrella type. The Redis implementation doesn't have this problem because Redis is symmetric. The SignalR design is inherently asymmetric (master hosts the Hub, workers connect as clients), so the interface should reflect that:

public interface IMasterCoordinator { ... }  // EnqueueModuleAsync, WaitForResultAsync, GetRegisteredWorkersAsync
public interface IWorkerCoordinator { ... }  // DequeueModuleAsync, PublishResultAsync, RegisterWorkerAsync

Or alternatively, the IDistributedCoordinator contract should document which methods are called on which side so implementers know what to expect.


Minor: RunIdentifierResolver duplicated across three packages

RunIdentifierResolver is implemented identically (or near-identically) in:

  • src/ModularPipelines.Distributed.Artifacts.S3/Configuration/RunIdentifierResolver.cs
  • src/ModularPipelines.Distributed.Redis/Configuration/RunIdentifierResolver.cs
  • src/ModularPipelines.Distributed.Discovery.Redis/RunIdentifierResolver.cs

The resolution logic (GITHUB_SHA → BUILD_SOURCEVERSION → CI_COMMIT_SHA → git rev-parse → GUID) should live once, likely in the core ModularPipelines.Distributed namespace, and be reused. Three copies will diverge over time.


Minor: cloudflared downloaded from "latest" in CI without integrity check

curl -fsSL https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64 -o /usr/local/bin/cloudflared

Pinning to latest means a new release with breaking changes or a compromised binary can break CI without any indication of what changed. Prefer a pinned version with a checksum verification:

CLOUDFLARED_VERSION: "2025.1.0"
CLOUDFLARED_SHA256: "<expected-sha>"

Observation: No cancellation propagation in IDistributedCoordinator

The previous BroadcastCancellationAsync / IsCancellationRequestedAsync methods have been removed from the coordinator interface. This means there is currently no cross-instance cancellation mechanism — if the master cancels, workers will not be notified and will continue dequeuing until SignalCompletionAsync is received (or the queue runs dry). The Redis implementation previously had this, so it appears to have been dropped intentionally, but it's worth confirming this is an acceptable gap for the initial release.


@claude claude bot left a comment


Code Review: Distributed Workers Mode — Round 7

Reviewing against the previous round (Round 6, submitted 2026-02-23T14:15:37Z) to track what has been addressed and what remains open.


Resolved Since Round 6

SignalR coordinator introduced — major architecture improvement

The previous round flagged the Redis dequeue ordering inconsistency (LRANGE+LREM with BRPOP fallback creating FIFO violations and capability-mismatch bouncing). This has been superseded by a new backend: ModularPipelines.Distributed.SignalR replaces the poll-based Redis coordinator with a push model. The master hosts a Kestrel/SignalR server; workers connect as clients and receive assignments via hub callbacks. This eliminates the polling and the LRANGE/BRPOP incoherence in one move. The pub/sub design is correct for this use case.

ModuleCompletionSourceApplicator still uses reflection — but TrySetDistributedResult was also added

IModule now exposes TrySetDistributedResult(IModuleResult), implemented in Module<T> via CompletionSource.TrySetResult — this is the clean public API that was requested in previous rounds. However, ModuleCompletionSourceApplicator still uses reflection (GetProperty("CompletionSource", NonPublic)) and is actively called from both DistributedModuleExecutor.CollectDistributedResultAsync and WorkerModuleExecutor.ApplyDependencyResults. The new TrySetDistributedResult method on IModule is not called anywhere in the codebase. This means the fragile reflection path is still the live code path. The fix is trivial: replace ModuleCompletionSourceApplicator.TryApply(module, result) with module.TrySetDistributedResult(result) at both call sites, then delete ModuleCompletionSourceApplicator.

IArtifactContext registration — FIXED

The singleton registration with string.Empty is gone. IArtifactContext is no longer registered in DI; ArtifactContextExtensions.Artifacts() calls context.GetService<IArtifactContext>() which will throw at runtime, but this is now a known TODO gap rather than a silent misattribution. (See open issue below.)

Redis upload OOM — status improved but not fully resolved

The Redis artifact store still reads the full stream into a MemoryStream before deciding whether to chunk:

using var ms = new MemoryStream();
await data.CopyToAsync(ms, cancellationToken);
var bytes = ms.ToArray();
if (bytes.Length <= _maxSingleUpload)
    // single key
else
    // chunk bytes[] ← already fully in heap

The chunking logic is correct in structure but the stream is fully buffered before any decision is made. For large artifacts this remains an OOM risk. The S3 store correctly streams to a temp file; Redis should do the same. This was flagged in Round 6 and is unchanged.
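A sketch of the temp-file spool the S3 store already uses, applied to the Redis upload path (`_maxSingleUpload` is from the existing store; `UploadSingleAsync` and `UploadChunkAsync` are hypothetical stand-ins for the store's single-key and chunked upload code, and chunk-size/metadata handling is elided):

```csharp
// Sketch only: spool to disk so the length is known without holding
// the whole artifact in memory, then upload from the file stream.
var tempPath = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
await using (var spool = File.Create(tempPath))
{
    await data.CopyToAsync(spool, cancellationToken); // bounded copy buffer, not full buffering
}

try
{
    await using var file = File.OpenRead(tempPath);
    if (file.Length <= _maxSingleUpload)
    {
        await UploadSingleAsync(key, file, cancellationToken);
    }
    else
    {
        var buffer = new byte[_maxSingleUpload];
        int read, index = 0;
        while ((read = await file.ReadAsync(buffer, cancellationToken)) > 0)
        {
            await UploadChunkAsync(key, index++, buffer.AsMemory(0, read), cancellationToken);
        }
    }
}
finally
{
    File.Delete(tempPath);
}
```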

RunIdentifierResolver duplication — PARTIALLY ADDRESSED

There are still three copies: one in ModularPipelines.Distributed.Redis, one in ModularPipelines.Distributed.Artifacts.S3, and one in ModularPipelines.Distributed.Discovery.Redis. The deduplication was noted as a minor concern in Round 6 and has not been addressed.


New Issues Introduced Since Round 6

1. DistributedPipelineHub.MasterState — now correctly constructor-injected (not a bug)

The previous version of DistributedPipelineHub had an internal property setter for MasterState with no injection mechanism. The current version correctly takes SignalRMasterState as a constructor parameter. MasterServerHost registers it via builder.Services.AddSingleton(masterState). This is correct — SignalR hub DI works via constructor injection and the singleton registration is the right approach. No issue here.

2. TryAssignPendingWork race in DistributedPipelineHub — concurrent assignment loss

TryAssignPendingWork dequeues from PendingAssignments (a ConcurrentQueue) and calls workerState.TryMarkBusy(). If TryMarkBusy() fails (worker raced to busy), the assignment is re-enqueued. However, this worker then returns without attempting other idle workers. Combined with the loop structure that iterates over a snapshot count (pendingCount), a scenario where multiple workers complete simultaneously can leave assignments in the queue longer than necessary. This is a throughput concern, not a correctness bug, but given the design goal of maximising parallelism it is worth addressing by continuing to try other workers after a failed TryMarkBusy rather than stopping.

3. SignalRWorkerCoordinator.DequeueModuleAsync — single-item delivery with no re-request loop

DequeueModuleAsync invokes RequestWork, then waits on the channel for exactly one item, then returns. The WorkerModuleExecutor loop calls DequeueModuleAsync in a while loop, so subsequent calls each trigger a new RequestWork invocation. This creates a round-trip-per-module pattern. More importantly: if the channel completes (via OnSignalCompletion calling TryComplete()) while a worker is waiting for work that the master will never send (e.g., all remaining assignments require capabilities this worker doesn't have), the worker correctly stops. However, if a worker's RequestWork arrives at the hub between the master enqueuing and the hub's TryAssignPendingWork executing, the assignment can be missed and the worker hangs until a future RequestWork or completion signal. This is an inherent race in request-based rather than subscription-based work delivery.

4. WorkerModuleExecutorTests — still asserts nothing

WorkerModuleExecutorTests.Worker_Registers_With_Coordinator remains:

await Assert.That(true).IsTrue();

This is the most execution-critical class in the feature and has zero meaningful test coverage. At minimum the test should verify that a module is dequeued, executed via IModuleRunner, and its result published. The comment says 'Detailed testing requires mocking the full DI and execution pipeline' — this is accurate but not a blocker; DependencyResultPropagationTests demonstrates that the module execution components can be wired with hand-constructed collaborators.

5. Cloudflared tunnel regex is too narrow

CloudflaredTunnel.TunnelUrlRegex matches only https://[a-zA-Z0-9-]+\.trycloudflare\.com — a single DNS label between https:// and .trycloudflare.com. Dash-separated quick-tunnel names (e.g., https://random-words-here.trycloudflare.com) are a single label and do match, but any multi-label subdomain (one containing dots) will not. If Cloudflare ever emits such a hostname, the regex will fail to match and CloudflaredTunnel.StartAsync will time out, making the master unreachable to workers. Widening the pattern to allow dots, e.g. https://[\w.-]+\.trycloudflare\.com, would make detection more resilient.

6. ArtifactContextExtensions.Artifacts() — not registered, will throw at runtime

As noted above, IArtifactContext has no DI registration. context.GetService<IArtifactContext>() in ArtifactContextExtensions.Artifacts() will throw InvalidOperationException for any module that calls context.Artifacts(). This is a regression from the previous fix attempt. The cleanest resolution: change the extension to construct ArtifactContextImpl directly, taking IDistributedArtifactStore, IOptions<ArtifactOptions>, and the current module type name from context. Since the extension signature is Artifacts(this IPipelineContext context), and IPipelineContext does not expose the current module type, the extension should be changed to Artifacts(this IModuleContext context) where IModuleContext exposes the module type, or an overload accepting the module type explicitly.

7. Worker disconnect does not re-enqueue in-flight work — documented but unresolved

OnDisconnectedAsync in DistributedPipelineHub contains:

// TODO: Re-enqueue in-flight work for the disconnected worker

If a worker disconnects mid-execution, the master's WaitForResultAsync will block indefinitely for that module. This is a correctness issue; the pipeline will hang. Since the hub tracks WorkerState with IsBusy state, it is possible to detect what the worker was executing at disconnect time and re-publish the assignment. If re-execution is not feasible (e.g., side effects already occurred), at minimum the master should be notified and the affected result TCS cancelled with a descriptive error.
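A sketch of the minimal cancellation path suggested above, assuming (hypothetically) that WorkerState tracks a CurrentAssignment and that the master state exposes the result-waiter map — the exact member names in the hub may differ:

```csharp
// Sketch only: fail the result waiter instead of letting the master hang.
public override Task OnDisconnectedAsync(Exception? exception)
{
    if (_masterState.Workers.TryRemove(Context.ConnectionId, out var workerState) &&
        workerState.CurrentAssignment is { } assignment &&
        _masterState.ResultWaiters.TryGetValue(assignment.ModuleName, out var waiter))
    {
        // Re-publishing the assignment is the richer option; at minimum,
        // surface a descriptive failure so the pipeline fails fast.
        waiter.TrySetException(new InvalidOperationException(
            $"Worker {workerState.Registration.WorkerIndex} disconnected while " +
            $"executing '{assignment.ModuleName}'."));
    }
    return base.OnDisconnectedAsync(exception);
}
```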


Still Open From Previous Rounds

ModuleCompletionSourceApplicator uses reflection despite TrySetDistributedResult existing — as described above, the reflection path is still live. TrySetDistributedResult was added to IModule but is unused. Replace the two ModuleCompletionSourceApplicator.TryApply call sites with module.TrySetDistributedResult(result).

MatrixModuleExpander not wired into DistributedModuleExecutor — the TODO(matrix) comment in DistributedModuleExecutor.ExecuteAsync and DistributedWorkPublisher.CreateAssignment is unchanged. [MatrixTarget] is a public attribute that does nothing. This was flagged in Rounds 3–6. Either connect MatrixModuleExpander.ScanForExpansions in the execution loop or remove the public-facing attribute and expander from this PR.

Redis artifact store uploads full stream to memory — unchanged, as described above.


Summary

The SignalR backend is a substantial improvement over the polling Redis coordinator and correctly addresses the FIFO/livelock concerns from previous rounds. The hub DI wiring, pub/sub completion signaling, and worker lifecycle are well-structured.

The three items that need resolution before merge:

  1. Replace ModuleCompletionSourceApplicator reflection with module.TrySetDistributedResult(result) — the public API already exists, just use it.
  2. IArtifactContext is unregistered — context.Artifacts() throws at runtime for all users of the feature.
  3. Worker disconnect hangs the master — the TODO in OnDisconnectedAsync needs a concrete cancellation path (even if re-execution is deferred).

The remaining items (Cloudflared regex, WorkerModuleExecutorTests stub, MatrixModuleExpander dead code, Redis OOM) are improvements that would increase robustness and correctness of documentation, but are lower severity.

…n for WorkerModuleScheduler

- ConfigurationTests: update expected ConnectionTimeoutSeconds from 30 to 120
  to match the default change in SignalRDistributedOptions
- WorkerModuleScheduler: return never-completing Channel reader instead of
  throwing NotSupportedException from ReadyModules property
If Redis evicts individual chunk keys under memory pressure, the
download would silently return truncated data. Now validates the
reassembled length against the stored SizeBytes in ArtifactReference.

@claude claude bot left a comment


Distributed Workers Mode — Current Pass Code Review

After many revision rounds, this PR has matured significantly. The core architecture is solid: the pluggable IDistributedCoordinator/IDistributedArtifactStore abstractions, real IHubContext<T> usage in SignalRMasterCoordinator, Lua-based atomic dequeue in Redis (TryScanAndClaimAsync), Lazy<Task> deduplication in ArtifactLifecycleManager, proper await _app.StartAsync() for Kestrel readiness, and the check-subscribe-recheck pattern in WaitForResultAsync are all correct and well-thought-out. The 86+ tests across three packages demonstrate strong coverage. Below are the remaining issues that should be addressed before merging.


🔴 Critical: Worker disconnection causes master hang

DistributedPipelineHub.cs — OnDisconnectedAsync

public override Task OnDisconnectedAsync(Exception? exception)
{
    if (_masterState.Workers.TryRemove(Context.ConnectionId, out var workerState))
    {
        _logger.LogWarning("Worker {Index} disconnected (connection {ConnectionId})",
            workerState.Registration.WorkerIndex, Context.ConnectionId);

        // TODO: Re-enqueue in-flight work for the disconnected worker
    }
    return Task.CompletedTask;
}

The TODO has been present for multiple rounds. When a worker disconnects mid-execution, the master calls WaitForResultAsync on a TaskCompletionSource that is never completed. With ModuleResultTimeoutSeconds = 0 (the default), this is an infinite hang — the pipeline stalls silently.

Suggested approach: Track each worker's currently-executing assignment in WorkerState. In OnDisconnectedAsync, if the worker had an in-flight assignment, either call PublishResultAsync with a failure result, or re-enqueue the assignment so another worker (or the master) can retry it. The failure-result path is simpler and gives a clear error to the caller:

if (workerState.CurrentAssignment is { } assignment)
{
    var failure = SerializedModuleResult.Failure(assignment.ModuleTypeName, "Worker disconnected");
    if (_masterState.ResultWaiters.TryGetValue(assignment.ModuleTypeName, out var tcs))
        tcs.TrySetResult(failure);
}

🔴 Critical: WaitForResultAsync (Redis) silently drops deserialization errors

RedisDistributedCoordinator.cs

subscription.OnMessage(msg =>
{
    var result = JsonSerializer.Deserialize<SerializedModuleResult>(msg.Message.ToString(), _jsonOptions)!;
    tcs.TrySetResult(result);
});

If JsonSerializer.Deserialize throws (malformed message, schema mismatch), the exception is swallowed by StackExchange.Redis's message handler. The tcs is never resolved and WaitForResultAsync hangs indefinitely (or until the optional timeout fires, if one is configured).

Suggested fix: Wrap the callback body in try/catch and call tcs.TrySetException on failure:

subscription.OnMessage(msg =>
{
    try
    {
        var result = JsonSerializer.Deserialize<SerializedModuleResult>(msg.Message.ToString(), _jsonOptions)!;
        tcs.TrySetResult(result);
    }
    catch (Exception ex)
    {
        tcs.TrySetException(ex);
    }
});

🟡 Design: IDistributedCoordinator violates Interface Segregation

SignalRWorkerCoordinator throws NotSupportedException for three methods that are conceptually master-only:

public Task EnqueueModuleAsync(...) => throw new NotSupportedException("Workers do not enqueue work.");
public Task<SerializedModuleResult> WaitForResultAsync(...) => throw new NotSupportedException("Workers do not wait for results.");
public Task<IReadOnlyList<WorkerRegistration>> GetRegisteredWorkersAsync(...) => throw new NotSupportedException("Workers do not query registered workers.");

This means the interface cannot be used safely by code that calls these methods without knowing the underlying role. This has been raised in earlier rounds — splitting into IMasterCoordinator : IDistributedCoordinator and IWorkerCoordinator : IDistributedCoordinator (or separate interfaces entirely) would eliminate the NotSupportedException paths and make the role contract explicit at the type level. That said, since both master and worker use IDistributedCoordinator in DistributedModuleExecutor, this refactor has non-trivial scope — at minimum, consider adding XML doc comments on the throwing methods noting they are master-only, and adding a guard in the factory so the wrong coordinator type cannot be injected for a given role.
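If the split were pursued, a minimal sketch of its shape could look like the following — the interface names (IMasterCoordinator, IWorkerCoordinator) follow the review's suggestion, while the member signatures are assumptions inferred from the snippets quoted above, not the PR's actual contract:

```csharp
// Illustrative only: the shared surface both roles need stays on the base
// interface; role-specific operations move to derived interfaces so no
// implementation has to throw NotSupportedException.
public interface IDistributedCoordinator
{
    Task RegisterWorkerAsync(WorkerRegistration registration, CancellationToken ct);
    Task PublishResultAsync(SerializedModuleResult result, CancellationToken ct);
}

// Master-only operations: enqueueing work, awaiting results, listing workers.
public interface IMasterCoordinator : IDistributedCoordinator
{
    Task EnqueueModuleAsync(ModuleAssignment assignment, CancellationToken ct);
    Task<SerializedModuleResult> WaitForResultAsync(string moduleTypeName, CancellationToken ct);
    Task<IReadOnlyList<WorkerRegistration>> GetRegisteredWorkersAsync(CancellationToken ct);
}

// Worker-only operation: claiming work that matches this worker's capabilities.
public interface IWorkerCoordinator : IDistributedCoordinator
{
    Task<ModuleAssignment?> DequeueModuleAsync(IReadOnlySet<string> capabilities, CancellationToken ct);
}
```

DistributedModuleExecutor and DistributedWorkPublisher would then request the role-specific interface, turning a role mismatch into a compile-time error rather than a runtime NotSupportedException.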


🟡 Design: [MatrixTarget] ships as dead public API

MatrixTargetAttribute is public with a clean, usable API:

[MatrixTarget("linux", "windows", "macos")]
public class MyModule : Module<string> { ... }

But the expander is explicitly not wired up:

// TODO(matrix): MatrixModuleExpander.ScanForExpansions not yet connected.
// Modules with [MatrixTarget] will run once, not N times.

Shipping a public attribute that silently does nothing when used is a leaky API contract — users will apply it expecting N runs and get 1. Recommended: Either remove [MatrixTarget] and MatrixModuleExpander from this PR (add in a follow-up), or add a runtime warning/exception in DistributedModuleExecutor when it encounters a module decorated with [MatrixTarget] so users get actionable feedback.
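If removal is not desired, the runtime-feedback option could be as small as a guard in the executor. A hedged sketch — the helper name is invented, and the attribute/executor names follow the review text:

```csharp
// Hypothetical guard for DistributedModuleExecutor: surface [MatrixTarget]
// as an explicit error instead of silently running the module once.
private static void ThrowIfMatrixNotSupported(Type moduleType)
{
    if (Attribute.IsDefined(moduleType, typeof(MatrixTargetAttribute)))
    {
        throw new NotSupportedException(
            $"{moduleType.Name} is decorated with [MatrixTarget], but matrix " +
            "expansion is not yet wired into the distributed executor. " +
            "Remove the attribute until matrix support ships.");
    }
}
```

Calling this once per module before assignment gives users actionable feedback at startup rather than a silent single run.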


🟡 Maintainability: RunIdentifierResolver duplicated across packages

src/ModularPipelines.Distributed.Redis/Configuration/RunIdentifierResolver.cs and src/ModularPipelines.Distributed.Artifacts.S3/Configuration/RunIdentifierResolver.cs are byte-for-byte identical. The same CI environment variable resolution logic (GITHUB_SHA, BUILD_SOURCEVERSION, CI_COMMIT_SHA, git fallback, GUID fallback) is copied in two places.

Suggested approach: Extract to a shared internal helper in ModularPipelines (core) or a ModularPipelines.Distributed package, and reference it from both packages. If inter-package dependency is undesirable, link the file as a shared source.
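A sketch of what the single shared helper could look like, following the priority chain described above (explicit config → CI environment variables → git → GUID); the class shape is illustrative, not the PR's actual code:

```csharp
// Shared run-identifier resolution: one copy, referenced by both packages.
internal static class RunIdentifierResolver
{
    public static string Resolve(string? explicitRunId = null)
    {
        // 1. Explicit configuration always wins.
        if (!string.IsNullOrEmpty(explicitRunId)) return explicitRunId;

        // 2. CI-provided commit SHAs (GitHub Actions, Azure DevOps, GitLab).
        foreach (var variable in new[] { "GITHUB_SHA", "BUILD_SOURCEVERSION", "CI_COMMIT_SHA" })
        {
            var value = Environment.GetEnvironmentVariable(variable);
            if (!string.IsNullOrEmpty(value)) return value;
        }

        // 3. Local git fallback.
        try
        {
            var psi = new System.Diagnostics.ProcessStartInfo("git", "rev-parse HEAD")
            {
                RedirectStandardOutput = true,
                UseShellExecute = false,
            };
            using var process = System.Diagnostics.Process.Start(psi)!;
            var sha = process.StandardOutput.ReadToEnd().Trim();
            process.WaitForExit();
            if (process.ExitCode == 0 && sha.Length > 0) return sha;
        }
        catch
        {
            // git not installed or not a repository — fall through to GUID.
        }

        // 4. Last resort: a fresh identifier (isolates the run, but workers
        //    started in separate processes would disagree — CI vars cover that).
        return Guid.NewGuid().ToString("N");
    }
}
```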


🟡 Reliability: ModuleResultTimeoutSeconds defaults to 0 (wait forever)

/// <summary>
/// Default timeout in seconds for waiting for a distributed module result.
/// Applied when a module has no explicit Timeout configured. 0 = no timeout (wait forever).
/// </summary>
public int ModuleResultTimeoutSeconds { get; set; }

In a distributed system where workers can crash, a default of "no timeout" combined with the unresolved OnDisconnectedAsync TODO creates a silent, indefinite hang with no signal to the operator. Until the disconnect-recovery is implemented, consider defaulting this to something reasonable (e.g., 3600 seconds) so failures surface in bounded time.


✅ What's in good shape

  • Reflection-based CompletionSource access is gone — TrySetDistributedResult via IModule is clean
  • Redis TryScanAndClaimAsync using a Lua script is correctly atomic (LRANGE + LREM in a single script evaluation)
  • ArtifactLifecycleManager failure path calls _completedRestores.TryRemove so retries work
  • MasterServerHost uses await _app.StartAsync() so there is no race between server readiness and worker connection
  • SignalRMasterCoordinator uses the real IHubContext<DistributedPipelineHub> from the WebApp DI container — the no-op proxy issue from earlier rounds is fully resolved
  • Redis pub/sub-based DequeueModuleAsync correctly avoids the initial-scan → subscribe race with a re-check after subscription
  • Artifact download deduplication via ConcurrentDictionary<string, Lazy<Task>> is a correct and efficient pattern
  • The CloudflaredTunnel disposal calls Kill(entireProcessTree: true) which correctly cleans up the child process


@claude claude bot left a comment


Round 7 Review: Distributed Workers Mode

I reviewed the current code at SHA 541ffb241d against all previous review rounds. Several issues from the previous review have been resolved; the two critical correctness bugs remain open.


Resolved Since Previous Review

  • specs/ directory — removed ✓
  • WorkerModuleScheduler.ReadyModules — now returns a static empty Channel<ModuleState> instead of throwing NotSupportedException
  • Redis upload OOM — RedisDistributedArtifactStore.UploadAsync does NOT buffer the entire stream; it reads in fixed-size _chunkSize chunks and makes the single-vs-chunked decision based on the first read result. The previous review's OOM concern does not apply to the current code ✓
  • Redis chunked download integrity — size validation is present on reassembly ✓
  • ArtifactLifecycleManager Lazy<Task> exception caching — on failure, the key is removed from _completedRestores, allowing retry. The previous concern does not apply ✓

Still Open: Critical

1. GetModule<T>() cross-process hang — fix reverted with no alternative

The commit 799d23c7 reverts the [PinToMaster] protection for modules that call context.GetModule<T>(). In distributed mode, GetModule<T>() awaits a TaskCompletionSource that is set only in the process that executed the module. If a worker calls GetModule<T>() for a module that ran on the master or another worker, the source is never set in that process and the worker deadlocks permanently.

This silently breaks any existing pipeline that uses GetModule<T>() — the most common module-composition pattern in the framework. Users have no warning.

Options:

  • Restore [PinToMaster] auto-detection and restore the reverted commit
  • Throw a clear DistributedModeException from GetModule<T>() on non-master processes with a message pointing users to [PinToMaster]
  • Propagate completed results for ALL modules (not just declared [DependsOn<T>] dependencies) through the coordinator so workers can satisfy GetModule<T>() calls cross-process

2. IArtifactContext not registered in DI — context.Artifacts() throws at runtime

ArtifactContextExtensions.Artifacts() calls context.GetService<IArtifactContext>(), but IArtifactContext / ArtifactContextImpl has no DI registration anywhere — not in DependencyInjectionSetup.RegisterDistributedServices(), not in DistributedPipelineBuilderExtensions, not in any package extension. Any module that calls context.Artifacts() will throw InvalidOperationException at runtime.

ArtifactContextImpl requires a string currentModuleTypeName constructor parameter, making a plain singleton registration incorrect (all modules would share an empty or wrong type name). The cleanest fix avoids DI registration entirely:

// Change signature from IPipelineContext to IModuleContext
public static IArtifactContext Artifacts(this IModuleContext context)
{
    return new ArtifactContextImpl(
        context.Services.Get<IDistributedArtifactStore>(),
        context.Services.Get<IOptions<ArtifactOptions>>(),
        context.CurrentModuleType.FullName!);
}

This constructs the correct instance per-call without any lifetime mismatch.


Still Open: Significant

3. WorkerModuleExecutorTests is a placeholder

[Test]
public async Task Worker_Registers_With_Coordinator()
{
    // Detailed testing requires mocking the full DI and execution pipeline.
    await Assert.That(true).IsTrue();
}

This is the most execution-critical component in the entire feature. The existing integration test in DistributedPipelineIntegrationTests exercises the coordinator and master path but simulates the worker manually — it does not go through WorkerModuleExecutor. At minimum there should be a test that verifies: a module is dequeued → executed → result published to coordinator. Without this, regressions in the worker path (the GetModule<T>() hang, capability matching, artifact lifecycle) are invisible.

4. MatrixModuleExpander is dead public API

[MatrixTarget] is a public attribute that silently does nothing. MatrixModuleExpander.ScanForExpansions is never called from DistributedModuleExecutor.ExecuteAsync or anywhere else. The PR description marks tasks T044–T048 as [x] complete, which is inaccurate. A TODO(matrix) comment in the executor acknowledges this.

Before merging: either wire the expander into DistributedModuleExecutor.ExecuteAsync, or remove [MatrixTarget], [MatrixExpansionAttribute], and MatrixModuleExpander from this PR and mark the matrix tasks as deferred. Shipping public API surface that appears functional but has no effect is worse than shipping nothing — users will write code against it and discover it does nothing only at runtime.

5. ModuleCompletionSourceApplicator still uses reflection for dependency results

WorkerModuleExecutor.ApplyDependencyResults calls ModuleCompletionSourceApplicator.TryApply(depModule, result), which accesses the internal CompletionSource property of dependency modules via private reflection. While the module's own ResultTask is now properly exposed through IModule.TrySetDistributedResult (fixing the critical issue from Round 3), the dependency-result application path still bypasses the framework's public contract.

The fix pattern is already established — extend IModule with an internal SetDependencyResult(IModuleResult) method (or reuse TrySetDistributedResult if semantics permit), so the entire distributed result-application path goes through controlled API rather than reflection that will break silently on any Module<T> refactor.

6. InMemory dequeue: capability-mismatch spin with no backoff

When a queued item does not match any connected worker's capabilities, each worker that wakes scans the full queue, finds no match, and re-releases the semaphore — immediately waking the next worker to repeat. With N workers and one incompatible item, this becomes an O(N) spin loop with no sleep.

Adding a short delay before re-releasing the semaphore (await Task.Delay(50, cancellationToken) before _workAvailable.Release() in the no-match branch) would break the spin without meaningful latency impact, since the scenario only occurs when no suitable worker exists.
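The no-match branch could then look like this — a sketch in which matchedAssignment and _workAvailable are names inferred from the description above, not necessarily the PR's actual identifiers:

```csharp
if (matchedAssignment is null)
{
    // Nothing in the queue matches this worker's capabilities. Sleep briefly
    // before waking the next waiter, so one incompatible item does not cause
    // an O(N) wake/scan/release spin across all connected workers.
    await Task.Delay(50, cancellationToken);
    _workAvailable.Release();
    continue;
}
```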


Minor

7. RunIdentifierResolver duplicated across three packages

Identical implementations (same priority chain: explicit config → GITHUB_SHA → BUILD_SOURCEVERSION → CI_COMMIT_SHA → git rev-parse HEAD → Guid.NewGuid()) exist in ModularPipelines.Distributed.Redis, ModularPipelines.Distributed.Artifacts.S3, and ModularPipelines.Distributed.Discovery.Redis. Any bug fix or new CI environment must be applied three times. This should be consolidated into a shared location.

8. Redis queue LIFO ordering (undocumented)

EnqueueModuleAsync uses ListLeftPushAsync (head-push), and the Lua scan starts from index 0 (head), so the most recently enqueued module executes first — LIFO, not FIFO. This may be intentional, but it is undocumented and can cause starvation for older items when new work arrives continuously. If FIFO is intended, change to ListRightPushAsync on enqueue.
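With StackExchange.Redis, the FIFO variant is a one-call change on the enqueue side; the Lua head-scan stays as-is. A sketch — the field names (_database, _queueKey, _subscriber, _workAvailableChannel, _jsonOptions) are assumptions mirroring the patterns quoted elsewhere in this review:

```csharp
public async Task EnqueueModuleAsync(ModuleAssignment assignment, CancellationToken ct)
{
    var payload = JsonSerializer.Serialize(assignment, _jsonOptions);

    // Tail-push + head-scan = FIFO: the oldest assignment is claimed first.
    // (The current ListLeftPushAsync head-push makes the scan see newest first.)
    await _database.ListRightPushAsync(_queueKey, payload);

    // Wake any blocked workers, mirroring the existing pub/sub notification.
    await _subscriber.PublishAsync(_workAvailableChannel, RedisValue.EmptyString);
}
```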


Summary Table

#  Issue — Status
1  GetModule<T>() cross-process hang — fix reverted, no alternative — Critical
2  IArtifactContext not in DI — context.Artifacts() throws at runtime — Critical
3  WorkerModuleExecutorTests is a placeholder with no real assertions — Significant
4  MatrixModuleExpander not wired — [MatrixTarget] silently does nothing — Significant
5  ModuleCompletionSourceApplicator uses reflection for dependency results — Significant
6  InMemory dequeue: capability-mismatch busy-spin with no backoff — Significant
7  RunIdentifierResolver duplicated in three packages — Minor
8  Redis LIFO ordering undocumented — Minor
9  specs/ directory — Fixed
10 WorkerModuleScheduler.ReadyModules throws — Fixed
11 Redis upload OOM — Fixed ✓ (current code streams chunk-by-chunk)
12 ArtifactLifecycleManager Lazy<Task> failure caching — Fixed
13 Redis chunked download integrity — Fixed
(prior rounds) CompletionSource reflection, BuildServiceProvider, WorkerHealthMonitor stub, CancellationToken.None, InMemory livelock, BRPOP ordering, TOTAL_INSTANCES, S3 MemoryStream, plugin thread safety, duplicate AddDistributedMode — All Fixed

Issues #1 and #2 are runtime correctness bugs that will affect any user of the feature as shipped. They should be resolved before merge.

…egex

- Make MatrixTargetAttribute internal (not yet wired to executor, avoid
  shipping dead public API)
- Replace 50ms busy-poll in SignalRMasterCoordinator.DequeueModuleAsync
  with SemaphoreSlim signal from EnqueueModuleAsync/SignalCompletionAsync
- Generalize cloudflared tunnel URL regex to support custom domains and
  add warning log when URL detection times out
@thomhurst
Owner Author

Response to Code Review (Latest Pass)

Addressed in commit 976e647:

# Issue — Resolution
1. [MatrixTarget] dead public API — Fixed: made MatrixTargetAttribute internal. The expansion infrastructure remains for future wiring.
3. 50ms busy-poll in SignalR master dequeue — Fixed: replaced with SemaphoreSlim signal from EnqueueModuleAsync/SignalCompletionAsync.
5. CloudflaredTunnel regex hardcodes trycloudflare.com — Fixed: generalized regex to match any HTTPS URL (filtering api.cloudflare.com), added warning log on timeout.

Not addressing in this PR:

# Issue — Reasoning
2. IDistributedCoordinator ISP violation — The master-only methods (EnqueueModuleAsync, WaitForResultAsync, GetRegisteredWorkersAsync) are only called from role-gated code paths (DistributedModuleExecutor, DistributedWorkPublisher) that are only instantiated for the master role. Workers never hit those code paths. Splitting the interface would create significant churn across all implementations (Redis, SignalR, InMemory) and their tests for minimal practical benefit. The NotSupportedException serves as a safety net that would never fire in correct usage.
4. Result waiter cancellation race on shutdown — The race window is extremely narrow: SignalCompletionAsync is only called after all modules have been distributed and results collected. Any in-flight PublishResult would have already been processed by the hub before SignalCompletionAsync runs. The TrySetCanceled is idempotent on already-resolved waiters.
6. 5 CI runners on every push — Noted; this is a valid CI cost concern. Will consider gating extra runners on distributed-related path changes in a follow-up.

…hed www.cloudflare.com

The generalized regex `https://[a-zA-Z0-9\-]+\.[a-zA-Z0-9\-\.]+` was too broad
and matched `https://www.cloudflare.com` from cloudflared's log output, causing
workers to connect to the Cloudflare homepage instead of the tunnel URL.

Reverted to `*.trycloudflare.com` since we only use quick tunnels.

@claude claude bot left a comment


Code Review: Distributed Workers Mode (PR #2430)

This is a substantial and well-architected PR that adds distributed pipeline execution. The abstractions are clean and the documentation is thorough. Below are observations ranging from architectural concerns to specific implementation issues.


Architecture and Design

Overall design is sound. The master/worker model with a pluggable coordinator (IDistributedCoordinator) is a well-established pattern. Separating concerns into coordinator, artifact store, capability matcher, and serializer layers is clean and extensible.

The DeferredCoordinator and DeferredArtifactStore in PipelineBuilder.cs are a good pattern, but the double-checked locking has a subtle non-atomicity issue:

private async ValueTask<IDistributedCoordinator> GetAsync(CancellationToken ct)
{
    if (_inner is not null) return _inner;  // unsynchronized read - fine for reference reads
    await _lock.WaitAsync(ct);
    try { return _inner ??= await factory.CreateAsync(ct); }
    finally { _lock.Release(); }
}

The initial null check outside the lock is fine for reference reads in .NET (reference writes are atomic), but without a memory barrier the unsynchronized read can, in principle, observe the reference before the object's constructor-written fields are visible to the reading thread. Declaring the field private volatile IDistributedCoordinator? _inner; closes that window.


Significant Issues

1. ModuleAssignment.RequiredCapabilities uses HashSet<string> (mutable) in a public record

public record ModuleAssignment(
    string ModuleTypeName,
    // ...
    HashSet<string> RequiredCapabilities,  // mutable collection in a record
    // ...

Using a mutable HashSet<string> in a record breaks value equality semantics (records use reference equality for mutable collections) and makes the assignment payload mutable after creation. Since IReadOnlySet<string> is already used in IDistributedCoordinator.DequeueModuleAsync and capability matching, the record should use IReadOnlySet<string> or FrozenSet<string> instead:

public record ModuleAssignment(
    // ...
    IReadOnlySet<string> RequiredCapabilities,
    // ...

The same issue applies to WorkerRegistration.Capabilities.

2. ResolveDistributedOptions in PipelineBuilder.cs only processes ImplementationInstance descriptors

foreach (var descriptor in services.Where(d =>
    d.ServiceType == typeof(IConfigureOptions<DistributedOptions>) &&
    d.ImplementationInstance is IConfigureOptions<DistributedOptions>))  // <-- only instances

services.Configure<DistributedOptions>(o => { ... }) typically registers ConfigureNamedOptions<T> via a factory (ImplementationFactory), not an ImplementationInstance. This means the ResolveDistributedOptions method can fail silently: it detects hasConfigureOptions as true (because the ServiceType exists), creates an empty DistributedOptions, loops over zero matching descriptors, and returns defaults. This could cause distributed mode to be erroneously inactive or to default to the wrong role.

A more reliable approach would be to use a sentinel value or a simpler detection mechanism — for example, registering a concrete DistributedOptions instance as a singleton when AddDistributedMode is called, rather than relying on the options framework internals.
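A sketch of the sentinel approach — registration and lookup shapes are illustrative, assuming the existing AddDistributedMode entry point and Microsoft.Extensions.DependencyInjection/Options:

```csharp
// Register the configured options object itself so builder code can locate it
// by ServiceType, independent of how IConfigureOptions<T> was registered.
public static IServiceCollection AddDistributedMode(
    this IServiceCollection services,
    Action<DistributedOptions>? configure = null)
{
    var options = new DistributedOptions();
    configure?.Invoke(options);

    // Concrete sentinel instance — always an ImplementationInstance descriptor.
    services.AddSingleton(options);

    // Keep the options framework consistent for normal consumers.
    services.AddSingleton<IOptions<DistributedOptions>>(Options.Create(options));
    return services;
}

// ResolveDistributedOptions then becomes a direct descriptor lookup:
// var configured = services
//     .LastOrDefault(d => d.ServiceType == typeof(DistributedOptions))
//     ?.ImplementationInstance as DistributedOptions;
```

This sidesteps the ImplementationInstance-vs-ImplementationFactory distinction entirely, since the sentinel is always registered as an instance.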

3. InMemoryDistributedCoordinator missing SendHeartbeatAsync and BroadcastCancellationAsync / IsCancellationRequestedAsync

The documentation (architecture.md) describes these methods as part of the coordinator interface, but the IDistributedCoordinator interface in the diff only has 7 methods (no SendHeartbeatAsync, BroadcastCancellationAsync, or IsCancellationRequestedAsync). The docs appear to describe a different (earlier?) iteration of the interface. This should be reconciled — either the interface is incomplete or the documentation is out of date. If WorkerCancellationMonitor polls via these methods (as the docs state), they need to be in the interface.

4. WorkerModuleScheduler has a shared static EmptyChannel that is never completed

private static readonly Channel<ModuleState> EmptyChannel = Channel.CreateUnbounded<ModuleState>();
public ChannelReader<ModuleState> ReadyModules => EmptyChannel.Reader;

Since this is static, all WorkerModuleScheduler instances share the same channel. If anything calls ReadAllAsync() on this reader it will block forever (the channel is never written to or completed). The channel writer is never accessible. This is mostly harmless because WorkerModuleExecutor doesn't consume ReadyModules, but if anything in the framework iterates ReadyModules, it will deadlock. Each instance should have its own completed channel:

private readonly Channel<ModuleState> _emptyChannel;

public WorkerModuleScheduler()
{
    _emptyChannel = Channel.CreateUnbounded<ModuleState>();
    _emptyChannel.Writer.Complete();
}

public ChannelReader<ModuleState> ReadyModules => _emptyChannel.Reader;

5. Lua script in RedisDistributedCoordinator scans entire queue on every dequeue

local items = redis.call('LRANGE', KEYS[1], 0, -1)

The ScanAndClaimScript does a full LRANGE over the entire work queue to find a matching assignment. In pipelines with many queued modules, this becomes O(N) per dequeue attempt, executed atomically inside a Lua script (blocking Redis for the duration). For moderate queues this is acceptable, but for large pipelines this can cause Redis latency spikes. Consider adding a comment about this tradeoff, or implementing separate per-capability queues as a future optimization.

6. DistributedModuleExecutor executes assignments sequentially on the master

while (!cancellationToken.IsCancellationRequested)
{
    var assignment = await _coordinator.DequeueModuleAsync(capabilities, cancellationToken);
    // ...
    await ExecuteAssignmentAsync(assignment, modules, workerScheduler, cancellationToken);

The master worker loop is fully sequential — it executes one module at a time. This means the master never executes multiple modules in parallel, even though the original (non-distributed) executor likely does. The WorkerModuleExecutor has the same limitation. For pipelines where modules have no capability constraints (and many run on the master), this reduces parallelism significantly. A SemaphoreSlim-bounded parallel loop would be more consistent with the original behavior.
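A sketch of the SemaphoreSlim-bounded variant of the loop quoted above — maxParallel is a new illustrative parameter, the other names follow the snippet, and a null dequeue is assumed to signal run completion:

```csharp
var throttle = new SemaphoreSlim(maxParallel);
var inFlight = new List<Task>();

while (!cancellationToken.IsCancellationRequested)
{
    // Block here when maxParallel assignments are already executing.
    await throttle.WaitAsync(cancellationToken);

    var assignment = await _coordinator.DequeueModuleAsync(capabilities, cancellationToken);
    if (assignment is null)
    {
        throttle.Release();
        break; // queue drained / run complete
    }

    // Start the assignment without awaiting it, so the loop can dequeue the
    // next item; the semaphore caps concurrency.
    inFlight.Add(Task.Run(async () =>
    {
        try
        {
            await ExecuteAssignmentAsync(assignment, modules, workerScheduler, cancellationToken);
        }
        finally
        {
            throttle.Release();
        }
    }, cancellationToken));
}

await Task.WhenAll(inFlight);
```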

7. Artifact upload failure silently allows result publication to proceed

catch (Exception ex)
{
    _logger.LogError(ex, "Failed to upload artifacts for {Module}", assignment.ModuleTypeName);
}
// falls through - result is still published without artifacts

If artifact upload fails, the result is published without the artifact references. A consumer module that [ConsumesArtifact] from this module will then fail at download time with a more confusing error. Consider treating artifact upload failure as a module failure, or at minimum publishing a result with an explicit "artifacts missing" indicator so downstream modules fail with a clear message.

8. ModuleResultSerializer silently falls back to absolute path when file is outside git root

// Path is outside git root — store as-is (will only work on same platform)
return absolutePath;

This silent fallback produces a cross-platform serialization bug — an absolute Windows path gets sent to a Linux worker (or vice versa) and fails at deserialization with a cryptic File not found error. The serializer should throw or log a warning when it cannot make a path portable, rather than silently storing an OS-specific path.
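The fallback branch could warn and, for the common case, emit a portable relative path — a sketch where _logger and gitRoot are assumed members of the surrounding serializer:

```csharp
if (!absolutePath.StartsWith(gitRoot, StringComparison.OrdinalIgnoreCase))
{
    // Make the non-portable case loud instead of silent.
    _logger.LogWarning(
        "Path {Path} is outside the git root {Root}; storing it verbatim. " +
        "It will not deserialize on a worker with a different OS or layout.",
        absolutePath, gitRoot);
    return absolutePath;
}

// Inside the git root: store a forward-slash relative path that both
// Windows and Linux workers can resolve against their own checkout.
return Path.GetRelativePath(gitRoot, absolutePath).Replace('\\', '/');
```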


Minor Issues

9. DistributedOptions lacks XML documentation

DistributedOptions is a public class but its properties have no XML doc comments, unlike the configuration docs (which are excellent). Adding <summary> tags would enable IDE tooling to surface descriptions.

10. MatrixTargetAttribute is internal but documented as a public user-facing feature

The docs (capabilities.md) show [MatrixTarget("windows", "linux", "macos")] as a user-facing attribute, but it is declared internal sealed class MatrixTargetAttribute. Users cannot apply it. This should either be made public or the documentation should be removed/marked as "coming soon."

11. DistributedOptions.HeartbeatIntervalSeconds and HeartbeatTimeoutSeconds are documented but missing from the class

The configuration docs list HeartbeatIntervalSeconds (default 10) and HeartbeatTimeoutSeconds (default 30) as properties of DistributedOptions, but these are not present in the DistributedOptions class in the diff. The WorkerHealthMonitor and WorkerHeartbeatService background services mentioned in the architecture docs are also absent. The docs may be aspirational — if heartbeat monitoring is not yet implemented, the docs should note this.

12. ConsumesArtifactAttribute.RestorePath setter is public but should validate

public string? RestorePath { get; set; }

The property is a mutable string with no validation. Since it's configured at compile time via an attribute initializer, a required property or init-only setter would be more appropriate and prevent accidental runtime mutation.

13. strategy.job-total GitHub Actions expression may not be available

TOTAL_INSTANCES: ${{ strategy.job-total }}

strategy.job-total is a GitHub Actions context expression that reflects the total number of matrix jobs. This should work correctly for the matrix defined in the workflow, but it is worth validating that this expression is stable across GitHub Actions runner versions and doesn't return incorrect values when fail-fast: false is used and some jobs are cancelled.


Positive Observations

  • The check-subscribe-recheck pattern in RedisDistributedCoordinator.WaitForResultAsync correctly handles the race condition and is well-documented.
  • Compressing large dependency results before sending them through Redis (GzipPrefix + base64) is a thoughtful optimization that prevents payload limit issues.
  • Run identifier auto-detection (GitHub SHA → Azure DevOps SHA → GitLab SHA → git → GUID) is pragmatic and correctly isolates concurrent runs.
  • The IDistributedCoordinator interface is clean and narrow enough to be implementable with diverse backends (HTTP, shared filesystem, message queues, etc.).
  • Worker registration polling with a configurable CapabilityTimeoutSeconds gracefully handles staggered matrix runner startup.
  • The test coverage across coordinator, serialization, capability matching, and integration scenarios is thorough.

Summary: The core design is solid and the implementation handles most edge cases (race conditions, master hanging, worker crash). The most impactful issues to address before merge are: (1) the ResolveDistributedOptions reliability concern, (2) the mutable HashSet in ModuleAssignment, (3) making MatrixTargetAttribute public or removing it from docs, and (4) clarifying which features documented in architecture.md (heartbeats, cancellation broadcast, BroadcastCancellationAsync) are actually implemented in this PR.

🤖 Generated with Claude Code


@claude claude bot left a comment


Code Review: feat: Distributed Workers Mode (Round 13)

The fixes from commit 976e647 are confirmed. Three of the six items from the last round are resolved.


Fixed Since Last Review ✓

# Issue — Status
1. [MatrixTarget] dead public API — Fixed: internal sealed with XML docs noting "Not yet wired... Reserved for future use"
3. SignalRMasterCoordinator 50ms busy-poll — Fixed: SemaphoreSlim.WaitAsync replaces the polling loop
5. CloudflaredTunnel URL regex hardcoded domain — Fixed: generalised to https://[a-zA-Z0-9\-]+\.[a-zA-Z0-9\-\.]+ with api.cloudflare.com exclusion

Not Addressed (Author's Decision — Noted)

# Issue
2 IDistributedCoordinator ISP violation — NotSupportedException guards are present; all implementations are internal so external callers cannot trigger them. Acceptable for now.
4 SignalCompletionAsync cancellation race — narrow window, TrySetCanceled is idempotent on resolved waiters. Acceptable.
6 5 CI runners on every push — deferred.

Remaining Open Issues

These were raised in earlier rounds and are still present in the current head commit.


1. docs/docs/distributed/architecture.md describes a different interface than exists

The architecture doc lists a Cancellation concern with BroadcastCancellationAsync and IsCancellationRequestedAsync, and describes background services WorkerHealthMonitor, WorkerHeartbeatService, and WorkerCancellationMonitor. None of these exist in the codebase or in IDistributedCoordinator (which has 7 methods). The docs imply 9+ methods.

Why this matters: users implementing a custom coordinator will write stub implementations for methods that don't exist, or look for services that aren't there. This is a straightforward documentation fix — update the interface table and remove the non-existent services section.


2. RunIdentifierResolver triplicated with inconsistent fallback logic — could cause runtime failures

Three independent copies exist:

src/ModularPipelines.Distributed.Redis/Configuration/RunIdentifierResolver.cs       ← git SHA / CI env vars
src/ModularPipelines.Distributed.Artifacts.S3/Configuration/RunIdentifierResolver.cs ← byte-for-byte identical to Redis
src/ModularPipelines.Distributed.Discovery.Redis/RunIdentifierResolver.cs            ← SHA256 hash of CWD (different!)

The inconsistency is the real concern: the discovery package derives a run identifier using SHA256 of Environment.CurrentDirectory, while the coordinator and artifact store use git SHA or CI environment variables. If these produce different strings, the discovery package and coordinator will look up different Redis keys and workers will fail to locate the master.

Fix: move RunIdentifierResolver into ModularPipelines.Distributed core (it was already suggested in round 1) and agree on a single fallback order. The two identical copies can simply reference the shared one.
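For illustration, a consolidated resolver might look like the sketch below (class and method names are placeholders, not the shipped API; the fallback chain mirrors the Redis/S3 copies: explicit value → CI commit-SHA env vars → git rev-parse HEAD → GUID):

```csharp
using System;
using System.Diagnostics;

// Hypothetical consolidated resolver living in ModularPipelines.Distributed core.
internal static class RunIdentifierResolver
{
    public static string Resolve(string? explicitRunId = null)
    {
        // 1. Explicit configuration always wins.
        if (!string.IsNullOrWhiteSpace(explicitRunId))
        {
            return explicitRunId;
        }

        // 2. CI-provided commit SHAs (GitHub Actions, Azure DevOps, GitLab).
        foreach (var variable in new[] { "GITHUB_SHA", "BUILD_SOURCEVERSION", "CI_COMMIT_SHA" })
        {
            var value = Environment.GetEnvironmentVariable(variable);
            if (!string.IsNullOrWhiteSpace(value))
            {
                return value;
            }
        }

        // 3. Local git checkout.
        try
        {
            var psi = new ProcessStartInfo("git", "rev-parse HEAD")
            {
                RedirectStandardOutput = true,
                UseShellExecute = false,
            };
            using var process = Process.Start(psi);
            var sha = process?.StandardOutput.ReadToEnd().Trim();
            process?.WaitForExit();
            if (process?.ExitCode == 0 && !string.IsNullOrWhiteSpace(sha))
            {
                return sha;
            }
        }
        catch
        {
            // git not installed — fall through to the GUID fallback.
        }

        // 4. Last resort: a fresh GUID. Each instance generates a different
        //    one, so this is only safe for single-instance runs.
        return Guid.NewGuid().ToString("N");
    }
}
```

The key invariant is that every instance in a distributed run must resolve to the same string, which is why the GUID fallback is only acceptable when there is exactly one instance.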


3. ArtifactLifecycleManager.ResolvePathPattern silently returns wrong results for ** glob patterns

var wildcardIndex = pathPattern.IndexOfAny(['*', '?']);
var baseDir = Path.GetDirectoryName(pathPattern[..wildcardIndex]);  // "**/bin/*.dll" → "" → cwd
var searchPattern = Path.GetFileName(pathPattern);                   // → "*.dll" (loses the "/bin/" constraint)

var matches = Directory.GetFiles(baseDir, searchPattern, SearchOption.AllDirectories);
// Returns ALL *.dll files from cwd, not only those under bin/ directories

For **/bin/*.dll the bin/ segment is silently dropped; for src/tests/**/*.cs the tests/ sub-path is also lost. No error is raised — callers get plausible-looking results that may include far more files than intended. Published artifacts can become unexpectedly large.

Suggested fix using Microsoft.Extensions.FileSystemGlobbing (a small first-party Microsoft package; its Matcher handles ** segments correctly):

using Microsoft.Extensions.FileSystemGlobbing;
using Microsoft.Extensions.FileSystemGlobbing.Abstractions;

var matcher = new Matcher();
matcher.AddInclude(pathPattern);
var result = matcher.Execute(
    new DirectoryInfoWrapper(new DirectoryInfo(Directory.GetCurrentDirectory())));
return result.Files
    .Select(f => Path.GetFullPath(Path.Combine(Directory.GetCurrentDirectory(), f.Path)))
    .ToList();

4. InMemoryDistributedCoordinator.WaitForResultAsync — shared TCS can be permanently poisoned by cancellation

// Current code
var tcs = _results.GetOrAdd(moduleTypeName,
    _ => new TaskCompletionSource<SerializedModuleResult>());   // shared across all callers
using var reg = cancellationToken.Register(
    () => tcs.TrySetCanceled(cancellationToken));               // permanently sets state to Cancelled
return await tcs.Task;

If any caller's token fires, the shared TCS moves to the Cancelled terminal state. Every subsequent caller for the same module immediately throws OperationCanceledException, and PublishResultAsync's TrySetResult silently no-ops. Note also that TaskCreationOptions.RunContinuationsAsynchronously is absent here (it is present in the SignalRMasterCoordinator equivalent), so cancellation callbacks run synchronously on the cancelling thread.

Fix — use Task.WaitAsync to propagate cancellation without touching the shared TCS:

public Task<SerializedModuleResult> WaitForResultAsync(
    string moduleTypeName, CancellationToken cancellationToken)
{
    var tcs = _results.GetOrAdd(moduleTypeName,
        _ => new TaskCompletionSource<SerializedModuleResult>(
            TaskCreationOptions.RunContinuationsAsynchronously));
    return tcs.Task.WaitAsync(cancellationToken);
}

Task.WaitAsync cancels only the awaiting call, leaving the underlying TCS untouched for other waiters and for PublishResultAsync.


5. Worker disconnect silently loses in-flight work — open TODO

DistributedPipelineHub.OnDisconnectedAsync contains:

// TODO: Re-enqueue in-flight work for the disconnected worker
return Task.CompletedTask;

A worker that crashes after DequeueModuleAsync but before PublishResultAsync leaves the master's WaitForResultAsync blocked indefinitely (ModuleResultTimeoutSeconds defaults to 0, meaning no timeout). The pipeline hangs with no observable error.

This is worse than either of the two acceptable outcomes: (a) re-enqueue the assignment for another worker to pick up, or (b) fail fast with a clear error message. The current behaviour is an invisible hang. At minimum the TODO should be replaced with a LogCritical and a TrySetException on the result waiter so the pipeline fails loudly instead of stalling forever.
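A minimal fail-loud sketch, with hypothetical stand-ins for the hub's real bookkeeping (the actual hub tracks assignments differently; DisconnectHandler and its members are illustrative names only):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-ins: which module each worker currently holds, and the
// master-side waiter for each module result.
internal sealed class DisconnectHandler
{
    private readonly ConcurrentDictionary<string, string> _inFlightByWorker = new();
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _resultWaiters = new();

    public void TrackAssignment(string workerId, string moduleTypeName)
        => _inFlightByWorker[workerId] = moduleTypeName;

    public TaskCompletionSource<string> GetWaiter(string moduleTypeName)
        => _resultWaiters.GetOrAdd(moduleTypeName,
            _ => new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously));

    // Called from OnDisconnectedAsync: fault the orphaned waiter loudly
    // instead of leaving the master blocked forever.
    public void HandleWorkerDisconnected(string workerId, Exception? transportError)
    {
        if (!_inFlightByWorker.TryRemove(workerId, out var moduleTypeName))
        {
            return; // worker had no in-flight assignment
        }

        if (_resultWaiters.TryGetValue(moduleTypeName, out var waiter))
        {
            waiter.TrySetException(new InvalidOperationException(
                $"Worker '{workerId}' disconnected while executing '{moduleTypeName}'.",
                transportError));
        }
    }
}
```

Re-enqueueing (option a) would replace the TrySetException call with pushing the assignment back onto the work queue; either way the TODO stops being a silent hang.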


Summary

The three fixes are clean and correct. The five remaining items are in descending order of urgency:

  1. Architecture docs — misleads IDistributedCoordinator implementors with non-existent methods and services
  2. RunIdentifierResolver inconsistency — discovery package uses a different algorithm; could cause workers to fail to locate master
  3. Worker disconnect TODO — silent infinite hang on worker crash; needs resolution or explicit documentation
  4. TCS poisoning in InMemoryDistributedCoordinator — correctness bug in less-common paths; one-line fix available
  5. ResolvePathPattern ** glob — silently uploads wrong file sets; FileSystemGlobbing is the right fix

Items 1–3 are the strongest candidates to address before merge.

…cate shared code, pin cloudflared

- Add volatile to DeferredCoordinator/DeferredArtifactStore _inner fields
  for correct double-checked locking memory visibility (#9)
- Replace O(n) FirstOrDefault module lookups with Dictionary<string, IModule>
  built once per execution in both executors (#4)
- Extract DependencyResultApplicator with shared ApplyDependencyResults and
  PublishResolutionFailureAsync, eliminating duplication between
  DistributedModuleExecutor and WorkerModuleExecutor (#5)
- Pin cloudflared to version 2026.2.0 in CI workflow instead of using
  latest release (#8)
@thomhurst
Owner Author

Response to 9-Item Code Review

Addressed in commit bc84860:

# Issue Resolution
4 O(n) module lookup per assignment — Fixed: both executors now build a Dictionary<string, IModule> once at the start of ExecuteAsync. Module lookups and dependency result application are O(1).
5 ApplyDependencyResults and PublishResolutionFailureAsync duplicated — Fixed: extracted a shared DependencyResultApplicator static class with Apply, PublishResolutionFailureAsync, and BuildModuleLookup methods. Both executors delegate to this shared class.
8 Cloudflared latest not version-pinned — Fixed: pinned to 2026.2.0. Also switched macOS from brew install (which can't pin a version) to a direct tarball download.
9 DeferredCoordinator double-checked locking without volatile — Fixed: added volatile to the _inner field in both DeferredCoordinator and DeferredArtifactStore.

Previously addressed:

# Issue Resolution
1 ISP violation — Declined: role-gated code paths prevent misuse (see prior response)
6 MatrixTarget dead public API — Fixed in an earlier commit: made internal

Not addressing in this PR:

# Issue Reasoning
2 ResolveDistributedOptions service collection crawl — The crawl is predictable because AddDistributedMode controls how options are registered (always via Configure<T> with IConfigureOptions<T> singleton instances). Factory delegates and IConfiguration.Bind paths are not used.
3 DeferredCoordinator lives in PipelineBuilder — These are private implementation details tightly coupled to the DI registration in ActivateDistributedModeIfConfigured. Moving them would expose internal plumbing that doesn't need to be independently testable; they're thin async wrappers.
7 Redis Lua script scans entire queue — Agreed this is a scalability concern for very large pipelines. The per-capability-queue approach is a good longer-term optimisation. For the current use case (a CI matrix with 5-20 modules), the full scan completes in microseconds.

Minor (HeartbeatIntervalSeconds/HeartbeatTimeoutSeconds): These properties don't exist in the codebase — false positive from the reviewer.

@claude claude bot left a comment


Code Review: Distributed Execution Mode (#2430)

This is a substantial and well-structured addition. The architecture is clearly thought through, the abstractions are clean, and the test coverage is extensive. Comments below focus on architectural concerns rather than style.


1. Interface Segregation Violation in IDistributedCoordinator ⚠️

IDistributedCoordinator is a unified interface implemented by both master and worker coordinators. In the SignalR implementation:

  • SignalRWorkerCoordinator.EnqueueModuleAsync → throws NotSupportedException
  • SignalRWorkerCoordinator.WaitForResultAsync → throws NotSupportedException
  • SignalRWorkerCoordinator.GetRegisteredWorkersAsync → returns empty
  • SignalRWorkerCoordinator.SignalCompletionAsync → no-op

This violates the Interface Segregation Principle — clients are forced to depend on methods they can't implement. It also makes future implementations error-prone (new implementors won't know which methods are safe to throw on).

Better approach: Split the interface into role-specific contracts:

public interface IMasterCoordinator
{
    Task EnqueueModuleAsync(ModuleAssignment assignment, CancellationToken ct);
    Task<SerializedModuleResult> WaitForResultAsync(string moduleTypeName, CancellationToken ct);
    Task<IReadOnlyList<WorkerRegistration>> GetRegisteredWorkersAsync(CancellationToken ct);
    Task SignalCompletionAsync(CancellationToken ct);
}

public interface IWorkerCoordinator
{
    Task<ModuleAssignment?> DequeueModuleAsync(IReadOnlySet<string> capabilities, CancellationToken ct);
    Task PublishResultAsync(SerializedModuleResult result, CancellationToken ct);
    Task RegisterWorkerAsync(WorkerRegistration registration, CancellationToken ct);
}

This is especially important because IDistributedCoordinator is public — it's part of the extension point API that third-party users will implement.


2. RunIdentifierResolver Duplicated Across Three Packages ⚠️

src/ModularPipelines.Distributed.Artifacts.S3/Configuration/RunIdentifierResolver.cs and src/ModularPipelines.Distributed.Redis/Configuration/RunIdentifierResolver.cs are identical (73 lines each), both implementing: explicit config → GITHUB_SHA → BUILD_SOURCEVERSION → CI_COMMIT_SHA → git rev-parse HEAD → GUID fallback.

src/ModularPipelines.Distributed.Discovery.Redis/RunIdentifierResolver.cs uses a different strategy (directory hash).

The shared logic belongs in ModularPipelines.Distributed (the core package) as a single canonical resolver. Duplicated code in separate packages will diverge — e.g., if a new CI platform's SHA env var needs to be added, it'll need to be updated in 2+ places.


3. Architecture Docs Don't Match Implementation

docs/docs/distributed/architecture.md documents:

  • BroadcastCancellationAsync()
  • IsCancellationRequestedAsync()

The actual IDistributedCoordinator interface has SignalCompletionAsync() instead. The docs appear to be from an earlier design iteration. This will confuse users trying to implement custom coordinators.


4. SignalR Transport Requires Redis Anyway

The ModularPipelines.Distributed.Discovery.Redis package exists to solve "how do workers find the master's SignalR URL?" The answer is: store the URL in Redis.

This means the SignalR transport path requires:

  • Redis (for service discovery via Discovery.Redis)
  • cloudflared tunnel (to expose the master's SignalR hub to external workers)
  • A running ASP.NET Core web server on the master

Whereas the pure Redis transport path requires only Redis. For most CI use cases, the Redis-only path is simpler, cheaper, and more reliable. It's worth documenting why SignalR is preferred over Redis in scenarios where Redis is already available — the current docs don't make this case.


5. Lua Script Performance on Large Work Queues

In RedisDistributedCoordinator.TryScanAndClaimAsync, the Lua script does:

  1. LRANGE 0 -1 — fetch the entire work queue
  2. Iterate in Lua to find the first capability-matching item
  3. LREM that specific item

Lua scripts in Redis block all other operations for their duration. On a large work queue with many items, this scan could block the Redis server. Since Redis is single-threaded for command execution, other workers are blocked while one worker scans.

Better approach: Use a sorted set (ZSET) with capability-based bucket keys, or limit the scan range (LRANGE 0 N where N is configurable). This gives predictable O(N) with a bounded N.
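As a rough sketch of the bounded-scan option (the capability check here is a deliberately simplified substring match; the real script would parse the serialized assignment payload, and the script text itself is illustrative, not the PR's code):

```csharp
// Sketch only: ARGV[1] caps how many queue items one claim attempt may
// inspect, so a single EVAL blocks Redis for a bounded time regardless of
// total queue depth. ARGV[2] is a simplified capability token.
const string BoundedClaimScript = @"
local items = redis.call('LRANGE', KEYS[1], 0, tonumber(ARGV[1]) - 1)
for _, item in ipairs(items) do
    if string.find(item, ARGV[2], 1, true) then
        redis.call('LREM', KEYS[1], 1, item)
        return item
    end
end
return false";
```

With StackExchange.Redis this would be invoked via IDatabase.ScriptEvaluateAsync, passing the queue key and the scan limit; items beyond the window are simply retried on the next poll.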


6. Hardcoded 64KB Compression Threshold

In DistributedWorkPublisher.GatherDependencyResults:

// payloads over 64 KB compressed with GZip

This threshold is hardcoded. Modules with large dependency results (e.g., returning large file lists) may frequently cross this threshold, while others never will. Expose this as a configuration option on DistributedOptions so operators can tune it.
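A sketch of what the configurable threshold could look like (DistributedOptionsSketch and PayloadCompressor are illustrative names, not the PR's API):

```csharp
using System.IO;
using System.IO.Compression;

// Hypothetical option type: expose the compression cutoff instead of
// hardcoding 64 KB inside the work publisher.
public sealed class DistributedOptionsSketch
{
    public int CompressionThresholdBytes { get; set; } = 64 * 1024;
}

internal static class PayloadCompressor
{
    // Returns the payload unchanged when it is under the threshold,
    // otherwise a GZip-compressed copy; `compressed` tells the caller
    // which one it got.
    public static byte[] MaybeCompress(byte[] payload, DistributedOptionsSketch options, out bool compressed)
    {
        compressed = payload.Length > options.CompressionThresholdBytes;
        if (!compressed)
        {
            return payload;
        }

        using var output = new MemoryStream();
        using (var gzip = new GZipStream(output, CompressionLevel.Fastest, leaveOpen: true))
        {
            gzip.Write(payload, 0, payload.Length);
        }
        return output.ToArray();
    }
}
```

Operators with large dependency results could then raise the threshold (or set it very high to disable compression) without a code change.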


7. CI Matrix: 5 Runners vs Previous 3

The workflow goes from 3 matrix runners (ubuntu, windows, macos) to 5 (+ 2 extra ubuntu instances 3 and 4). This is a 67% increase in GitHub Actions compute costs.

The net benefit depends on whether the wall-clock time savings from distributed execution exceed the coordination overhead (Redis round-trips, heartbeats, serialization, artifact transfers). For a project that already parallelizes work within each runner using the module system, the marginal gain from cross-machine distribution may be small.

It would be useful to document the expected speedup rationale (e.g., "modules X, Y, Z are pinned to specific OS runners, so they need to run in parallel on different machines") to justify the cost.


8. DisablePayloadSigning = true in S3 Store

S3DistributedArtifactStore sets DisablePayloadSigning = true (needed for R2 compatibility). This means the AWS SDK won't compute a SHA-256 of the request body for signing, which removes one layer of upload integrity verification. HTTPS still protects the transfer, but silent corruption that occurs server-side between reception and storage won't be detected.

Consider using S3Config.ChecksumValidationEnabled or EnableHttpChecksumValidation (available in recent AWSSDK versions) as an alternative that works with R2 while still validating content integrity.


9. Test Project Naming Mismatch

Several integration test files (in Integration/ sub-folders under *.UnitTests projects) spin up full pipeline instances:

  • DistributedPipelineIntegrationTests.cs
  • CapabilityRoutingIntegrationTests.cs
  • MatrixExpansionIntegrationTests.cs

These are integration tests by nature but live in unit test projects. This creates confusion about test isolation requirements and may cause slowness in unit test runs. Consider moving them to separate *.IntegrationTests projects, or at minimum applying a [Category("Integration")] trait so they can be filtered.


Minor: Architecture.md Links

The architecture doc references WorkerHealthMonitor and WorkerHeartbeatService as background services, but these files don't appear in the PR diff — either they're missing from the implementation or the doc references stale class names. Worth verifying the class names match the shipped code.


Summary

Issue — Severity

  • IDistributedCoordinator ISP violation — High (public API surface affects all custom implementations)
  • RunIdentifierResolver duplication — Medium (will diverge, hard to maintain)
  • Docs/code mismatch — Medium (misleads custom coordinator authors)
  • SignalR requires Redis anyway — Medium (architectural clarity)
  • Lua script unbounded scan — Low-Medium (performance at scale)
  • Hardcoded 64KB threshold — Low
  • CI cost justification — Low (documentation gap)
  • S3 payload signing — Low
  • Integration tests in unit test projects — Low

The core distributed execution model is sound — the capability routing, artifact lifecycle management, result serialization with portable paths, and the check-subscribe-recheck pattern for WaitForResultAsync are all well-implemented. The main ask is splitting IDistributedCoordinator before it becomes a public contract that's hard to change.

@claude claude bot left a comment


Code Review: Distributed Execution Mode (#2430)

This is a substantial and well-structured addition. The architecture is clearly thought through, the abstractions are clean, and the test coverage is extensive. Comments below focus on architectural concerns rather than style.


1. Interface Segregation Violation in IDistributedCoordinator

IDistributedCoordinator is a unified interface implemented by both master and worker coordinators. In the SignalR implementation:

  • SignalRWorkerCoordinator.EnqueueModuleAsync throws NotSupportedException
  • SignalRWorkerCoordinator.WaitForResultAsync throws NotSupportedException
  • SignalRWorkerCoordinator.GetRegisteredWorkersAsync returns empty
  • SignalRWorkerCoordinator.SignalCompletionAsync is a no-op

This violates the Interface Segregation Principle — clients are forced to depend on methods they cannot meaningfully implement. Since IDistributedCoordinator is public and is the primary extension point for custom transports, this will mislead third-party implementors who don't know which methods they can safely throw on.

Better approach: Split into role-specific contracts:

public interface IMasterCoordinator
{
    Task EnqueueModuleAsync(ModuleAssignment assignment, CancellationToken ct);
    Task<SerializedModuleResult> WaitForResultAsync(string moduleTypeName, CancellationToken ct);
    Task<IReadOnlyList<WorkerRegistration>> GetRegisteredWorkersAsync(CancellationToken ct);
    Task SignalCompletionAsync(CancellationToken ct);
}

public interface IWorkerCoordinator
{
    Task<ModuleAssignment?> DequeueModuleAsync(IReadOnlySet<string> capabilities, CancellationToken ct);
    Task PublishResultAsync(SerializedModuleResult result, CancellationToken ct);
    Task RegisterWorkerAsync(WorkerRegistration registration, CancellationToken ct);
}

2. RunIdentifierResolver Duplicated Across Three Packages

ModularPipelines.Distributed.Artifacts.S3/Configuration/RunIdentifierResolver.cs and ModularPipelines.Distributed.Redis/Configuration/RunIdentifierResolver.cs are byte-for-byte identical (73 lines each), both implementing the same priority chain: explicit config → GITHUB_SHA → BUILD_SOURCEVERSION → CI_COMMIT_SHA → git rev-parse HEAD → GUID fallback.

This shared logic belongs in ModularPipelines.Distributed (the core package). Keeping it duplicated means that adding support for a new CI platform's SHA env var (e.g., GitLab's CI_COMMIT_SHA is there, but Bitbucket's BITBUCKET_COMMIT is missing) requires changes in multiple packages.


3. Architecture Docs Don't Match Implementation

docs/docs/distributed/architecture.md documents these coordinator methods:

  • BroadcastCancellationAsync()
  • IsCancellationRequestedAsync()

The actual IDistributedCoordinator interface has SignalCompletionAsync() instead. The docs appear to be from an earlier design iteration. This will mislead users trying to implement custom coordinators.


4. SignalR Transport Requires Redis Anyway

The ModularPipelines.Distributed.Discovery.Redis package exists to answer "how do workers find the master's SignalR URL?" — by storing the URL in Redis.

This means the SignalR transport path requires both Redis (for service discovery) and a cloudflared tunnel (to expose the master's ASP.NET Core hub to external workers). The Redis-only path requires only Redis. For most CI use cases, the Redis-only path is simpler and more reliable.

The current docs don't explain what scenarios justify choosing SignalR over pure Redis. If the intent is that SignalR handles push-vs-pull efficiency at scale, that case is worth spelling out explicitly.


5. Lua Script Unbounded Scan on Work Queue

In RedisDistributedCoordinator.TryScanAndClaimAsync, the Lua script does:

  1. LRANGE 0 -1 — fetches the entire work queue
  2. Iterates in Lua to find the first capability-matching item
  3. LREM that item

Redis Lua scripts block all other operations for their duration. On a large work queue with heterogeneous capability requirements, this scan grows linearly with queue depth. Since all workers compete on the same Redis server, one slow scan blocks all other workers.

Better approach: Bound the scan range with a configurable limit (LRANGE 0 N) or use capability-bucketed queues (one list per capability set) to avoid scanning.


6. Hardcoded 64KB Compression Threshold

In DistributedWorkPublisher.GatherDependencyResults, payloads over 64KB are GZip-compressed. This threshold is a hardcoded constant. Modules with large dependency results (e.g., returning large file manifests) may frequently cross this threshold while others never will. Expose it as a property on DistributedOptions so operators can tune it for their workload.


7. CI Cost: 5 Runners vs Previous 3

The workflow goes from 3 matrix runners to 5 (adding 2 extra ubuntu instances). This is a 67% increase in GitHub Actions compute. The net benefit depends on whether wall-clock time savings from distributed execution exceed the coordination overhead (Redis round-trips, heartbeats, serialization, artifact transfers) — particularly for a project that already parallelizes work within each runner.

It would be useful to document the expected speedup rationale to justify the cost (e.g., "modules pinned to specific OS runners can now run concurrently across machines").


8. DisablePayloadSigning = true in S3 Store

S3DistributedArtifactStore sets DisablePayloadSigning = true for R2 compatibility. This removes the AWS SDK's pre-upload SHA-256 body signing, which is one layer of upload integrity verification. HTTPS still protects the transfer, but this is worth a code comment explaining the trade-off, and it's worth evaluating whether newer AWSSDK checksum validation options work with R2 as an alternative.


9. Integration Tests in Unit Test Projects

Several tests that spin up full pipeline instances live in *.UnitTests projects under Integration/ sub-folders:

  • DistributedPipelineIntegrationTests.cs
  • CapabilityRoutingIntegrationTests.cs
  • MatrixExpansionIntegrationTests.cs

These are integration tests by nature. Mixing them into unit test projects makes it harder to run fast unit-only feedback loops. Consider moving them to *.IntegrationTests projects or applying a test category trait so they can be filtered separately.


Minor: Architecture.md References Possibly Stale Class Names

The architecture doc mentions WorkerHealthMonitor and WorkerHeartbeatService as background services, but these files don't appear in the PR diff. Either they're referenced by different names in the code or are missing from the implementation. Worth a quick check to ensure the documented class names match the shipped ones.


Summary

Issue — Severity

  • IDistributedCoordinator ISP violation (public API) — High
  • RunIdentifierResolver duplication — Medium
  • Docs/code mismatch on coordinator methods — Medium
  • SignalR transport requires Redis anyway (clarity) — Medium
  • Lua script unbounded LRANGE scan — Low-Medium
  • Hardcoded 64KB compression threshold — Low
  • CI cost increase without documented justification — Low
  • S3 payload signing trade-off undocumented — Low
  • Integration tests in unit test projects — Low

The core distributed execution model is sound. The capability routing, artifact lifecycle management with deduplication, result serialization with portable cross-platform paths, and the check-subscribe-recheck pattern in WaitForResultAsync are all well-implemented. The main ask before merging is splitting IDistributedCoordinator — it is the primary public extension point and will be difficult to change once users depend on it.
