Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 74 additions & 1 deletion src/orchestrator/src/engine-contract.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@ import { join } from 'node:path';
import { describe, expect, it } from 'vitest';

import { createOrchestrator } from './engine.js';
import { compilePlan, compileTopology } from './net-compiler.js';
import { compilePlan, compileTopology, wireHandlers } from './net-compiler.js';
import type { NetEvent } from './petri-net.js';
import {
createPetrinautEventStream,
type PetrinautEvent,
type PetrinautTransitionFiredEvent,
} from './petrinaut-events.js';
import { InMemoryReportSink } from './report-sink.js';
import type { ActionContext, ActionHandlers, OrchestratorInput, Plan, RunCtx, TestRunner } from './types.js';

Expand Down Expand Up @@ -906,6 +911,74 @@ describe('Adapter: §7 event vocabulary', () => {
});
});

// ---------------------------------------------------------------------------
// FE-763 — Petrinaut event stream end-to-end on the orchestrator
// ---------------------------------------------------------------------------

describe('FE-763: Petrinaut event stream on a real run', () => {
it('emits initial_marking + transition_fired (with token payload) + net_halted for simplePlan happy path', async () => {
Copy link
Copy Markdown

@augmentcode augmentcode Bot May 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test description mentions net_halted, but the assertions below expect zero net_halted/net_deadlocked events on the happy path; consider updating the description to match the intended behavior.

Severity: low

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

const fakes = createFakes();
const ctx: RunCtx = {
reportIds: [],
sliceOutcomes: new Map(),
epicOutcomes: new Map(),
};
const input: OrchestratorInput = {
plan: simplePlan,
sandboxDir: '/tmp/fake',
actions: fakes.actions,
reports: fakes.reports,
testRunner: fakes.testRunner,
policy: { maxRetries: 3 },
};

const blueprint = compileTopology(input.plan, input.policy);
const net = wireHandlers(blueprint, input, ctx);

const events: PetrinautEvent[] = [];
const stream = createPetrinautEventStream({
runId: 'run-e2e',
onEvent: (e) => events.push(e),
});
stream.emitInitialMarking(blueprint);
await net.run('serial', () => net.hasHaltToken(), stream.sink);

// 1. initial_marking is first.
expect(events[0]!.kind).toBe('initial_marking');

// 2. every event carries the runId.
expect(events.every((e) => 'runId' in e && e.runId === 'run-e2e')).toBe(true);

// 3. transition_fired events expose the FE-761 Slice 4 dispatch/complete
// topology directly in Petrinaut's wire format.
const fired = events.filter((e): e is PetrinautTransitionFiredEvent => e.kind === 'transition_fired');
const names = fired.map((e) => e.transitionName);
expect(names).toContain('slice-1:evaluate:dispatch');
expect(names).toContain('slice-1:evaluate:complete');
expect(names).toContain('slice-1:assess-semantic:dispatch');
expect(names).toContain('slice-1:assess-semantic:complete');

// 4. each transition_fired carries per-place token data with a UUID
// (cross-team-agreed shape: { id: <UUID>, ...payload }).
for (const e of fired) {
for (const tokens of Object.values(e.input)) {
for (const tok of tokens) expect(typeof tok.id).toBe('string');
}
for (const tokens of Object.values(e.output)) {
for (const tok of tokens) expect(typeof tok.id).toBe('string');
}
}

// 5. happy path: no net_halted / net_deadlocked emitted (engine exits
// the loop cleanly when nothing remains enabled). When the cook
// fails — retry exhaustion etc. — Petrinaut sees the halt token
// travel through the topology as a transition_fired event landing
// in `slice:<sid>:halted`, plus the engine emits net_halted.
expect(events.filter((e) => e.kind === 'net_halted')).toHaveLength(0);
expect(events.filter((e) => e.kind === 'net_deadlocked')).toHaveLength(0);
});
});

// ---------------------------------------------------------------------------
// Contract test #12 — parallel fires concurrently
// ---------------------------------------------------------------------------
Expand Down
19 changes: 17 additions & 2 deletions src/orchestrator/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { writeFileSync } from 'node:fs';
import { join } from 'node:path';

import { compileTopology, wireHandlers } from './net-compiler.js';
import type { FiringPolicy } from './petri-net.js';
import type { FiringPolicy, NetEventSink } from './petri-net.js';
import { createPetrinautEventStream } from './petrinaut-events.js';
import { serializeBlueprint } from './petrinaut-export.js';
import type { Orchestrator, OrchestratorInput, OrchestratorResult, RunCtx } from './types.js';

Expand Down Expand Up @@ -38,8 +39,22 @@ export function createOrchestrator(firingPolicy: FiringPolicy): Orchestrator {
writeFileSync(join(input.runDir, 'net.json'), `${JSON.stringify(net, null, 2)}\n`);
}

// FE-763: open a Petrinaut event stream when runDir is present.
// Emits an initial_marking event up-front, then transition_fired /
// net_halted / net_deadlocked events as the net runs. Library
// callers without a runDir get the existing no-op behavior.
let eventSink: NetEventSink | undefined;
if (input.runDir) {
const stream = createPetrinautEventStream({
runId: input.runId ?? 'unknown',
filePath: join(input.runDir, 'petrinaut-events.jsonl'),
});
stream.emitInitialMarking(blueprint);
eventSink = stream.sink;
}

const net = wireHandlers(blueprint, input, ctx);
await net.run(firingPolicy, () => net.hasHaltToken());
await net.run(firingPolicy, () => net.hasHaltToken(), eventSink);

