feat(engine): parallel branch isolation and race strategy#16
feat(engine): parallel branch isolation and race strategy#16albertgwo wants to merge 2 commits intofeat/engine-pr3-plain-adapterfrom
Conversation
Add executeParallel to ExecutionAdapter interface with 'all' and 'first' strategies. Implement in PlainAdapter. Add walkBranch to walker for multi-node branch subgraphs with isolated state (structuredClone). Rewrite onParallel in CompiledFlow to use branch isolation, walkBranch, and adapter.executeParallel with namespace-by-branch-ID merge at join. Add nested parallel rejection in walkBranch (runtime) and structural validation (schema package).
13 tests covering: branch isolation (writes don't leak), merge at join (namespaced results), 'all' strategy (all complete, error propagation), 'first' strategy (both complete), multi-node branch subgraphs (action->switch->action chain), nested parallel rejection (walkBranch runtime + structural validation), and PlainAdapter.executeParallel unit tests.
Deploying flowprint with
|
| Latest commit: |
f27165a
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://952fa8e6.flowprint.pages.dev |
| Branch Preview URL: | https://feat-engine-pr4-parallel-iso.flowprint.pages.dev |
| executeParallel( | ||
| branches: (() => Promise<unknown>)[], | ||
| strategy: 'all' | 'first', | ||
| ): Promise<unknown[]> |
There was a problem hiding this comment.
CompiledFlow assumes executeParallel returns {branchId, state} but it's typed as Promise<unknown[]>, should we tighten executeParallel's return type to Promise<ParallelBranchResult[]>?
executeParallel(): Promise<ParallelBranchResult[]>
Finding type: Breaking Changes | Severity: 🔴 High
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
In packages/engine/src/adapters/types.ts around lines 39-42, the executeParallel method
is currently typed as returning Promise<unknown[]>, but CompiledFlow expects each
element to have the shape { branchId: string; state: Record<string, unknown> }. Change
the executeParallel signature to return Promise<ParallelBranchResult[]> (or a generic
like Promise<ParallelBranchResult[]>), and add/export a ParallelBranchResult
type/interface in the same file: interface ParallelBranchResult { branchId: string;
state: Record<string, unknown> }. Update any related exports so adapters must implement
that return shape. This will ensure adapters return the exact structure CompiledFlow
relies on and prevent runtime merging bugs.
Heads up!
Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription
| private async executeParallelFirst(branches: (() => Promise<unknown>)[]): Promise<unknown[]> { | ||
| let firstIndex = -1 | ||
| const results = await Promise.all( | ||
| branches.map(async (branch, index) => { | ||
| const result = await branch() | ||
| if (firstIndex === -1) { | ||
| firstIndex = index | ||
| } | ||
| return result | ||
| }), | ||
| ) | ||
| return results |
There was a problem hiding this comment.
executeParallelFirst tracks firstIndex locally but never returns or persists it — should we return a primary index/flag from the adapter or record it in the parallel node output?
Finding type: Logical Bugs | Severity: 🔴 High
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
In packages/engine/src/adapters/plain.ts around lines 116 to 127, the
executeParallelFirst method records firstIndex locally but never returns or persists it,
so callers cannot tell which branch finished first. Change the method signature to
return an object that includes both the results array and the primary index (for
example, Promise<{ results: unknown[]; primaryIndex: number }>) and set primaryIndex to
the index of the first-finishing branch; update executeParallel to propagate that return
type for the 'first' strategy (and for 'all' return primaryIndex = -1 or null). Also
update executeParallelAll to return the same shaped object and adjust any type
annotations so callers will receive the winner metadata.
Heads up!
Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription
| let firstIndex = -1 | ||
| const results = await Promise.all( | ||
| branches.map(async (branch, index) => { | ||
| const result = await branch() | ||
| if (firstIndex === -1) { |
There was a problem hiding this comment.
firstIndex is assigned but unused, should we remove the tracking or return/expose the first-to-complete index?
Finding type: Conciseness | Severity: 🔴 High
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
In packages/engine/src/adapters/plain.ts around lines 117 to 121 inside the private
method executeParallelFirst, the local variable firstIndex is assigned but never read or
returned. Remove the firstIndex declaration and the if (firstIndex === -1) { firstIndex
= index } assignment, and simplify the mapped async branch to just await and return each
branch's result; also update the method comment to no longer mention tracking the
first-to-finish index. Ensure the method still returns Promise<unknown[]> and run
tests/TS compile to confirm no unused locals remain.
Heads up!
Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription
| const strategy = node.join_strategy ?? 'all' | ||
| const joinNodeId = node.join | ||
|
|
||
| // Build a branch function for each branch ID. | ||
| // Each branch gets an isolated copy of the parent state. | ||
| const branchFns = node.branches.map((branchId) => { | ||
| return async (): Promise<{ branchId: string; state: Record<string, unknown> }> => { | ||
| const branchNode = doc.nodes[branchId] | ||
| if (!branchNode) { | ||
| throw new Error(`Parallel branch node "${branchId}" not found`) | ||
| } | ||
|
|
There was a problem hiding this comment.
Adding adapter.executeParallel is a package-impacting engine change, should we add a .changeset/*.md documenting the new parallel execution and the nested-parallel validation change in packages/schema/src/structural.ts?
Finding type: AI Coding Guidelines | Severity: 🟠 Medium
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
In packages/engine/src/engine/compiled-flow.ts around lines 256 to 287, the newly added
parallel execution plumbing (the strategy variable and the adapter.executeParallel call)
changes runtime behavior and is a package-impacting change. Create a .changeset/*.md
file that targets the packages/engine package and clearly documents: (1) addition of
parallel branch execution with node.join_strategy and adapter.executeParallel, (2) that
branch results are namespaced by branch ID and merged into parent state, and (3) that
this change pairs with a nested-parallel structural validation change in
packages/schema/src/structural.ts. Make the changeset indicate a minor version bump for
packages/engine and include a short summary sentence, the rationale, and the files/areas
affected (compiled-flow.ts and structural.ts). Ensure this satisfies the CLAUDE.md
requirement that package changes include a .changeset entry.
Heads up!
Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription
| // Build a branch function for each branch ID. | ||
| // Each branch gets an isolated copy of the parent state. | ||
| const branchFns = node.branches.map((branchId) => { | ||
| return async (): Promise<{ branchId: string; state: Record<string, unknown> }> => { | ||
| const branchNode = doc.nodes[branchId] | ||
| if (!branchNode) { | ||
| throw new Error(`Parallel branch node "${branchId}" not found`) | ||
| } | ||
|
|
||
| const branchPromises = node.branches.map(async (branchId) => { | ||
| const branchNode = doc.nodes[branchId] | ||
| if (!branchNode) { | ||
| throw new Error(`Parallel branch node "${branchId}" not found`) | ||
| } | ||
|
|
||
| const handler = resolvedHandlers.get(branchId) | ||
| if (!handler) { | ||
| throw new Error(`No resolved handler for parallel branch "${branchId}"`) | ||
| } | ||
|
|
||
| if (isActionNode(branchNode)) { | ||
| let result: unknown | ||
| switch (handler.type) { | ||
| case 'registered': | ||
| case 'entry_point': | ||
| result = await adapter.executeAction(branchId, handler.fn, ctx, { | ||
| metadata: branchNode.metadata as Record<string, unknown> | undefined, | ||
| }) | ||
| break | ||
| case 'expressions': { | ||
| const legacyCtx = buildLegacyContext(ctx) | ||
| const output: Record<string, unknown> = {} | ||
| for (const [key, expr] of Object.entries(handler.exprs)) { | ||
| output[key] = evaluateExpression(expr, legacyCtx, expressionTimeout) | ||
| } | ||
| result = output | ||
| break | ||
| } | ||
| case 'rules': { | ||
| const rulesDoc = loadRulesFile(handler.rulesFile, projectRoot) | ||
| const legacyCtx = buildLegacyContext(ctx) | ||
| const rulesResult = evaluateRules(rulesDoc, legacyCtx, expressionTimeout) | ||
| result = rulesResult.output | ||
| break | ||
| } | ||
| case 'native': | ||
| result = {} | ||
| break | ||
| // Isolated state: structuredClone ensures branches cannot see each other's writes | ||
| const branchState = structuredClone(ctx.state) | ||
| const branchCtx: ExecutionContext = { |
There was a problem hiding this comment.
buildCallbacks/PlainAdapter share the parent ctx.signal so branches don't abort under the 'all' strategy, contradicting the docs — should we add per-branch AbortControllers that cancel others on reject or update the contract/docs?
Finding type: Logical Bugs | Severity: 🔴 High
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
In packages/engine/src/engine/compiled-flow.ts around lines 259 to 287, the onParallel
branch-building logic currently reuses the parent ctx.signal for every branch so
branches do not get isolated AbortControllers and cannot be cancelled when a sibling
fails. Refactor this code to: (1) create an AbortController for each branch and set
branchCtx.signal to that controller.signal before calling walkBranch; (2) wrap each
branch function so that if any branch Promise rejects or its signal is aborted, you call
abort() on all other branch controllers to cancel them; and (3) ensure the
adapter.executeParallel is invoked with these wrapped branch functions (or update the
PlainAdapter to respect per-branch signals and abort others on a rejection). This will
enforce the documented contract that branches get independent signals and remaining
branches are cancelled on failure.
Heads up!
Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription
| // Write merged results into parent context | ||
| Object.assign(ctx.state, resultMap) | ||
|
|
||
| const parallelResult = resultMap | ||
|
|
There was a problem hiding this comment.
onParallel merges branch outputs into ctx.state but doesn't create a node-level entry for the parallel node, should we set ctx.state[nodeId] = resultMap or return { [nodeId]: resultMap } when merging?
ctx.state[nodeId] = resultMap
Finding type: Breaking Changes | Severity: 🔴 High
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
In packages/engine/src/engine/compiled-flow.ts around lines 296 to 300, the onParallel
handler merges branch outputs into ctx.state but does not create the aggregated entry
for the parallel node itself. Modify the code so that after merging branch results you
also set ctx.state[nodeId] = resultMap and set parallelResult = { [nodeId]: resultMap }
(and ensure the NodeExecutionRecord.output uses that node-scoped object). This will
restore the previous contract where the parallel node has an entry in ctx.state and the
node execution output.
Heads up!
Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription
| let currentNodeId: string | undefined = startNodeId | ||
|
|
||
| while (currentNodeId) { | ||
| // Stop when we reach the join node | ||
| if (currentNodeId === stopNodeId) { | ||
| break | ||
| } | ||
|
|
||
| if (ctx.signal.aborted) { | ||
| break | ||
| } | ||
|
|
There was a problem hiding this comment.
walkBranch duplicates the entire node-processing loop that already lives in walkGraph (see lines 113‑186). Both functions iterate nodes, call the same callbacks for trigger/action/wait/switch/error/terminal nodes, and merge outputs, so any future node type change or callback adjustment needs to be applied in two places. Can we factor the common traversal loop into a shared helper (e.g. runNodeLoop(ctx, callbacks, { stopAtNodeId, allowParallel })) and let walkGraph and walkBranch reuse it with different stop conditions and parallel-node checks? This keeps the behavior in one location and prevents the two implementations from drifting apart whenever we add a new node type or change error handling.
Finding type: Code Dedup and Conventions | Severity: 🟢 Low
Want Baz to fix this for you? Activate Fixer
Heads up!
Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription
Code Review FindingsCritical (90-100 confidence)1.
2.
Important (80-89 confidence)3. JSDoc on
4.
5. Nested parallel detection
6. "first" strategy test doesn't test race semantics (83)
|
User description
Summary
Dependency
Base: `feat/engine-pr3-plain-adapter`
PR 8 of 16 in the execution engine implementation.
Test plan
Generated description
Below is a concise technical summary of the changes proposed in this PR:
Add configurable parallel execution strategies so
PlainAdaptercan run branches in eitherallorfirstmode andCompiledFlowmerges the branch outputs at the join node while honoring each node’s join strategy. Enable the walker to traverse each branch with isolated state copies, preventing nested parallels and ensuring branch-level validation and tests capture the new flows.PlainAdapter.executeParallel(allvsfirst), expose it through theExecutionAdaptercontract, and invoke it from the parallel callbacks so branch results merge into the parent state after each join while keeping the adapter and flow logic aligned with the new strategies (covered by strategy-focused tests).Modified files (6)
Latest Contributors(0)
walkBranch, update walker exports, and enforce/docs in the schema so branches execute with isolated state, reject nested parallels, and the new walker plus validation tests guard those invariants.Modified files (4)
Latest Contributors(0)