FE-763: Petrinaut event stream — initial markings + transition firings#158
FE-763: Petrinaut event stream — initial markings + transition firings#158kostandinang wants to merge 1 commit into
Conversation
|
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite. Learn more about stacking. |
dc53d4c to
f6356f7
Compare
0d89acd to
7590a6f
Compare
PR SummaryLow Risk Overview A new The Petri interpreter now attaches Reviewed by Cursor Bugbot for commit 9c0641c. Bugbot is set up for automated code reviews on this repo. Configure here. |
🤖 Augment PR SummarySummary: Adds a Petrinaut-focused runtime event stream to the orchestrator so Petrinaut can visualize live cook runs (initial marking + per-transition firings + terminal outcomes). Changes:
Technical Notes: The stream writes one JSON object per line (JSONL) and fans out to an optional in-process 🤖 Was this summary useful? React with 👍 or 👎 |
| kind: 'transition_fired', | ||
| ts: event.ts, | ||
| runId, | ||
| transitionName: event.transitionId ?? '', |
There was a problem hiding this comment.
transitionName: event.transitionId ?? '' will silently emit an empty transition name if an upstream transition_fired event is malformed; that can hide bugs and may violate the Petrinaut schema. Consider failing fast (or otherwise making missing transitionId unmistakable) for transition_fired events.
Severity: low
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
| tokens: Token[][] | undefined, | ||
| ): Record<string, PetrinautToken[]> { | ||
| const out: Record<string, PetrinautToken[]> = {}; | ||
| if (!places || !tokens) return out; |
There was a problem hiding this comment.
groupTokens() returns {} when places is present but tokens is missing, which would silently drop arc/place information from emitted Petrinaut events. Consider validating places/tokens alignment and/or emitting empty arrays per place so data loss is obvious.
Severity: medium
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
| // --------------------------------------------------------------------------- | ||
|
|
||
| describe('FE-763: Petrinaut event stream on a real run', () => { | ||
| it('emits initial_marking + transition_fired (with token payload) + net_halted for simplePlan happy path', async () => { |
There was a problem hiding this comment.
Emits the runtime events Petrinaut needs to visualize a live cook run, in
the cross-team-agreed payload shape (2026-05-26 alignment):
initial_marking:
{ kind, ts, runId, marking: { <place>: [{id, ...payload}] } }
transition_fired:
{ kind, ts, runId, transitionName,
input: { <place>: [{id, ...payload}] },
output: { <place>: [{id, ...payload}] } }
net_halted / net_deadlocked:
{ kind, ts, runId }
What landed:
- New module src/orchestrator/src/petrinaut-events.ts: pure adapter
createPetrinautEventStream({ runId, filePath?, tokenIdFn?, onEvent? })
returning { sink: NetEventSink, emitInitialMarking(blueprint) }.
Writes one JSONL object per line to filePath when set and fans out to
onEvent for in-process consumers (tests, future sync-server forwarder).
- petri-net.ts: NetEvent gains parallel optional consumedTokens?: Token[][]
and producedTokens?: Token[][] (one entry per arc, same indexing as the
existing consumed/produced place-name lists). These are populated for
transition_fired events so the adapter can render the per-place
{ id, ...payload } shape Petrinaut expects without re-reading the net.
scheduleDeferred gains a consumedTokens parameter alongside the existing
consumedPlaces so async fires emit the same shape as sync fires.
- net-compiler.ts: all four scheduleDeferred call sites pass the captured
consumed tokens through to the deferred event.
- engine.ts: when input.runDir is present, opens a Petrinaut event stream
writing to <runDir>/petrinaut-events.jsonl, emits initial_marking from
the compiled blueprint up-front, then passes the sink to net.run(). The
same gate that drives FE-762's <runDir>/net.json write — library callers
without a runDir get the existing no-op behavior.
Halt outcomes (FE-761 Slice 2b halted-as-place):
- Petrinaut sees halt as a halt token traveling through the topology via
transition_fired events landing on slice:<sid>:halted / epic:<eid>:halted,
plus a terminal net_halted event from the engine.
Open coordination item: token UUID lifecycle across consume->emit (lineage
tracing). v1 generates fresh UUIDs per emission. When Petrinaut decides
whether identity should persist, this module is the seam to evolve.
Tests:
- 4 unit tests in petrinaut-events.test.ts covering initial_marking,
transition_fired adapter shape, terminal events, and JSONL file
roundtrip via mkdtempSync.
- 1 end-to-end test in engine-contract.test.ts running simplePlan happy
path with the Petrinaut sink — asserts initial_marking first, runId on
every event, the FE-761 Slice 4 dispatch + complete transition names
both appear, every token carries an id, and happy paths emit no
net_halted / net_deadlocked.
All 130 orchestrator tests pass; npm run fix + check + build all green.
Co-authored-by: Amp <amp@ampcode.com>
f6356f7 to
ff3504c
Compare
7590a6f to
9c0641c
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 9c0641c. Configure here.
| consumed: transition.inputs, | ||
| consumedTokens: consumed.map((t) => [t]), | ||
| produced: producedPlaces, | ||
| producedTokens, |
There was a problem hiding this comment.
Deferred handlers emit duplicate transition_fired events with consumed tokens
Medium Severity
For deferred fire handlers (action, run-tests, assess-semantic, verify-epic), depositClaim emits a transition_fired event with populated consumedTokens but empty producedTokens when the fire handler returns []. Then completeDeferred emits a second transition_fired for the same transition with the same consumedTokens plus the actual producedTokens. The rename from _consumed to consumed and the addition of consumedTokens: consumed.map((t) => [t]) means the Petrinaut stream now receives two rich events per deferred firing — the first showing tokens vanishing with no output, the second claiming the same tokens were consumed again. A Petrinaut visualizer animating token movements would see contradictory state.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 9c0641c. Configure here.



Summary
petrinaut-events.ts—createPetrinautEventStream({ runId, filePath?, tokenIdFn?, onEvent? })returns aNetEventSinkadapter plusemitInitialMarking(blueprint)helper. Writes one JSON object per line tofilePathwhen provided and fans out toonEventfor in-process consumers.<runDir>/petrinaut-events.jsonlon every run:initial_markingup-front, thentransition_firedper fire, then anynet_halted/net_deadlockedterminal event.{ id: <UUID>, ...payload };input/outputareRecord<place, tokens[]>; every event carriesrunId.Context
engine.tswithinput.runDir-conditional behavior; chaining them avoids a self-conflict in that file. Stack ordering is a Graphite concern; the underlying frontier items remain "parallel" permemory/PLAN.md.reports.jsonl— Petrinaut consumes the new stream directly; reports stay the orchestrator's internal log.slice:<sid>:halted/epic:<eid>:haltedplaces viatransition_firedevents, plus a terminalnet_haltedfrom the engine.What changed
src/orchestrator/src/petrinaut-events.ts: pure adapter from the orchestrator'sNetEventstream to Petrinaut's wire format.PetrinautEventdiscriminated union overinitial_marking/transition_fired/net_halted/net_deadlocked;PetrinautToken { id, sliceId?, epicId?, retryCount?, reworkCount?, haltReason? }.emitInitialMarking(blueprint)derives the up-front marking fromblueprint.initialTokens, assigns fresh UUIDs, and groups by place.sink.emit(event)translates eachNetEventinto the corresponding Petrinaut event, buildinginput/outputrecords from the parallelconsumedTokens/producedTokensarrays.petri-net.ts:NetEventgains parallel optionalconsumedTokens?: Token[][]andproducedTokens?: Token[][]arrays — populated fortransition_firedso the adapter can include per-arc token payload without re-reading the net.scheduleDeferredgains aconsumedTokensparameter alongsideconsumedPlacesso async fires emit the same shape as sync fires.net-compiler.ts: all fourscheduleDeferredcall sites pass the capturedconsumedtokens through.engine.ts: wheninput.runDiris present, opens a Petrinaut event stream writing to<runDir>/petrinaut-events.jsonl, emitsinitial_markingfrom the compiled blueprint, then passes the sink tonet.run().Verification
petrinaut-events.test.ts: initial_marking grouping, transition_fired adapter shape, terminal-event forwarding, JSONL file roundtrip viamkdtempSync+readFileSync.engine-contract.test.tsrunningsimplePlanhappy path with the Petrinaut sink — assertsinitial_markingfirst,runIdon every event, the FE-761 Slice 4 dispatch + complete transition names both appear in the stream, every token carries anid, and happy paths emit nonet_halted/net_deadlocked.npm run verify(check + test + build) green.Out of scope
Traceability
petri-event-streaminmemory/PLAN.md; stacks on FE-762 (which stacks on FE-761).