feat(engine): wait/signal system with Clock interface#18
feat(engine): wait/signal system with Clock interface#18albertgwo wants to merge 3 commits intofeat/engine-pr3-plain-adapterfrom
Conversation
Add Clock abstraction for deterministic time control in tests: - RealClock wraps Date.now() and native setTimeout/clearTimeout - TestClock with advance() for synchronous timer triggering - ISO 8601 duration parser (PT format + legacy shorthand) - ValidateSignalFn type and new EngineOptions: clock, validateSignal, pausedExecutionTTL
- Execution class: status tracking, signal delivery, TestClock time advancement, TTL - PlainAdapter.waitForEvent(): promise-based suspension with timeout support - PlainAdapter.deliverSignal(): external signal delivery to waiting nodes - WaitTimeoutError for timeout expiration - CompiledFlow.start(): async flow execution that handles wait nodes - resolveWaitNext callback routes to timeout_next on timeout - execute() still throws "use start()" for flows with wait nodes
Clock tests (21): - RealClock delegates to Date.now() and native timers - TestClock: starts at 0, advance fires timers in order, boundary, clearTimeout - parseDuration: PT format, legacy shorthand, error cases Wait/signal tests (17): - Wait node pauses execution with correct status and waitingFor - Signal resumes execution and data merges into state - Multiple sequential waits - Timeout routes to timeout_next via TestClock.advance - Wrong signal name / non-waiting state / validateSignal hook rejection - execute() throws on wait nodes - Result promise resolves, completedResult getter - Paused execution TTL expiration and non-interference - advanceTime() with RealClock throws, ISO 8601 parsing - Trace recording includes wait nodes
| return new Promise<unknown>((resolve, reject) => { | ||
| const entry: { | ||
| resolve: (data: unknown) => void | ||
| reject: (err: Error) => void | ||
| eventName: string |
There was a problem hiding this comment.
waitForEvent doesn't remove entries from pendingWaits on TTL expiry, allowing late signals to revive failed executions; should we surface a cancellation hook on Execution.fail to reject and delete the pending wait?
Finding type: Logical Bugs | Severity: 🔴 High
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
In packages/engine/src/adapters/plain.ts around lines 123 to 127, the waitForEvent
method creates a pending wait entry but there is no external cancellation hook, so late
signals can revive a failed execution. Add a public method cancelPendingWait(nodeId:
string, err?: Error): boolean that looks up the pending entry, clears its timeoutId via
this.clock.clearTimeout, deletes it from this.pendingWaits, calls entry.reject(err ??
new Error('cancelled')), and returns true if found. Update waitForEvent to rely on the
same entry shape and ensure timeouts still clear the entry via the new method. Also
update the ExecutionAdapter type (and any callers like Execution.fail) to call
adapter.cancelPendingWait(nodeId, new WaitTimeoutError(...)) when an execution is marked
failed so pending waits are rejected and cannot be resumed by late signals.
Heads up!
Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription
| start(input: Record<string, unknown>): Execution { | ||
| const adapter = this.adapter | ||
| if (!(adapter instanceof PlainAdapter)) { | ||
| throw new Error('start() requires PlainAdapter (or a subclass)') | ||
| } | ||
|
|
||
| const ttl = this.options.pausedExecutionTTL ?? DEFAULT_PAUSED_TTL | ||
| const execution = new Execution(adapter, this.clock, this.options.validateSignal, ttl) |
There was a problem hiding this comment.
CompiledFlow.start() throws unless the adapter is PlainAdapter — should we base start/Execution on an adapter interface that requires waitForEvent/deliverSignal instead of hard-guarding PlainAdapter?
Finding type: Breaking Changes | Severity: 🔴 High
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
In packages/engine/src/engine/compiled-flow.ts around lines 83 to 90, the start() method
currently throws unless this.adapter is an instance of PlainAdapter. Change start() to
accept any adapter that implements the wait/signal contract instead of hard-checking
PlainAdapter: replace the instanceof guard with a runtime type guard that checks for the
presence of waitForEvent (and related methods), update the Execution construction and
the runAsync signature to accept that adapter interface (or a new
WaitableExecutionAdapter type extending ExecutionAdapter with waitForEvent), and change
any places expecting PlainAdapter (runAsync and buildCallbacks parameters) to use the
more general adapter type. This will let custom adapters that implement waitForEvent
participate without breaking existing non-PlainAdapter adapters.
Heads up!
Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription
| try { | ||
| signalData = await plainAdapter.waitForEvent(nodeId, node.event, timeoutMs) | ||
| } catch (err: unknown) { | ||
| // WaitTimeoutError — route to timeout_next if available | ||
| if (err instanceof Error && err.name === 'WaitTimeoutError' && node.timeout_next) { | ||
| signalData = undefined | ||
| } else { | ||
| throw err | ||
| } |
There was a problem hiding this comment.
onWait rethrows WaitTimeoutError when timeout_next is missing — should we treat the timeout as non‑fatal by setting signalData = undefined so resolveWaitNext routes to node.next?
Finding type: Logical Bugs | Severity: 🔴 High
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
In packages/engine/src/engine/compiled-flow.ts around lines 422 to 430, the onWait
handler currently rethrows a WaitTimeoutError when a wait times out and
node.timeout_next is not present. Modify the catch so that if the caught error is a
WaitTimeoutError, set signalData = undefined (i.e., treat the timeout as a non-fatal
missing signal) instead of rethrowing, allowing resolveWaitNext to route to node.next;
only rethrow other error types. Keep the rest of the onWait logic unchanged.
Heads up!
Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription
| // Engine | ||
| export { FlowprintEngine, CompiledFlow } from './engine/index.js' | ||
| export type { EngineOptions, EngineHooks, ExecutionResult } from './engine/index.js' | ||
| export { | ||
| FlowprintEngine, | ||
| CompiledFlow, | ||
| Execution, | ||
| RealClock, | ||
| TestClock, | ||
| parseDuration, | ||
| } from './engine/index.js' | ||
| export type { | ||
| Clock, | ||
| ExecutionStatus, |
There was a problem hiding this comment.
packages/engine added package-level exports but lacks a .changeset/*.md entry — should we add a changeset documenting the engine API changes?
Finding type: AI Coding Guidelines | Severity: 🟢 Low
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
In packages/engine/src/index.ts around lines 60-80 the module-level exports were
extended (Execution, RealClock, TestClock, parseDuration and re-export of
WaitTimeoutError). This changed the public package API but no .changeset entry was
added. Create a new file under .changeset (e.g. a timestamped .md in the repo root
.changeset directory) that documents these engine API changes: list the package name
(packages/engine), briefly summarize the added exports, and mark the release type
(suggest minor if this is a backwards-compatible API addition, otherwise patch). Ensure
the changeset follows the repo's changeset format (heading, summary, and releases
mapping to the package) so it will be picked up by the release tooling.
Heads up!
Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription
| this.ttlTimerId = this.clock.setTimeout(() => { | ||
| if (this._status === 'waiting') { | ||
| this.fail( | ||
| new Error( | ||
| `Paused execution expired after ${this.ttl}ms TTL (waiting for '${this._waitingFor}')`, |
There was a problem hiding this comment.
TTL only calls Execution.fail and doesn't cancel the pending plainAdapter.waitForEvent, leaving runAsync stuck—should we reject or cancel that wait and clear pendingWaits so runAsync can unwind?
Finding type: Logical Bugs | Severity: 🔴 High
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
In packages/engine/src/engine/execution.ts around lines 131-135, the TTL callback inside
startTtl only calls this.fail(...) but does not cancel/reject the pending wait held by
the PlainAdapter, leaving runAsync blocked. Modify the TTL expiry path to first cancel
or reject the adapter's pending wait for this._waitingNodeId (e.g. call a new or
existing adapter method like adapter.cancelWait(this._waitingNodeId) or
adapter.rejectWait(this._waitingNodeId, error)), then call this.fail(error). If
PlainAdapter lacks such an API, update the PlainAdapter type and adapter implementations
to provide a cancel/reject-wait method and use it here so the underlying wait promise in
runAsync is unblocked when TTL expires.
Heads up!
Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription
Deploying flowprint with
|
| Latest commit: |
429e21d
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://d544d6ff.flowprint.pages.dev |
| Branch Preview URL: | https://feat-engine-pr6-wait-signal.flowprint.pages.dev |
Code Review FindingsCritical (90-100 confidence)1.
while (true) {
// Find earliest timer <= targetTime
// If none, break
// Fire it, update currentTime
}2. TTL timer and wait-timeout race causes double-rejection (92)
fail(error: Error): void {
if (this._status === 'completed' || this._status === 'failed') return
// ...
}Important (80-89 confidence)3.
4. Missing changeset file (82)
|
User description
Summary
Dependency
Base: `feat/engine-pr3-plain-adapter` (branches from adapter, parallel to PR 4/5)
PR 10 of 16 in the execution engine implementation.
Test plan
Generated description
Below is a concise technical summary of the changes proposed in this PR:
Enable wait nodes by coordinating
PlainAdapter.waitForEvent, the newExecutionhandle, andCompiledFlow.startso workflows pause for signals, surface waiting state/TTL, and resolve traces even when timeouts route to successors. Provide a sharedClockabstraction withRealClock,TestClock, andparseDurationplus supporting exports/tests so signal timeouts and paused-execution TTLs behave deterministically in both production and test environments.PlainAdapter,Execution, andCompiledFlowso wait nodes suspend onwaitForEvent, routetimeout_next, surface waiting status/trace data, and let callers resume workflows withExecution.signalwhile honoring signal validation, TTLs, and existing engine hooks.Modified files (9)
Latest Contributors(0)
Clockinterface, concreteRealClock/TestClockimplementations,parseDuration, and their tests so wait timeouts and TTL-driven expirations run consistently across prod and test scenarios.Modified files (3)
Latest Contributors(0)