Skip to content

feat(engine): parallel branch isolation and race strategy#16

Open
albertgwo wants to merge 2 commits intofeat/engine-pr3-plain-adapterfrom
feat/engine-pr4-parallel-isolation
Open

feat(engine): parallel branch isolation and race strategy#16
albertgwo wants to merge 2 commits intofeat/engine-pr3-plain-adapterfrom
feat/engine-pr4-parallel-isolation

Conversation

@albertgwo
Copy link
Copy Markdown
Contributor

@albertgwo albertgwo commented Mar 19, 2026

User description

Summary

  • Add parallel branch execution with isolated variable scopes
  • Implement `walkBranch` for subgraph traversal within parallel nodes
  • Add `race` strategy (first-completed wins) alongside default `all` strategy
  • Branch results are merged at the join node

Dependency

Base: `feat/engine-pr3-plain-adapter`

PR 8 of 16 in the execution engine implementation.

Test plan

  • Parallel branches execute independently with isolated scopes
  • Race strategy completes when first branch finishes
  • All strategy waits for every branch
  • Branch results merge correctly at join
  • Errors in one branch don't corrupt other branches

Generated description

Below is a concise technical summary of the changes proposed in this PR:
Add configurable parallel execution strategies so PlainAdapter can run branches in either all or first mode and CompiledFlow merges the branch outputs at the join node while honoring each node’s join strategy. Enable the walker to traverse each branch with isolated state copies, preventing nested parallels and ensuring branch-level validation and tests capture the new flows.

TopicDetails
Parallel execution Implement PlainAdapter.executeParallel (all vs first), expose it through the ExecutionAdapter contract, and invoke it from the parallel callbacks so branch results merge into the parent state after each join while keeping the adapter and flow logic aligned with the new strategies (covered by strategy-focused tests).
Modified files (6)
  • packages/engine/src/__tests__/engine/adapter.test.ts
  • packages/engine/src/__tests__/engine/parallel.test.ts
  • packages/engine/src/adapters/plain.ts
  • packages/engine/src/adapters/types.ts
  • packages/engine/src/engine/compiled-flow.ts
  • packages/engine/src/index.ts
Latest Contributors(0)
UserCommitDate
Branch isolation Ensure each branch walks a subgraph via walkBranch, update walker exports, and enforce/docs in the schema so branches execute with isolated state, reject nested parallels, and the new walker plus validation tests guard those invariants.
Modified files (4)
  • packages/engine/src/__tests__/engine/parallel.test.ts
  • packages/engine/src/walker/index.ts
  • packages/engine/src/walker/walk.ts
  • packages/schema/src/structural.ts
Latest Contributors(0)
UserCommitDate
This pull request is reviewed by Baz. Review like a pro on (Baz).

