Skip to content

feat(engine): wait/signal system with Clock interface#18

Open
albertgwo wants to merge 3 commits intofeat/engine-pr3-plain-adapterfrom
feat/engine-pr6-wait-signal-clock
Open

feat(engine): wait/signal system with Clock interface#18
albertgwo wants to merge 3 commits intofeat/engine-pr3-plain-adapterfrom
feat/engine-pr6-wait-signal-clock

Conversation

@albertgwo
Copy link
Copy Markdown
Contributor

@albertgwo albertgwo commented Mar 19, 2026

User description

Summary

  • Add `Clock` interface with `RealClock` and `TestClock` implementations
  • Add wait/signal system: nodes can wait for named signals with optional timeouts
  • Add `Execution` handle for sending signals to running workflows
  • TestClock enables deterministic time-dependent testing

Dependency

Base: `feat/engine-pr3-plain-adapter` (branches from adapter, parallel to PR 4/5)

PR 10 of 16 in the execution engine implementation.

Test plan

  • Wait node blocks until signal received
  • Signal delivery resumes waiting nodes
  • Timeout fires when signal not received in time
  • TestClock allows deterministic time advancement

Generated description

Below is a concise technical summary of the changes proposed in this PR:
Enable wait nodes by coordinating PlainAdapter.waitForEvent, the new Execution handle, and CompiledFlow.start so workflows pause for signals, surface waiting state/TTL, and resolve traces even when timeouts route to successors. Provide a shared Clock abstraction with RealClock, TestClock, and parseDuration plus supporting exports/tests so signal timeouts and paused-execution TTLs behave deterministically in both production and test environments.

TopicDetails
Wait/Signal Flow Implement the wait/signal control flow by extending PlainAdapter, Execution, and CompiledFlow so wait nodes suspend on waitForEvent, route timeout_next, surface waiting status/trace data, and let callers resume workflows with Execution.signal while honoring signal validation, TTLs, and existing engine hooks.
Modified files (9)
  • packages/engine/src/__tests__/engine/wait-signal.test.ts
  • packages/engine/src/adapters/index.ts
  • packages/engine/src/adapters/plain.ts
  • packages/engine/src/adapters/types.ts
  • packages/engine/src/engine/compiled-flow.ts
  • packages/engine/src/engine/execution.ts
  • packages/engine/src/engine/index.ts
  • packages/engine/src/engine/types.ts
  • packages/engine/src/index.ts
Latest Contributors(0)
UserCommitDate
Clock abstraction Provide deterministic timing support by introducing the Clock interface, concrete RealClock/TestClock implementations, parseDuration, and their tests so wait timeouts and TTL-driven expirations run consistently across prod and test scenarios.
Modified files (3)
  • packages/engine/src/__tests__/engine/clock.test.ts
  • packages/engine/src/engine/clock.ts
  • packages/engine/src/engine/duration.ts
Latest Contributors(0)
UserCommitDate
This pull request is reviewed by Baz. Review like a pro on (Baz).

Add Clock abstraction for deterministic time control in tests:
- RealClock wraps Date.now() and native setTimeout/clearTimeout
- TestClock with advance() for synchronous timer triggering
- ISO 8601 duration parser (PT format + legacy shorthand)
- ValidateSignalFn type and new EngineOptions: clock, validateSignal, pausedExecutionTTL
- Execution class: status tracking, signal delivery, TestClock time advancement, TTL
- PlainAdapter.waitForEvent(): promise-based suspension with timeout support
- PlainAdapter.deliverSignal(): external signal delivery to waiting nodes
- WaitTimeoutError for timeout expiration
- CompiledFlow.start(): async flow execution that handles wait nodes
- resolveWaitNext callback routes to timeout_next on timeout
- execute() still throws "use start()" for flows with wait nodes
Clock tests (21):
- RealClock delegates to Date.now() and native timers
- TestClock: starts at 0, advance fires timers in order, boundary, clearTimeout
- parseDuration: PT format, legacy shorthand, error cases