// Derive halt reason from any halt token deposited during the run.
const haltTokens = net.getHaltTokens();
Expand Down
8 changes: 4 additions & 4 deletions src/orchestrator/src/net-compiler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput,
}
return out;
})();
net.scheduleDeferred(skel.id, skel.contract, skel.inputs, deferred);
net.scheduleDeferred(skel.id, skel.contract, skel.inputs, consumed, deferred);
return [];
};
break;
Expand Down Expand Up @@ -735,7 +735,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput,
{ place: budgetPlace, token: { ...baseToken, retryCount: retryCount + 1 } },
];
})();
net.scheduleDeferred(skel.id, skel.contract, skel.inputs, deferred);
net.scheduleDeferred(skel.id, skel.contract, skel.inputs, consumed, deferred);
return [];
};
break;
Expand Down Expand Up @@ -790,7 +790,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput,
{ place: budgetPlace, token: { ...baseToken, reworkCount: reworkCount + 1 } },
];
})();
net.scheduleDeferred(skel.id, skel.contract, skel.inputs, deferred);
net.scheduleDeferred(skel.id, skel.contract, skel.inputs, consumed, deferred);
return [];
};
break;
Expand Down Expand Up @@ -872,7 +872,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput,
// happens in sibling-passthrough transitions downstream.
return [{ place: intermediatePlace, token: { ...inputToken, reportId } }];
})();
net.scheduleDeferred(skel.id, skel.contract, skel.inputs, deferred);
net.scheduleDeferred(skel.id, skel.contract, skel.inputs, consumed, deferred);
return [];
};
break;
Expand Down
28 changes: 25 additions & 3 deletions src/orchestrator/src/petri-net.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,24 @@ export type FiringPolicy = 'serial' | 'parallel';
/** Event kinds aligned with spec doc §7. */
export type NetEventKind = 'transition_fired' | 'net_deadlocked' | 'net_halted';

/** Structured event emitted during net execution. */
/**
* Structured event emitted during net execution.
*
* `consumed` / `produced` are place-name lists (one entry per arc). The
* parallel `consumedTokens` / `producedTokens` carry the actual tokens
* that traversed each arc, indexed the same way — they are populated for
* `transition_fired` events so downstream adapters (e.g. the FE-763
* Petrinaut event stream) can include token payload in the wire format.
*/
export type NetEvent = {
kind: NetEventKind;
ts: string;
transitionId?: string;
contract?: TransitionContract;
consumed?: string[];
consumedTokens?: Token[][];
produced?: string[];
producedTokens?: Token[][];
};

/** Sink for structured net events. Optional — defaults to no-op. */
Expand Down Expand Up @@ -138,11 +148,14 @@ export class PetriNet {
transitionId: string,
contract: TransitionContract | undefined,
consumedPlaces: string[],
consumedTokens: Token[],
work: Promise<{ place: string; token: Token }[]>,
): void {
this.pendingDeferred++;
work
.then((outputs) => this.completeDeferred(transitionId, contract, consumedPlaces, outputs))
.then((outputs) =>
this.completeDeferred(transitionId, contract, consumedPlaces, consumedTokens, outputs),
)
.catch((err) => {
this.deferredError ??= err;
this.pendingDeferred--;
Expand All @@ -154,20 +167,25 @@ export class PetriNet {
transitionId: string,
contract: TransitionContract | undefined,
consumedPlaces: string[],
consumedTokens: Token[],
outputs: { place: string; token: Token }[],
): void {
const producedPlaces: string[] = [];
const producedTokens: Token[][] = [];
for (const { place, token } of outputs) {
this.addToken(place, token);
producedPlaces.push(place);
producedTokens.push([token]);
}
this.deferredEventSink?.emit({
kind: 'transition_fired',
ts: new Date().toISOString(),
transitionId,
contract,
consumed: consumedPlaces,
consumedTokens: consumedTokens.map((t) => [t]),
produced: producedPlaces,
producedTokens,
});
this.pendingDeferred--;
this.wakeOneWaiter();
Expand Down Expand Up @@ -273,22 +291,26 @@ export class PetriNet {
}

private depositClaim(
{ transition, consumed: _consumed }: TransitionClaim,
{ transition, consumed }: TransitionClaim,
outputs: { place: string; token: Token }[],
eventSink?: NetEventSink,
): void {
const producedPlaces: string[] = [];
const producedTokens: Token[][] = [];
for (const { place, token } of outputs) {
this.addToken(place, token);
producedPlaces.push(place);
producedTokens.push([token]);
}
eventSink?.emit({
kind: 'transition_fired',
ts: new Date().toISOString(),
transitionId: transition.id,
contract: transition.contract,
consumed: transition.inputs,
consumedTokens: consumed.map((t) => [t]),
produced: producedPlaces,
producedTokens,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deferred handlers emit duplicate transition_fired events with consumed tokens

Medium Severity

For deferred fire handlers (action, run-tests, assess-semantic, verify-epic), depositClaim emits a transition_fired event with populated consumedTokens but empty producedTokens when the fire handler returns []. Then completeDeferred emits a second transition_fired for the same transition with the same consumedTokens plus the actual producedTokens. The rename from _consumed to consumed and the addition of consumedTokens: consumed.map((t) => [t]) means the Petrinaut stream now receives two rich events per deferred firing — the first showing tokens vanishing with no output, the second claiming the same tokens were consumed again. A Petrinaut visualizer animating token movements would see contradictory state.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 9c0641c. Configure here.

});
}

Expand Down
Loading
Loading