Add executeParallel to ExecutionAdapter interface with 'all' and 'first'
strategies. Implement in PlainAdapter. Add walkBranch to walker for
multi-node branch subgraphs with isolated state (structuredClone).
Rewrite onParallel in CompiledFlow to use branch isolation, walkBranch,
and adapter.executeParallel with namespace-by-branch-ID merge at join.
Add nested parallel rejection in walkBranch (runtime) and structural
validation (schema package).
13 tests covering: branch isolation (writes don't leak), merge at join
(namespaced results), 'all' strategy (all complete, error propagation),
'first' strategy (both complete), multi-node branch subgraphs
(action->switch->action chain), nested parallel rejection (walkBranch
runtime + structural validation), and PlainAdapter.executeParallel
unit tests.
@cloudflare-workers-and-pages
Copy link
Copy Markdown

Deploying flowprint with  Cloudflare Pages  Cloudflare Pages

Latest commit: f27165a
Status: ✅  Deploy successful!
Preview URL: https://952fa8e6.flowprint.pages.dev
Branch Preview URL: https://feat-engine-pr4-parallel-iso.flowprint.pages.dev

View logs

Comment on lines +39 to +42
executeParallel(
branches: (() => Promise<unknown>)[],
strategy: 'all' | 'first',
): Promise<unknown[]>
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CompiledFlow assumes executeParallel returns {branchId, state} but it's typed as Promise<unknown[]>, should we tighten executeParallel's return type to Promise<ParallelBranchResult[]>?

executeParallel(): Promise<ParallelBranchResult[]>

Finding type: Breaking Changes | Severity: 🔴 High


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

In packages/engine/src/adapters/types.ts around lines 39-42, the executeParallel method
is currently typed as returning Promise<unknown[]>, but CompiledFlow expects each
element to have the shape { branchId: string; state: Record<string, unknown> }. Change
the executeParallel signature to return Promise<ParallelBranchResult[]> (or a generic
like Promise<ParallelBranchResult[]>), and add/export a ParallelBranchResult
type/interface in the same file: interface ParallelBranchResult { branchId: string;
state: Record<string, unknown> }. Update any related exports so adapters must implement
that return shape. This will ensure adapters return the exact structure CompiledFlow
relies on and prevent runtime merging bugs.

Heads up!

Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription

Comment on lines +116 to +127
private async executeParallelFirst(branches: (() => Promise<unknown>)[]): Promise<unknown[]> {
let firstIndex = -1
const results = await Promise.all(
branches.map(async (branch, index) => {
const result = await branch()
if (firstIndex === -1) {
firstIndex = index
}
return result
}),
)
return results
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

executeParallelFirst tracks firstIndex locally but never returns or persists it — should we return a primary index/flag from the adapter or record it in the parallel node output?

Finding type: Logical Bugs | Severity: 🔴 High


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

In packages/engine/src/adapters/plain.ts around lines 116 to 127, the
executeParallelFirst method records firstIndex locally but never returns or persists it,
so callers cannot tell which branch finished first. Change the method signature to
return an object that includes both the results array and the primary index (for
example, Promise<{ results: unknown[]; primaryIndex: number }>) and set primaryIndex to
the index of the first-finishing branch; update executeParallel to propagate that return
type for the 'first' strategy (and for 'all' return primaryIndex = -1 or null). Also
update executeParallelAll to return the same shaped object and adjust any type
annotations so callers will receive the winner metadata.

Heads up!

Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription

Comment on lines +117 to +121
let firstIndex = -1
const results = await Promise.all(
branches.map(async (branch, index) => {
const result = await branch()
if (firstIndex === -1) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

firstIndex is assigned but unused, should we remove the tracking or return/expose the first-to-complete index?

Finding type: Conciseness | Severity: 🔴 High


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

In packages/engine/src/adapters/plain.ts around lines 117 to 121 inside the private
method executeParallelFirst, the local variable firstIndex is assigned but never read or
returned. Remove the firstIndex declaration and the if (firstIndex === -1) { firstIndex
= index } assignment, and simplify the mapped async branch to just await and return each
branch's result; also update the method comment to no longer mention tracking the
first-to-finish index. Ensure the method still returns Promise<unknown[]> and run
tests/TS compile to confirm no unused locals remain.

Heads up!

Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription

Comment on lines +256 to 267
const strategy = node.join_strategy ?? 'all'
const joinNodeId = node.join

// Build a branch function for each branch ID.
// Each branch gets an isolated copy of the parent state.
const branchFns = node.branches.map((branchId) => {
return async (): Promise<{ branchId: string; state: Record<string, unknown> }> => {
const branchNode = doc.nodes[branchId]
if (!branchNode) {
throw new Error(`Parallel branch node "${branchId}" not found`)
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding adapter.executeParallel is a package-impacting engine change, should we add a .changeset/*.md documenting the new parallel execution and the nested-parallel validation change in packages/schema/src/structural.ts?

Finding type: AI Coding Guidelines | Severity: 🟠 Medium


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

In packages/engine/src/engine/compiled-flow.ts around lines 256 to 287, the newly added
parallel execution plumbing (the strategy variable and the adapter.executeParallel call)
changes runtime behavior and is a package-impacting change. Create a .changeset/*.md
file that targets the packages/engine package and clearly documents: (1) addition of
parallel branch execution with node.join_strategy and adapter.executeParallel, (2) that
branch results are namespaced by branch ID and merged into parent state, and (3) that
this change pairs with a nested-parallel structural validation change in
packages/schema/src/structural.ts. Make the changeset indicate a minor version bump for
packages/engine and include a short summary sentence, the rationale, and the files/areas
affected (compiled-flow.ts and structural.ts). Ensure this satisfies the CLAUDE.md
requirement that package changes include a .changeset entry.

Heads up!

Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription

Comment on lines +259 to +270
// Build a branch function for each branch ID.
// Each branch gets an isolated copy of the parent state.
const branchFns = node.branches.map((branchId) => {
return async (): Promise<{ branchId: string; state: Record<string, unknown> }> => {
const branchNode = doc.nodes[branchId]
if (!branchNode) {
throw new Error(`Parallel branch node "${branchId}" not found`)
}

const branchPromises = node.branches.map(async (branchId) => {
const branchNode = doc.nodes[branchId]
if (!branchNode) {
throw new Error(`Parallel branch node "${branchId}" not found`)
}

const handler = resolvedHandlers.get(branchId)
if (!handler) {
throw new Error(`No resolved handler for parallel branch "${branchId}"`)
}

if (isActionNode(branchNode)) {
let result: unknown
switch (handler.type) {
case 'registered':
case 'entry_point':
result = await adapter.executeAction(branchId, handler.fn, ctx, {
metadata: branchNode.metadata as Record<string, unknown> | undefined,
})
break
case 'expressions': {
const legacyCtx = buildLegacyContext(ctx)
const output: Record<string, unknown> = {}
for (const [key, expr] of Object.entries(handler.exprs)) {
output[key] = evaluateExpression(expr, legacyCtx, expressionTimeout)
}
result = output
break
}
case 'rules': {
const rulesDoc = loadRulesFile(handler.rulesFile, projectRoot)
const legacyCtx = buildLegacyContext(ctx)
const rulesResult = evaluateRules(rulesDoc, legacyCtx, expressionTimeout)
result = rulesResult.output
break
}
case 'native':
result = {}
break
// Isolated state: structuredClone ensures branches cannot see each other's writes
const branchState = structuredClone(ctx.state)
const branchCtx: ExecutionContext = {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buildCallbacks/PlainAdapter share the parent ctx.signal so branches don't abort under the 'all' strategy, contradicting the docs — should we add per-branch AbortControllers that cancel others on reject or update the contract/docs?

Finding type: Logical Bugs | Severity: 🔴 High


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

In packages/engine/src/engine/compiled-flow.ts around lines 259 to 287, the onParallel
branch-building logic currently reuses the parent ctx.signal for every branch so
branches do not get isolated AbortControllers and cannot be cancelled when a sibling
fails. Refactor this code to: (1) create an AbortController for each branch and set
branchCtx.signal to that controller.signal before calling walkBranch; (2) wrap each
branch function so that if any branch Promise rejects or its signal is aborted, you call
abort() on all other branch controllers to cancel them; and (3) ensure the
adapter.executeParallel is invoked with these wrapped branch functions (or update the
PlainAdapter to respect per-branch signals and abort others on a rejection). This will
enforce the documented contract that branches get independent signals and remaining
branches are cancelled on failure.

Heads up!

Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription

Comment on lines +296 to 300
// Write merged results into parent context
Object.assign(ctx.state, resultMap)

const parallelResult = resultMap

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onParallel merges branch outputs into ctx.state but doesn't create a node-level entry for the parallel node, should we set ctx.state[nodeId] = resultMap or return { [nodeId]: resultMap } when merging?
ctx.state[nodeId] = resultMap

Finding type: Breaking Changes | Severity: 🔴 High


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

In packages/engine/src/engine/compiled-flow.ts around lines 296 to 300, the onParallel
handler merges branch outputs into ctx.state but does not create the aggregated entry
for the parallel node itself. Modify the code so that after merging branch results you
also set ctx.state[nodeId] = resultMap and set parallelResult = { [nodeId]: resultMap }
(and ensure the NodeExecutionRecord.output uses that node-scoped object). This will
restore the previous contract where the parallel node has an entry in ctx.state and the
node execution output.

Heads up!

Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription

Comment on lines +214 to +225
let currentNodeId: string | undefined = startNodeId

while (currentNodeId) {
// Stop when we reach the join node
if (currentNodeId === stopNodeId) {
break
}

if (ctx.signal.aborted) {
break
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

walkBranch duplicates the entire node-processing loop that already lives in walkGraph (see lines 113‑186). Both functions iterate nodes, call the same callbacks for trigger/action/wait/switch/error/terminal nodes, and merge outputs, so any future node type change or callback adjustment needs to be applied in two places. Can we factor the common traversal loop into a shared helper (e.g. runNodeLoop(ctx, callbacks, { stopAtNodeId, allowParallel })) and let walkGraph and walkBranch reuse it with different stop conditions and parallel-node checks? This keeps the behavior in one location and prevents the two implementations from drifting apart whenever we add a new node type or change error handling.

Finding type: Code Dedup and Conventions | Severity: 🟢 Low


Want Baz to fix this for you? Activate Fixer

Heads up!

Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription

@albertgwo
Copy link
Copy Markdown
Contributor Author

Code Review Findings

Critical (90-100 confidence)

1. executeParallelFirst race strategy is dead code (95)

  • File: packages/engine/src/adapters/plain.ts:~118-130
  • firstIndex is computed but never returned or used. The method runs Promise.all and returns all results — functionally identical to executeParallelAll. No Promise.race, no abort signals to cancel losers. The "first-completed wins, others cancelled" behavior from the interface docstring does not exist.
  • Fix: Either implement with Promise.race + AbortController to cancel losers, or remove the "first" strategy until properly implemented.

2. walkBranch shares the callbacks object with parent and siblings (92)

  • File: packages/engine/src/engine/compiled-flow.ts:~278
  • callbacks built once in buildCallbacks and passed to all concurrent walkBranch calls. walkGraph mutates callbacks.onStep in-place, so all branches share the same interceptor pushing to a single trace array in nondeterministic order.
  • Fix: Create per-branch copy of callbacks with its own onStep.

Important (80-89 confidence)

3. JSDoc on ExecutionAdapter.executeParallel promises abort semantics that don't exist (88)

  • File: packages/engine/src/adapters/types.ts:~35-43
  • Documents per-branch AbortController and "abort remaining branches on failure" — neither implemented.

4. walkBranch does not track compensation (85)

  • File: packages/engine/src/walker/walk.ts, walkBranch
  • If a branch contains action nodes with compensation handlers, those handlers are silently dropped. Branch failures don't trigger rollback.

5. Nested parallel detection break only stops one branch scan (82)

  • File: packages/schema/src/structural.ts:~268
  • break exits inner while loop but outer for loop continues. Could report duplicate errors.

6. "first" strategy test doesn't test race semantics (83)

  • File: packages/engine/src/__tests__/engine/parallel.test.ts:~381-403
  • Asserts both branches complete with both results available — identical to "all" strategy behavior.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant