From 9c0641cf164355c7ccb855c7c423e4508259d98d Mon Sep 17 00:00:00 2001 From: Kostandin Angjellari Date: Thu, 28 May 2026 01:11:04 +0200 Subject: [PATCH] =?UTF-8?q?FE-763:=20Petrinaut=20event=20stream=20?= =?UTF-8?q?=E2=80=94=20initial=20markings=20+=20transition=20firings?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Emits the runtime events Petrinaut needs to visualize a live cook run, in the cross-team-agreed payload shape (2026-05-26 alignment): initial_marking: { kind, ts, runId, marking: { : [{id, ...payload}] } } transition_fired: { kind, ts, runId, transitionName, input: { : [{id, ...payload}] }, output: { : [{id, ...payload}] } } net_halted / net_deadlocked: { kind, ts, runId } What landed: - New module src/orchestrator/src/petrinaut-events.ts: pure adapter createPetrinautEventStream({ runId, filePath?, tokenIdFn?, onEvent? }) returning { sink: NetEventSink, emitInitialMarking(blueprint) }. Writes one JSONL object per line to filePath when set and fans out to onEvent for in-process consumers (tests, future sync-server forwarder). - petri-net.ts: NetEvent gains parallel optional consumedTokens?: Token[][] and producedTokens?: Token[][] (one entry per arc, same indexing as the existing consumed/produced place-name lists). These are populated for transition_fired events so the adapter can render the per-place { id, ...payload } shape Petrinaut expects without re-reading the net. scheduleDeferred gains a consumedTokens parameter alongside the existing consumedPlaces so async fires emit the same shape as sync fires. - net-compiler.ts: all four scheduleDeferred call sites pass the captured consumed tokens through to the deferred event. - engine.ts: when input.runDir is present, opens a Petrinaut event stream writing to /petrinaut-events.jsonl, emits initial_marking from the compiled blueprint up-front, then passes the sink to net.run(). The same gate that drives FE-762's /net.json write — library callers without a runDir get the existing no-op behavior. Halt outcomes (FE-761 Slice 2b halted-as-place): - Petrinaut sees halt as a halt token traveling through the topology via transition_fired events landing on slice::halted / epic::halted, plus a terminal net_halted event from the engine. Open coordination item: token UUID lifecycle across consume->emit (lineage tracing). v1 generates fresh UUIDs per emission. When Petrinaut decides whether identity should persist, this module is the seam to evolve. Tests: - 4 unit tests in petrinaut-events.test.ts covering initial_marking, transition_fired adapter shape, terminal events, and JSONL file roundtrip via mkdtempSync. - 1 end-to-end test in engine-contract.test.ts running simplePlan happy path with the Petrinaut sink — asserts initial_marking first, runId on every event, the FE-761 Slice 4 dispatch + complete transition names both appear, every token carries an id, and happy paths emit no net_halted / net_deadlocked. All 130 orchestrator tests pass; npm run fix + check + build all green. Co-authored-by: Amp --- src/orchestrator/src/engine-contract.test.ts | 75 ++++++- src/orchestrator/src/engine.ts | 19 +- src/orchestrator/src/net-compiler.ts | 8 +- src/orchestrator/src/petri-net.ts | 28 ++- src/orchestrator/src/petrinaut-events.test.ts | 174 ++++++++++++++++ src/orchestrator/src/petrinaut-events.ts | 189 ++++++++++++++++++ 6 files changed, 483 insertions(+), 10 deletions(-) create mode 100644 src/orchestrator/src/petrinaut-events.test.ts create mode 100644 src/orchestrator/src/petrinaut-events.ts diff --git a/src/orchestrator/src/engine-contract.test.ts b/src/orchestrator/src/engine-contract.test.ts index 291815f2..842bf2b6 100644 --- a/src/orchestrator/src/engine-contract.test.ts +++ b/src/orchestrator/src/engine-contract.test.ts @@ -5,8 +5,13 @@ import { join } from 'node:path'; import { describe, expect, it } from 'vitest'; import { createOrchestrator } from './engine.js'; -import { compilePlan, compileTopology } from './net-compiler.js'; +import { compilePlan, compileTopology, wireHandlers } from './net-compiler.js'; import type { NetEvent } from './petri-net.js'; +import { + createPetrinautEventStream, + type PetrinautEvent, + type PetrinautTransitionFiredEvent, +} from './petrinaut-events.js'; import { InMemoryReportSink } from './report-sink.js'; import type { ActionContext, ActionHandlers, OrchestratorInput, Plan, RunCtx, TestRunner } from './types.js'; @@ -906,6 +911,74 @@ describe('Adapter: §7 event vocabulary', () => { }); }); +// --------------------------------------------------------------------------- +// FE-763 — Petrinaut event stream end-to-end on the orchestrator +// --------------------------------------------------------------------------- + +describe('FE-763: Petrinaut event stream on a real run', () => { + it('emits initial_marking + transition_fired (with token payload) + net_halted for simplePlan happy path', async () => { + const fakes = createFakes(); + const ctx: RunCtx = { + reportIds: [], + sliceOutcomes: new Map(), + epicOutcomes: new Map(), + }; + const input: OrchestratorInput = { + plan: simplePlan, + sandboxDir: '/tmp/fake', + actions: fakes.actions, + reports: fakes.reports, + testRunner: fakes.testRunner, + policy: { maxRetries: 3 }, + }; + + const blueprint = compileTopology(input.plan, input.policy); + const net = wireHandlers(blueprint, input, ctx); + + const events: PetrinautEvent[] = []; + const stream = createPetrinautEventStream({ + runId: 'run-e2e', + onEvent: (e) => events.push(e), + }); + stream.emitInitialMarking(blueprint); + await net.run('serial', () => net.hasHaltToken(), stream.sink); + + // 1. initial_marking is first. + expect(events[0]!.kind).toBe('initial_marking'); + + // 2. every event carries the runId. + expect(events.every((e) => 'runId' in e && e.runId === 'run-e2e')).toBe(true); + + // 3. transition_fired events expose the FE-761 Slice 4 dispatch/complete + // topology directly in Petrinaut's wire format. + const fired = events.filter((e): e is PetrinautTransitionFiredEvent => e.kind === 'transition_fired'); + const names = fired.map((e) => e.transitionName); + expect(names).toContain('slice-1:evaluate:dispatch'); + expect(names).toContain('slice-1:evaluate:complete'); + expect(names).toContain('slice-1:assess-semantic:dispatch'); + expect(names).toContain('slice-1:assess-semantic:complete'); + + // 4. each transition_fired carries per-place token data with a UUID + // (cross-team-agreed shape: { id: , ...payload }). + for (const e of fired) { + for (const tokens of Object.values(e.input)) { + for (const tok of tokens) expect(typeof tok.id).toBe('string'); + } + for (const tokens of Object.values(e.output)) { + for (const tok of tokens) expect(typeof tok.id).toBe('string'); + } + } + + // 5. happy path: no net_halted / net_deadlocked emitted (engine exits + // the loop cleanly when nothing remains enabled). When the cook + // fails — retry exhaustion etc. — Petrinaut sees the halt token + // travel through the topology as a transition_fired event landing + // in `slice::halted`, plus the engine emits net_halted. + expect(events.filter((e) => e.kind === 'net_halted')).toHaveLength(0); + expect(events.filter((e) => e.kind === 'net_deadlocked')).toHaveLength(0); + }); +}); + // --------------------------------------------------------------------------- // Contract test #12 — parallel fires concurrently // --------------------------------------------------------------------------- diff --git a/src/orchestrator/src/engine.ts b/src/orchestrator/src/engine.ts index ca6c9d4f..60fba4df 100644 --- a/src/orchestrator/src/engine.ts +++ b/src/orchestrator/src/engine.ts @@ -2,7 +2,8 @@ import { writeFileSync } from 'node:fs'; import { join } from 'node:path'; import { compileTopology, wireHandlers } from './net-compiler.js'; -import type { FiringPolicy } from './petri-net.js'; +import type { FiringPolicy, NetEventSink } from './petri-net.js'; +import { createPetrinautEventStream } from './petrinaut-events.js'; import { serializeBlueprint } from './petrinaut-export.js'; import type { Orchestrator, OrchestratorInput, OrchestratorResult, RunCtx } from './types.js'; @@ -38,8 +39,22 @@ export function createOrchestrator(firingPolicy: FiringPolicy): Orchestrator { writeFileSync(join(input.runDir, 'net.json'), `${JSON.stringify(net, null, 2)}\n`); } + // FE-763: open a Petrinaut event stream when runDir is present. + // Emits an initial_marking event up-front, then transition_fired / + // net_halted / net_deadlocked events as the net runs. Library + // callers without a runDir get the existing no-op behavior. + let eventSink: NetEventSink | undefined; + if (input.runDir) { + const stream = createPetrinautEventStream({ + runId: input.runId ?? 'unknown', + filePath: join(input.runDir, 'petrinaut-events.jsonl'), + }); + stream.emitInitialMarking(blueprint); + eventSink = stream.sink; + } + const net = wireHandlers(blueprint, input, ctx); - await net.run(firingPolicy, () => net.hasHaltToken()); + await net.run(firingPolicy, () => net.hasHaltToken(), eventSink); // Derive halt reason from any halt token deposited during the run. const haltTokens = net.getHaltTokens(); diff --git a/src/orchestrator/src/net-compiler.ts b/src/orchestrator/src/net-compiler.ts index 8caf5859..3e8127b9 100644 --- a/src/orchestrator/src/net-compiler.ts +++ b/src/orchestrator/src/net-compiler.ts @@ -638,7 +638,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput, } return out; })(); - net.scheduleDeferred(skel.id, skel.contract, skel.inputs, deferred); + net.scheduleDeferred(skel.id, skel.contract, skel.inputs, consumed, deferred); return []; }; break; @@ -735,7 +735,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput, { place: budgetPlace, token: { ...baseToken, retryCount: retryCount + 1 } }, ]; })(); - net.scheduleDeferred(skel.id, skel.contract, skel.inputs, deferred); + net.scheduleDeferred(skel.id, skel.contract, skel.inputs, consumed, deferred); return []; }; break; @@ -790,7 +790,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput, { place: budgetPlace, token: { ...baseToken, reworkCount: reworkCount + 1 } }, ]; })(); - net.scheduleDeferred(skel.id, skel.contract, skel.inputs, deferred); + net.scheduleDeferred(skel.id, skel.contract, skel.inputs, consumed, deferred); return []; }; break; @@ -872,7 +872,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput, // happens in sibling-passthrough transitions downstream. return [{ place: intermediatePlace, token: { ...inputToken, reportId } }]; })(); - net.scheduleDeferred(skel.id, skel.contract, skel.inputs, deferred); + net.scheduleDeferred(skel.id, skel.contract, skel.inputs, consumed, deferred); return []; }; break; diff --git a/src/orchestrator/src/petri-net.ts b/src/orchestrator/src/petri-net.ts index 69af9a9b..3b94a95e 100644 --- a/src/orchestrator/src/petri-net.ts +++ b/src/orchestrator/src/petri-net.ts @@ -65,14 +65,24 @@ export type FiringPolicy = 'serial' | 'parallel'; /** Event kinds aligned with spec doc §7. */ export type NetEventKind = 'transition_fired' | 'net_deadlocked' | 'net_halted'; -/** Structured event emitted during net execution. */ +/** + * Structured event emitted during net execution. + * + * `consumed` / `produced` are place-name lists (one entry per arc). The + * parallel `consumedTokens` / `producedTokens` carry the actual tokens + * that traversed each arc, indexed the same way — they are populated for + * `transition_fired` events so downstream adapters (e.g. the FE-763 + * Petrinaut event stream) can include token payload in the wire format. + */ export type NetEvent = { kind: NetEventKind; ts: string; transitionId?: string; contract?: TransitionContract; consumed?: string[]; + consumedTokens?: Token[][]; produced?: string[]; + producedTokens?: Token[][]; }; /** Sink for structured net events. Optional — defaults to no-op. */ @@ -138,11 +148,14 @@ export class PetriNet { transitionId: string, contract: TransitionContract | undefined, consumedPlaces: string[], + consumedTokens: Token[], work: Promise<{ place: string; token: Token }[]>, ): void { this.pendingDeferred++; work - .then((outputs) => this.completeDeferred(transitionId, contract, consumedPlaces, outputs)) + .then((outputs) => + this.completeDeferred(transitionId, contract, consumedPlaces, consumedTokens, outputs), + ) .catch((err) => { this.deferredError ??= err; this.pendingDeferred--; @@ -154,12 +167,15 @@ export class PetriNet { transitionId: string, contract: TransitionContract | undefined, consumedPlaces: string[], + consumedTokens: Token[], outputs: { place: string; token: Token }[], ): void { const producedPlaces: string[] = []; + const producedTokens: Token[][] = []; for (const { place, token } of outputs) { this.addToken(place, token); producedPlaces.push(place); + producedTokens.push([token]); } this.deferredEventSink?.emit({ kind: 'transition_fired', @@ -167,7 +183,9 @@ export class PetriNet { transitionId, contract, consumed: consumedPlaces, + consumedTokens: consumedTokens.map((t) => [t]), produced: producedPlaces, + producedTokens, }); this.pendingDeferred--; this.wakeOneWaiter(); @@ -273,14 +291,16 @@ export class PetriNet { } private depositClaim( - { transition, consumed: _consumed }: TransitionClaim, + { transition, consumed }: TransitionClaim, outputs: { place: string; token: Token }[], eventSink?: NetEventSink, ): void { const producedPlaces: string[] = []; + const producedTokens: Token[][] = []; for (const { place, token } of outputs) { this.addToken(place, token); producedPlaces.push(place); + producedTokens.push([token]); } eventSink?.emit({ kind: 'transition_fired', @@ -288,7 +308,9 @@ export class PetriNet { transitionId: transition.id, contract: transition.contract, consumed: transition.inputs, + consumedTokens: consumed.map((t) => [t]), produced: producedPlaces, + producedTokens, }); } diff --git a/src/orchestrator/src/petrinaut-events.test.ts b/src/orchestrator/src/petrinaut-events.test.ts new file mode 100644 index 00000000..eef00cd1 --- /dev/null +++ b/src/orchestrator/src/petrinaut-events.test.ts @@ -0,0 +1,174 @@ +import { mkdtempSync, readFileSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; + +import { describe, expect, it } from 'vitest'; + +import { compileTopology } from './net-compiler.js'; +import { + createPetrinautEventStream, + type PetrinautEvent, + type PetrinautTransitionFiredEvent, +} from './petrinaut-events.js'; +import type { Plan } from './types.js'; + +const simplePlan: Plan = { + epics: [{ id: 'epic-1', summary: 'E', depends_on: [], verification: [] }], + slices: [ + { + id: 'slice-1', + epic_id: 'epic-1', + definition: 'D', + depends_on: [], + verification: [{ kind: 'unit-test', target: 't' }], + }, + ], +}; + +/** Deterministic UUID stub for stable event snapshots. */ +function deterministicTokenId(): () => string { + let n = 0; + return () => `tok-${++n}`; +} + +// --------------------------------------------------------------------------- +// Unit tests — createPetrinautEventStream as a NetEventSink adapter +// --------------------------------------------------------------------------- + +describe('createPetrinautEventStream — initial_marking', () => { + it('emits one initial_marking event grouping every initial token by place', () => { + const blueprint = compileTopology(simplePlan, { maxRetries: 3 }); + const events: PetrinautEvent[] = []; + const stream = createPetrinautEventStream({ + runId: 'run-1', + tokenIdFn: deterministicTokenId(), + onEvent: (e) => events.push(e), + }); + stream.emitInitialMarking(blueprint); + + expect(events).toHaveLength(1); + const ev = events[0]!; + expect(ev.kind).toBe('initial_marking'); + if (ev.kind !== 'initial_marking') return; // narrow + + expect(ev.runId).toBe('run-1'); + expect(Object.keys(ev.marking).sort()).toEqual( + [ + 'pool:code-agent', + 'pool:test-agent', + 'slice:slice-1:eligible', + 'slice:slice-1:retry-budget', + 'slice:slice-1:semantic-budget', + ].sort(), + ); + + // Every token carries an id; semantic payloads preserved. + const retry = ev.marking['slice:slice-1:retry-budget']!; + expect(retry).toHaveLength(1); + expect(retry[0]!.id).toBeDefined(); + expect(retry[0]!.retryCount).toBe(0); + expect(retry[0]!.sliceId).toBe('slice-1'); + }); +}); + +describe('createPetrinautEventStream — transition_fired adapter', () => { + it('translates a NetEvent into the cross-team-agreed transition_fired shape', () => { + const events: PetrinautEvent[] = []; + const stream = createPetrinautEventStream({ + runId: 'run-1', + tokenIdFn: deterministicTokenId(), + onEvent: (e) => events.push(e), + }); + stream.sink.emit({ + kind: 'transition_fired', + ts: '2026-05-27T00:00:00.000Z', + transitionId: 'slice-1:evaluate:dispatch', + consumed: ['slice:slice-1:spec-ready', 'pool:test-agent'], + consumedTokens: [[{ sliceId: 'slice-1', epicId: 'epic-1' }], [{ sliceId: '', epicId: '' }]], + produced: ['slice:slice-1:evaluate:running'], + producedTokens: [[{ sliceId: 'slice-1', epicId: 'epic-1' }]], + }); + + expect(events).toHaveLength(1); + const ev = events[0]! as PetrinautTransitionFiredEvent; + expect(ev.kind).toBe('transition_fired'); + expect(ev.runId).toBe('run-1'); + expect(ev.transitionName).toBe('slice-1:evaluate:dispatch'); + expect(Object.keys(ev.input).sort()).toEqual(['pool:test-agent', 'slice:slice-1:spec-ready']); + expect(ev.input['slice:slice-1:spec-ready']).toHaveLength(1); + expect(ev.input['slice:slice-1:spec-ready']![0]!.sliceId).toBe('slice-1'); + expect(Object.keys(ev.output)).toEqual(['slice:slice-1:evaluate:running']); + expect(ev.output['slice:slice-1:evaluate:running']![0]!.id).toBeDefined(); + }); + + it('forwards net_halted and net_deadlocked as terminal events', () => { + const events: PetrinautEvent[] = []; + const stream = createPetrinautEventStream({ + runId: 'run-1', + tokenIdFn: deterministicTokenId(), + onEvent: (e) => events.push(e), + }); + stream.sink.emit({ kind: 'net_halted', ts: '2026-05-27T00:00:00.000Z' }); + stream.sink.emit({ kind: 'net_deadlocked', ts: '2026-05-27T00:00:01.000Z' }); + + expect(events.map((e) => e.kind)).toEqual(['net_halted', 'net_deadlocked']); + expect(events.every((e) => 'runId' in e && e.runId === 'run-1')).toBe(true); + }); +}); + +// --------------------------------------------------------------------------- +// File output — JSONL roundtrip +// --------------------------------------------------------------------------- + +describe('createPetrinautEventStream — JSONL file output', () => { + it('appends one event per line and reloads as parsed events', () => { + const dir = mkdtempSync(join(tmpdir(), 'brunch-petrinaut-events-')); + const filePath = join(dir, 'petrinaut-events.jsonl'); + const blueprint = compileTopology(simplePlan, { maxRetries: 3 }); + + const stream = createPetrinautEventStream({ + runId: 'run-jsonl', + filePath, + tokenIdFn: deterministicTokenId(), + }); + + // Up-front initial marking. + stream.emitInitialMarking(blueprint); + + // A synthetic transition_fired (the production path goes through the + // NetEventSink during PetriNet.run; here we exercise the same adapter + // directly to avoid coupling this test to the heavy orchestrator path). + stream.sink.emit({ + kind: 'transition_fired', + ts: '2026-05-27T00:00:00.000Z', + transitionId: 'slice-1:evaluate:dispatch', + consumed: ['slice:slice-1:spec-ready', 'pool:test-agent'], + consumedTokens: [[{ sliceId: 'slice-1', epicId: 'epic-1' }], [{ sliceId: '', epicId: '' }]], + produced: ['slice:slice-1:evaluate:running'], + producedTokens: [[{ sliceId: 'slice-1', epicId: 'epic-1' }]], + }); + + // Terminal halt. + stream.sink.emit({ kind: 'net_halted', ts: '2026-05-27T00:00:01.000Z' }); + + const raw = readFileSync(filePath, 'utf8'); + const lines = raw + .trim() + .split('\n') + .map((l) => JSON.parse(l) as PetrinautEvent); + + expect(lines).toHaveLength(3); + expect(lines[0]!.kind).toBe('initial_marking'); + expect(lines[1]!.kind).toBe('transition_fired'); + expect(lines[2]!.kind).toBe('net_halted'); + + // Every event carries runId for cross-run isolation. + expect(lines.every((e) => 'runId' in e && e.runId === 'run-jsonl')).toBe(true); + + // Transition_fired arcs carry tokens with payload. + const fired = lines[1] as PetrinautTransitionFiredEvent; + expect(fired.transitionName).toBe('slice-1:evaluate:dispatch'); + expect(fired.input['slice:slice-1:spec-ready']![0]!.sliceId).toBe('slice-1'); + expect(fired.output['slice:slice-1:evaluate:running']![0]!.id).toBeDefined(); + }); +}); diff --git a/src/orchestrator/src/petrinaut-events.ts b/src/orchestrator/src/petrinaut-events.ts new file mode 100644 index 00000000..aa4024c0 --- /dev/null +++ b/src/orchestrator/src/petrinaut-events.ts @@ -0,0 +1,189 @@ +// --------------------------------------------------------------------------- +// FE-763 — Petrinaut event stream for a live cook run. +// +// Adapts the orchestrator's internal NetEvent stream into the cross-team- +// agreed Petrinaut event format, plus an `initial_marking` event emitted +// once at run start from the compiled blueprint. +// +// Wire format (2026-05-26 alignment): +// +// transition_fired: +// { kind, ts, runId, transitionName, +// input: { : [{ id: , ...payload }] }, +// output: { : [{ id: , ...payload }] } } +// +// initial_marking: +// { kind, ts, runId, +// marking: { : [{ id: , ...payload }] } } +// +// net_halted / net_deadlocked: +// { kind, ts, runId } +// +// Halt outcomes appear in two complementary forms: +// 1. structurally — as halt tokens on `slice::halted` / `epic::halted` +// places (deposited by the FE-761 Slice 2b halted-as-place refactor). +// These flow naturally through `transition_fired` events as token payload. +// 2. as a terminal `net_halted` event marking the run's end state. +// +// Open coordination item (tracked on FE-763): token UUID lifecycle — +// today every emission generates fresh UUIDs (no lineage across +// consume→emit). When Petrinaut decides whether to persist token +// identity across firings this module is the seam to evolve. +// --------------------------------------------------------------------------- + +import { randomUUID } from 'node:crypto'; +import { appendFileSync, writeFileSync } from 'node:fs'; + +import type { NetBlueprint, TokenSeed } from './net-blueprint.js'; +import type { NetEvent, NetEventSink, Token } from './petri-net.js'; + +export type PetrinautToken = { + id: string; + sliceId?: string; + epicId?: string; + retryCount?: number; + reworkCount?: number; + /** Halt reason carried by halt tokens (FE-761 Slice 2b). */ + haltReason?: string; +}; + +export type PetrinautInitialMarkingEvent = { + kind: 'initial_marking'; + ts: string; + runId: string; + marking: Record; +}; + +export type PetrinautTransitionFiredEvent = { + kind: 'transition_fired'; + ts: string; + runId: string; + transitionName: string; + input: Record; + output: Record; +}; + +export type PetrinautTerminalEvent = { + kind: 'net_halted' | 'net_deadlocked'; + ts: string; + runId: string; +}; + +export type PetrinautEvent = + | PetrinautInitialMarkingEvent + | PetrinautTransitionFiredEvent + | PetrinautTerminalEvent; + +export type CreatePetrinautEventStreamOpts = { + runId: string; + /** When set, every event is appended as one JSON object per line. */ + filePath?: string; + /** Override the per-token UUID generator (tests). */ + tokenIdFn?: () => string; + /** Fan-out for in-memory consumers (tests, sync-server forwarder). */ + onEvent?: (event: PetrinautEvent) => void; +}; + +export type PetrinautEventStream = { + /** NetEventSink to pass into `PetriNet.run()`. */ + sink: NetEventSink; + /** Emit the initial marking event from a compiled blueprint. Call once before `net.run()`. */ + emitInitialMarking(blueprint: NetBlueprint): void; +}; + +/** + * Create a Petrinaut-shaped event stream. Returns a NetEventSink adapter and + * a helper to emit the initial marking up-front. The stream writes one JSON + * object per line to `filePath` when provided, and also fans out to + * `onEvent` so in-process consumers (tests, the sync server) can subscribe + * without re-reading the file. + */ +export function createPetrinautEventStream(opts: CreatePetrinautEventStreamOpts): PetrinautEventStream { + const { runId, filePath, onEvent } = opts; + const tokenId = opts.tokenIdFn ?? randomUUID; + + // Initialize the file as empty so the first append produces a well-formed JSONL file. + if (filePath) writeFileSync(filePath, ''); + + function publish(event: PetrinautEvent): void { + if (filePath) appendFileSync(filePath, `${JSON.stringify(event)}\n`); + onEvent?.(event); + } + + function groupTokens( + places: string[] | undefined, + tokens: Token[][] | undefined, + ): Record { + const out: Record = {}; + if (!places || !tokens) return out; + for (let i = 0; i < places.length; i++) { + const place = places[i]!; + const placeTokens = tokens[i] ?? []; + const list = out[place] ?? []; + for (const t of placeTokens) list.push(tokenToPetrinaut(t, tokenId)); + out[place] = list; + } + return out; + } + + const sink: NetEventSink = { + emit(event: NetEvent): void { + switch (event.kind) { + case 'transition_fired': { + publish({ + kind: 'transition_fired', + ts: event.ts, + runId, + transitionName: event.transitionId ?? '', + input: groupTokens(event.consumed, event.consumedTokens), + output: groupTokens(event.produced, event.producedTokens), + }); + return; + } + case 'net_halted': + case 'net_deadlocked': { + publish({ kind: event.kind, ts: event.ts, runId }); + return; + } + } + }, + }; + + function emitInitialMarking(blueprint: NetBlueprint): void { + const marking: Record = {}; + for (const { place, token } of blueprint.initialTokens) { + const list = marking[place] ?? []; + list.push(seedToPetrinaut(token, tokenId())); + marking[place] = list; + } + publish({ + kind: 'initial_marking', + ts: new Date().toISOString(), + runId, + marking, + }); + } + + return { sink, emitInitialMarking }; +} + +function tokenToPetrinaut(token: Token, idFn: () => string): PetrinautToken { + return { + id: idFn(), + ...(token.sliceId ? { sliceId: token.sliceId } : {}), + ...(token.epicId ? { epicId: token.epicId } : {}), + ...(token.retryCount !== undefined ? { retryCount: token.retryCount } : {}), + ...(token.reworkCount !== undefined ? { reworkCount: token.reworkCount } : {}), + ...(token.haltReason !== undefined ? { haltReason: token.haltReason } : {}), + }; +} + +function seedToPetrinaut(seed: TokenSeed, id: string): PetrinautToken { + return { + id, + ...(seed.sliceId ? { sliceId: seed.sliceId } : {}), + ...(seed.epicId ? { epicId: seed.epicId } : {}), + ...(seed.retryCount !== undefined ? { retryCount: seed.retryCount } : {}), + ...(seed.reworkCount !== undefined ? { reworkCount: seed.reworkCount } : {}), + }; +}