Wait/signal tests (17):
- Wait node pauses execution with correct status and waitingFor
- Signal resumes execution and data merges into state
- Multiple sequential waits
- Timeout routes to timeout_next via TestClock.advance
- Wrong signal name / non-waiting state / validateSignal hook rejection
- execute() throws on wait nodes
- Result promise resolves, completedResult getter
- Paused execution TTL expiration and non-interference
- advanceTime() with RealClock throws, ISO 8601 parsing
- Trace recording includes wait nodes
Comment on lines +123 to +127
return new Promise<unknown>((resolve, reject) => {
const entry: {
resolve: (data: unknown) => void
reject: (err: Error) => void
eventName: string
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waitForEvent doesn't remove entries from pendingWaits on TTL expiry, allowing late signals to revive failed executions; should we surface a cancellation hook on Execution.fail to reject and delete the pending wait?

Finding type: Logical Bugs | Severity: 🔴 High


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

In packages/engine/src/adapters/plain.ts around lines 123 to 127, the waitForEvent
method creates a pending wait entry but there is no external cancellation hook, so late
signals can revive a failed execution. Add a public method cancelPendingWait(nodeId:
string, err?: Error): boolean that looks up the pending entry, clears its timeoutId via
this.clock.clearTimeout, deletes it from this.pendingWaits, calls entry.reject(err ??
new Error('cancelled')), and returns true if found. Update waitForEvent to rely on the
same entry shape and ensure timeouts still clear the entry via the new method. Also
update the ExecutionAdapter type (and any callers like Execution.fail) to call
adapter.cancelPendingWait(nodeId, new WaitTimeoutError(...)) when an execution is marked
failed so pending waits are rejected and cannot be resumed by late signals.

Heads up!

Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription

Comment on lines +83 to +90
start(input: Record<string, unknown>): Execution {
const adapter = this.adapter
if (!(adapter instanceof PlainAdapter)) {
throw new Error('start() requires PlainAdapter (or a subclass)')
}

const ttl = this.options.pausedExecutionTTL ?? DEFAULT_PAUSED_TTL
const execution = new Execution(adapter, this.clock, this.options.validateSignal, ttl)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CompiledFlow.start() throws unless the adapter is PlainAdapter — should we base start/Execution on an adapter interface that requires waitForEvent/deliverSignal instead of hard-guarding PlainAdapter?

Finding type: Breaking Changes | Severity: 🔴 High


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

In packages/engine/src/engine/compiled-flow.ts around lines 83 to 90, the start() method
currently throws unless this.adapter is an instance of PlainAdapter. Change start() to
accept any adapter that implements the wait/signal contract instead of hard-checking
PlainAdapter: replace the instanceof guard with a runtime type guard that checks for the
presence of waitForEvent (and related methods), update the Execution construction and
the runAsync signature to accept that adapter interface (or a new
WaitableExecutionAdapter type extending ExecutionAdapter with waitForEvent), and change
any places expecting PlainAdapter (runAsync and buildCallbacks parameters) to use the
more general adapter type. This will let custom adapters that implement waitForEvent
participate without breaking existing non-PlainAdapter adapters.

Heads up!

Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription

Comment on lines +422 to +430
try {
signalData = await plainAdapter.waitForEvent(nodeId, node.event, timeoutMs)
} catch (err: unknown) {
// WaitTimeoutError — route to timeout_next if available
if (err instanceof Error && err.name === 'WaitTimeoutError' && node.timeout_next) {
signalData = undefined
} else {
throw err
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onWait rethrows WaitTimeoutError when timeout_next is missing — should we treat the timeout as non‑fatal by setting signalData = undefined so resolveWaitNext routes to node.next?

Finding type: Logical Bugs | Severity: 🔴 High


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

In packages/engine/src/engine/compiled-flow.ts around lines 422 to 430, the onWait
handler currently rethrows a WaitTimeoutError when a wait times out and
node.timeout_next is not present. Modify the catch so that if the caught error is a
WaitTimeoutError, set signalData = undefined (i.e., treat the timeout as a non-fatal
missing signal) instead of rethrowing, allowing resolveWaitNext to route to node.next;
only rethrow other error types. Keep the rest of the onWait logic unchanged.

Heads up!

Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription

Comment on lines 60 to +71
// Engine
export { FlowprintEngine, CompiledFlow } from './engine/index.js'
export type { EngineOptions, EngineHooks, ExecutionResult } from './engine/index.js'
export {
FlowprintEngine,
CompiledFlow,
Execution,
RealClock,
TestClock,
parseDuration,
} from './engine/index.js'
export type {
Clock,
ExecutionStatus,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

packages/engine added package-level exports but lacks a .changeset/*.md entry — should we add a changeset documenting the engine API changes?

Finding type: AI Coding Guidelines | Severity: 🟢 Low


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

In packages/engine/src/index.ts around lines 60-80 the module-level exports were
extended (Execution, RealClock, TestClock, parseDuration and re-export of
WaitTimeoutError). This changed the public package API but no .changeset entry was
added. Create a new file under .changeset (e.g. a timestamped .md in the repo root
.changeset directory) that documents these engine API changes: list the package name
(packages/engine), briefly summarize the added exports, and mark the release type
(suggest minor if this is a backwards-compatible API addition, otherwise patch). Ensure
the changeset follows the repo's changeset format (heading, summary, and releases
mapping to the package) so it will be picked up by the release tooling.

Heads up!

Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription

Comment on lines +131 to +135
this.ttlTimerId = this.clock.setTimeout(() => {
if (this._status === 'waiting') {
this.fail(
new Error(
`Paused execution expired after ${this.ttl}ms TTL (waiting for '${this._waitingFor}')`,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TTL only calls Execution.fail and doesn't cancel the pending plainAdapter.waitForEvent, leaving runAsync stuck—should we reject or cancel that wait and clear pendingWaits so runAsync can unwind?

Finding type: Logical Bugs | Severity: 🔴 High


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

In packages/engine/src/engine/execution.ts around lines 131-135, the TTL callback inside
startTtl only calls this.fail(...) but does not cancel/reject the pending wait held by
the PlainAdapter, leaving runAsync blocked. Modify the TTL expiry path to first cancel
or reject the adapter's pending wait for this._waitingNodeId (e.g. call a new or
existing adapter method like adapter.cancelWait(this._waitingNodeId) or
adapter.rejectWait(this._waitingNodeId, error)), then call this.fail(error). If
PlainAdapter lacks such an API, update the PlainAdapter type and adapter implementations
to provide a cancel/reject-wait method and use it here so the underlying wait promise in
runAsync is unblocked when TTL expires.

Heads up!

Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription

@cloudflare-workers-and-pages
Copy link
Copy Markdown

Deploying flowprint with  Cloudflare Pages  Cloudflare Pages

Latest commit: 429e21d
Status: ✅  Deploy successful!
Preview URL: https://d544d6ff.flowprint.pages.dev
Branch Preview URL: https://feat-engine-pr6-wait-signal.flowprint.pages.dev

View logs

@albertgwo
Copy link
Copy Markdown
Contributor Author

Code Review Findings

Critical (90-100 confidence)

1. TestClock.advance() does not handle timers scheduled by timer callbacks (95)

  • File: packages/engine/src/engine/clock.ts, advance() method
  • Collects all pending timers up front with this.timers.filter(...), then iterates over snapshot. If a timer callback schedules a new timer within the advance window, it's missed. Real-world setTimeout does not have this limitation.
  • Fix: Replace with loop that processes timers one at a time, re-scanning after each callback fires:
while (true) {
  // Find earliest timer <= targetTime
  // If none, break
  // Fire it, update currentTime
}

2. TTL timer and wait-timeout race causes double-rejection (92)

  • Files: packages/engine/src/engine/execution.ts:~112-133 and compiled-flow.ts:~93-97
  • When both fire simultaneously, fail() called twice. _error overwritten with whichever fires second. No terminal-state guard.
  • Fix: Add guard to complete() and fail():
fail(error: Error): void {
  if (this._status === 'completed' || this._status === 'failed') return
  // ...
}

Important (80-89 confidence)

3. PlainAdapter.pendingWaits shared across all start() calls (85)

  • File: packages/engine/src/adapters/plain.ts:~63
  • Same CompiledFlow instance creates one PlainAdapter. Two concurrent start() calls with same nodeId overwrite each other's pending wait. First execution's wait never resolved.
  • Fix: Create new PlainAdapter per start() call, or scope pendingWaits by execution ID.

4. Missing changeset file (82)

  • Adds Execution, Clock, TestClock, RealClock, parseDuration, WaitTimeoutError, ValidateSignalFn, ExecutionStatus to public API.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant