diff --git a/README.md b/README.md index 623003b..7c4628e 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,7 @@ openclaw config set plugins.entries.agent-control-openclaw-plugin.config.apiKey | `agentVersion` | string | — | Version string sent to Agent Control during agent sync. | | `timeoutMs` | integer | SDK default | Client timeout in milliseconds. | | `failClosed` | boolean | `false` | Block tool calls when Agent Control is unreachable. See [Fail-open vs fail-closed](#fail-open-vs-fail-closed). | +| `observabilityEnabled` | boolean | `true` | Emit pre-tool control execution events to Agent Control observability. Enabled by default unless explicitly set to `false`. | | `logLevel` | string | `warn` | Logging verbosity. See [Logging](#logging). | | `userAgent` | string | `openclaw-agent-control-plugin/0.1` | Custom User-Agent header for requests to Agent Control. | @@ -96,6 +97,20 @@ Set `failClosed` to `true` if you need the guarantee that no tool call executes openclaw config set plugins.entries.agent-control-openclaw-plugin.config.failClosed true ``` +## Observability + +Observability is enabled by default. Unless `observabilityEnabled` is explicitly set to `false`, the plugin sends control execution events to Agent Control's observability API after each `before_tool_call` evaluation. + +- Events are emitted only for the `pre` stage because the current OpenClaw plugin SDK typings expose a pre-tool hook but no post-tool hook. +- Emission is best-effort and non-blocking. Ingest failures are logged at `warn` and do not change allow/block behavior. +- The plugin stamps each event with the OpenTelemetry trace and span IDs of the active span when one exists; otherwise it generates OpenTelemetry-compatible IDs locally. + +Disable it with: + +```bash +openclaw config set plugins.entries.agent-control-openclaw-plugin.config.observabilityEnabled false +``` + ## Logging The plugin stays quiet by default and only emits warnings, errors, and tool block events. 
@@ -133,6 +148,7 @@ openclaw plugins disable agent-control-openclaw-plugin openclaw config unset plugins.entries.agent-control-openclaw-plugin.config.apiKey openclaw config unset plugins.entries.agent-control-openclaw-plugin.config.logLevel openclaw config unset plugins.entries.agent-control-openclaw-plugin.config.agentVersion +openclaw config unset plugins.entries.agent-control-openclaw-plugin.config.observabilityEnabled openclaw config unset plugins.entries.agent-control-openclaw-plugin.config.userAgent ``` diff --git a/openclaw.plugin.json b/openclaw.plugin.json index b36a66e..66a3956 100644 --- a/openclaw.plugin.json +++ b/openclaw.plugin.json @@ -31,6 +31,10 @@ "failClosed": { "type": "boolean" }, + "observabilityEnabled": { + "type": "boolean", + "default": true + }, "logLevel": { "type": "string", "enum": ["warn", "info", "debug"] @@ -54,6 +58,10 @@ "label": "Fail Closed", "help": "If true, block tool invocations when Agent Control is unavailable." }, + "observabilityEnabled": { + "label": "Enable Observability", + "help": "Defaults to true. Set false to disable best-effort pre-tool control execution events to Agent Control observability." + }, "logLevel": { "label": "Log Level", "help": "Controls plugin verbosity: warn logs only warnings, errors, and block events; info adds high-level lifecycle logs; debug adds verbose diagnostics." 
diff --git a/package-lock.json b/package-lock.json index fc90364..d28f23b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8,7 +8,8 @@ "name": "agent-control-openclaw-plugin", "version": "1.7.0", "dependencies": { - "agent-control": "^2.0.0", + "@opentelemetry/api": "^1.9.1", + "agent-control": "^2.2.0", "esbuild": "^0.27.3", "jiti": "^2.6.1" }, @@ -20,6 +21,9 @@ "semantic-release": "^25.0.3", "typescript": "^5.9.2", "vitest": "^4.0.18" + }, + "engines": { + "node": ">=24" } }, "node_modules/@actions/core": { @@ -783,6 +787,15 @@ "@octokit/openapi-types": "^27.0.0" } }, + "node_modules/@opentelemetry/api": { + "version": "1.9.1", + "resolved": "https://registry.npmjs.org/@opentelemetry/api/-/api-1.9.1.tgz", + "integrity": "sha512-gLyJlPHPZYdAk1JENA9LeHejZe1Ti77/pTeFm/nMXmQH/HFZlcS/O2XJB+L8fkbrNSqhdtlvjBVjxwUYanNH5Q==", + "license": "Apache-2.0", + "engines": { + "node": ">=8.0.0" + } + }, "node_modules/@oxc-project/types": { "version": "0.120.0", "resolved": "https://registry.npmjs.org/@oxc-project/types/-/types-0.120.0.tgz", @@ -1895,9 +1908,9 @@ } }, "node_modules/agent-control": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/agent-control/-/agent-control-2.0.0.tgz", - "integrity": "sha512-QnN5ujNuLArKQozSuOojPofGykrWr6m3ydThpFx2n0doB9VJVQl2HvONB1XqEoK/VZdc3l6MgpYYMwI4w9m+SQ==", + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/agent-control/-/agent-control-2.2.0.tgz", + "integrity": "sha512-CVm8jkleOyTLr8GUr46JsxuXQSCJcPH2XPcTDa1yjibWp65OZPIryKw5lw8QMT9Cdw0nCuQyokjd+dXjtARYdQ==", "license": "Apache-2.0", "dependencies": { "zod": "^4.1.11" diff --git a/package.json b/package.json index 764721b..b8acaaf 100644 --- a/package.json +++ b/package.json @@ -38,14 +38,15 @@ "release": "semantic-release" }, "dependencies": { - "agent-control": "^2.0.0", + "@opentelemetry/api": "^1.9.1", + "agent-control": "^2.2.0", "esbuild": "^0.27.3", "jiti": "^2.6.1" }, "devDependencies": { - "@vitest/coverage-v8": "^4.0.18", 
"@semantic-release/git": "^10.0.1", "@types/node": "^24.5.2", + "@vitest/coverage-v8": "^4.0.18", "oxlint": "^0.15.0", "semantic-release": "^25.0.3", "typescript": "^5.9.2", diff --git a/src/agent-control-plugin.ts b/src/agent-control-plugin.ts index ad81163..7cdc331 100644 --- a/src/agent-control-plugin.ts +++ b/src/agent-control-plugin.ts @@ -1,6 +1,13 @@ import { AgentControlClient } from "agent-control"; +import type { JsonValue } from "agent-control"; import type { OpenClawPluginApi } from "openclaw/plugin-sdk/core"; import { createPluginLogger, resolveLogLevel } from "./logging.ts"; +import { + buildControlExecutionEvents, + buildControlObservabilityIndex, + emitControlExecutionEvents, + resolveTraceContext, +} from "./observability.ts"; import { resolveStepsForContext } from "./tool-catalog.ts"; import { buildEvaluationContext } from "./session-context.ts"; import { @@ -74,6 +81,7 @@ export default function register(api: OpenClawPluginApi) { } const failClosed = cfg.failClosed === true; + const observabilityEnabled = cfg.observabilityEnabled !== false; const baseAgentName = asString(cfg.agentName) ?? "openclaw-agent"; const configuredAgentVersion = asString(cfg.agentVersion); const pluginVersion = asString(api.version); @@ -91,6 +99,9 @@ export default function register(api: OpenClawPluginApi) { logger.info( `agent-control: client_init duration_sec=${secondsSince(clientInitStartedAt)} timeout_ms=${clientTimeoutMs ?? 
"default"} server_url=${serverUrl}`, ); + logger.info( + `agent-control: observability enabled=${observabilityEnabled}`, + ); const states = new Map(); let gatewayWarmupPromise: Promise | null = null; @@ -110,6 +121,7 @@ export default function register(api: OpenClawPluginApi) { steps: [], stepsHash: hashSteps([]), lastSyncedStepsHash: null, + controlObservabilityById: new Map(), syncPromise: null, }; states.set(sourceAgentId, created); @@ -160,7 +172,7 @@ export default function register(api: OpenClawPluginApi) { const currentHash = state.stepsHash; const promise = (async () => { const syncStartedAt = process.hrtime.bigint(); - await client.agents.init({ + const syncResponse = await client.agents.init({ agent: { agentName: state.agentName, agentVersion: configuredAgentVersion, @@ -172,6 +184,9 @@ export default function register(api: OpenClawPluginApi) { }, steps: state.steps, }); + if (Array.isArray(syncResponse?.controls)) { + state.controlObservabilityById = buildControlObservabilityIndex(syncResponse.controls); + } logger.info( `agent-control: sync_agent duration_sec=${secondsSince(syncStartedAt)} agent=${state.sourceAgentId} step_count=${state.steps.length}`, ); @@ -283,22 +298,43 @@ export default function register(api: OpenClawPluginApi) { ); const evaluateStartedAt = process.hrtime.bigint(); + const traceContext = observabilityEnabled ? resolveTraceContext() : null; const evaluation = await client.evaluation.evaluate({ - body: { - agentName: state.agentName, - stage: "pre", - step: { - type: "tool", - name: event.toolName, - input: event.params, - context, - }, + agentName: state.agentName, + stage: "pre", + step: { + type: "tool", + name: event.toolName, + input: (event.params ?? 
null) as JsonValue, + context: context as Record, }, }); logger.debug( `agent-control: before_tool_call phase=evaluate duration_sec=${secondsSince(evaluateStartedAt)} agent=${sourceAgentId} tool=${event.toolName} safe=${evaluation.isSafe}`, ); + if (observabilityEnabled && traceContext) { + emitControlExecutionEvents({ + client, + logger, + events: buildControlExecutionEvents({ + evaluation, + agentName: state.agentName, + stepName: event.toolName, + stepType: "tool", + checkStage: "pre", + traceContext, + controlObservabilityById: state.controlObservabilityById, + sourceAgentId, + pluginId: api.id, + runId: event.runId ?? ctx.runId, + toolCallId: event.toolCallId ?? ctx.toolCallId, + }), + agentName: state.agentName, + stepName: event.toolName, + }); + } + if (evaluation.isSafe) { return; } diff --git a/src/observability.ts b/src/observability.ts new file mode 100644 index 0000000..65bd8bd --- /dev/null +++ b/src/observability.ts @@ -0,0 +1,248 @@ +import { randomBytes } from "node:crypto"; +import { trace } from "@opentelemetry/api"; +import type { + AgentControlClient, + ConditionNodeOutput, + Control, + ControlExecutionEvent, + EvaluationResponse, +} from "agent-control"; +import { isRecord } from "./shared.ts"; +import type { ControlObservabilityIdentity, PluginLogger } from "./types.ts"; + +type TraceContext = { + traceId: string; + spanId: string; +}; + +type EventCategory = "match" | "error" | "non_match"; + +type ControlMatchRecord = NonNullable[number]; + +type BuildControlExecutionEventsParams = { + evaluation: EvaluationResponse; + agentName: string; + stepName: string; + stepType: string; + checkStage: "pre" | "post"; + traceContext: TraceContext; + controlObservabilityById: Map; + sourceAgentId: string; + pluginId: string; + runId?: string; + toolCallId?: string; +}; + +type EmitControlExecutionEventsParams = { + client: Pick; + logger: PluginLogger; + events: ControlExecutionEvent[]; + agentName: string; + stepName: string; +}; + +function 
generateTraceId(): string { + return randomBytes(16).toString("hex"); +} + +function generateSpanId(): string { + return randomBytes(8).toString("hex"); +} + +export function resolveTraceContext(): TraceContext { + const activeSpan = trace.getActiveSpan(); + const spanContext = activeSpan?.spanContext(); + if (spanContext && trace.isSpanContextValid(spanContext)) { + return { + traceId: spanContext.traceId, + spanId: spanContext.spanId, + }; + } + + return { + traceId: generateTraceId(), + spanId: generateSpanId(), + }; +} + +function isRenderedControl( + control: Control["control"], +): control is Extract { + return isRecord(control) && "condition" in control; +} + +function visitConditionLeaves( + condition: ConditionNodeOutput, + visit: (identity: { evaluatorName: string; selectorPath: string }) => void, +): void { + const selectorPath = condition.selector?.path?.trim() || "*"; + const evaluatorName = condition.evaluator?.name?.trim(); + + if (evaluatorName) { + visit({ evaluatorName, selectorPath }); + return; + } + + for (const child of condition.and ?? []) { + visitConditionLeaves(child, visit); + } + for (const child of condition.or ?? []) { + visitConditionLeaves(child, visit); + } + if (condition.not) { + visitConditionLeaves(condition.not, visit); + } +} + +function buildControlObservabilityIdentity( + condition: ConditionNodeOutput, +): ControlObservabilityIdentity { + const allEvaluators: string[] = []; + const allSelectorPaths: string[] = []; + const seenEvaluators = new Set(); + const seenSelectorPaths = new Set(); + let leafCount = 0; + + visitConditionLeaves(condition, ({ evaluatorName, selectorPath }) => { + leafCount += 1; + + if (!seenEvaluators.has(evaluatorName)) { + seenEvaluators.add(evaluatorName); + allEvaluators.push(evaluatorName); + } + if (!seenSelectorPaths.has(selectorPath)) { + seenSelectorPaths.add(selectorPath); + allSelectorPaths.push(selectorPath); + } + }); + + return { + selectorPath: allSelectorPaths[0] ?? 
null, + evaluatorName: allEvaluators[0] ?? null, + leafCount, + allEvaluators, + allSelectorPaths, + }; +} + +export function buildControlObservabilityIndex( + controls: Control[] | null | undefined, +): Map { + const index = new Map(); + + for (const control of controls ?? []) { + if (!isRenderedControl(control.control)) { + continue; + } + + index.set(control.id, buildControlObservabilityIdentity(control.control.condition)); + } + + return index; +} + +function mapAppliesTo(stepType: string): "llm_call" | "tool_call" { + return stepType === "tool" ? "tool_call" : "llm_call"; +} + +function buildEventMetadata( + match: ControlMatchRecord, + identity: ControlObservabilityIdentity | undefined, + params: BuildControlExecutionEventsParams, +): Record | undefined { + const metadata: Record = {}; + + if (isRecord(match.result.metadata)) { + Object.assign(metadata, match.result.metadata); + } + + if (identity) { + metadata.primary_evaluator = identity.evaluatorName; + metadata.primary_selector_path = identity.selectorPath; + metadata.leaf_count = identity.leafCount; + metadata.all_evaluators = identity.allEvaluators; + metadata.all_selector_paths = identity.allSelectorPaths; + } + + metadata.openclaw_step_name = params.stepName; + metadata.openclaw_step_type = params.stepType; + metadata.openclaw_source_agent_id = params.sourceAgentId; + metadata.openclaw_plugin_id = params.pluginId; + if (params.runId) { + metadata.openclaw_run_id = params.runId; + } + if (params.toolCallId) { + metadata.openclaw_tool_call_id = params.toolCallId; + } + + return Object.keys(metadata).length > 0 ? metadata : undefined; +} + +function buildEventsForCategory( + matches: EvaluationResponse["matches"] | EvaluationResponse["errors"] | EvaluationResponse["nonMatches"], + category: EventCategory, + params: BuildControlExecutionEventsParams, +): ControlExecutionEvent[] { + const categoryMatches = matches ?? 
[]; + const matched = category === "match"; + const includeErrorMessage = category === "error"; + + return categoryMatches.map((match) => { + const identity = params.controlObservabilityById.get(match.controlId); + + return { + action: match.action, + agentName: params.agentName, + appliesTo: mapAppliesTo(params.stepType), + checkStage: params.checkStage, + confidence: match.result.confidence, + controlExecutionId: match.controlExecutionId, + controlId: match.controlId, + controlName: match.controlName, + errorMessage: includeErrorMessage ? match.result.error ?? null : null, + evaluatorName: identity?.evaluatorName ?? null, + matched, + metadata: buildEventMetadata(match, identity, params), + selectorPath: identity?.selectorPath ?? null, + spanId: params.traceContext.spanId, + timestamp: new Date(), + traceId: params.traceContext.traceId, + }; + }); +} + +export function buildControlExecutionEvents( + params: BuildControlExecutionEventsParams, +): ControlExecutionEvent[] { + return [ + ...buildEventsForCategory(params.evaluation.matches, "match", params), + ...buildEventsForCategory(params.evaluation.errors, "error", params), + ...buildEventsForCategory(params.evaluation.nonMatches, "non_match", params), + ]; +} + +export function emitControlExecutionEvents(params: EmitControlExecutionEventsParams): void { + if (params.events.length === 0) { + return; + } + + void params.client.observability + .ingestEvents({ events: params.events }) + .then((response) => { + if (response.status !== "queued" || response.dropped > 0) { + params.logger.warn( + `agent-control: observability_ingest partial agent=${params.agentName} step=${params.stepName} status=${response.status} enqueued=${response.enqueued} dropped=${response.dropped}`, + ); + return; + } + + params.logger.debug( + `agent-control: observability_ingest agent=${params.agentName} step=${params.stepName} received=${response.received} enqueued=${response.enqueued}`, + ); + }) + .catch((error) => { + params.logger.warn( + 
`agent-control: observability_ingest failed agent=${params.agentName} step=${params.stepName} error=${String(error)}`, + ); + }); +} diff --git a/src/types.ts b/src/types.ts index 2da17e4..cc73256 100644 --- a/src/types.ts +++ b/src/types.ts @@ -2,6 +2,14 @@ import type { OpenClawPluginApi } from "openclaw/plugin-sdk/core"; export type LogLevel = "warn" | "info" | "debug"; +export type ControlObservabilityIdentity = { + selectorPath: string | null; + evaluatorName: string | null; + leafCount: number; + allEvaluators: string[]; + allSelectorPaths: string[]; +}; + export type AgentControlPluginConfig = { enabled?: boolean; serverUrl?: string; @@ -11,6 +19,7 @@ export type AgentControlPluginConfig = { timeoutMs?: number; userAgent?: string; failClosed?: boolean; + observabilityEnabled?: boolean; logLevel?: LogLevel; }; @@ -28,6 +37,7 @@ export type AgentState = { steps: AgentControlStep[]; stepsHash: string; lastSyncedStepsHash: string | null; + controlObservabilityById: Map; syncPromise: Promise | null; }; diff --git a/test/agent-control-plugin.test.ts b/test/agent-control-plugin.test.ts index 6229bd4..676efb6 100644 --- a/test/agent-control-plugin.test.ts +++ b/test/agent-control-plugin.test.ts @@ -15,6 +15,7 @@ const { init: vi.fn(), agentsInit: vi.fn(), evaluationEvaluate: vi.fn(), + ingestEvents: vi.fn(), }, resolveStepsForContextMock: vi.fn(), buildEvaluationContextMock: vi.fn(), @@ -29,6 +30,9 @@ vi.mock("agent-control", () => ({ evaluation = { evaluate: clientMocks.evaluationEvaluate, }; + observability = { + ingestEvents: clientMocks.ingestEvents, + }; }, })); @@ -114,6 +118,12 @@ beforeEach(() => { clientMocks.init.mockReset(); clientMocks.agentsInit.mockReset().mockResolvedValue(undefined); clientMocks.evaluationEvaluate.mockReset().mockResolvedValue({ isSafe: true }); + clientMocks.ingestEvents.mockReset().mockResolvedValue({ + received: 0, + enqueued: 0, + dropped: 0, + status: "queued", + }); resolveStepsForContextMock.mockReset().mockResolvedValue([{ 
type: "tool", name: "shell" }]); buildEvaluationContextMock.mockReset().mockResolvedValue({ channelType: "unknown" }); }); @@ -284,6 +294,61 @@ describe("agent-control plugin logging and blocking", () => { ); }); + it("warns when gateway warmup fails and still evaluates later tool calls", async () => { + // Given gateway warmup fails once before regular tool evaluation starts + const api = createMockApi({ + serverUrl: "http://localhost:8000", + }); + + resolveStepsForContextMock + .mockRejectedValueOnce(new Error("warmup exploded")) + .mockResolvedValue([{ type: "tool", name: "shell" }]); + + // When gateway_start and a later tool evaluation are executed + register(api.api); + await runGatewayStart(api); + const result = await runBeforeToolCall(api); + + // Then the warmup failure is only warned and later tool calls still evaluate + expect(result).toBeUndefined(); + expect(api.warn).toHaveBeenCalledWith( + expect.stringContaining("gateway_boot_warmup failed"), + ); + expect(clientMocks.evaluationEvaluate).toHaveBeenCalledOnce(); + }); + + it("waits for in-flight gateway warmup before evaluating a tool call", async () => { + // Given gateway warmup is still running when a tool call arrives + const api = createMockApi({ + serverUrl: "http://localhost:8000", + logLevel: "debug", + }); + const warmupDeferred = createDeferred<{ type: string; name: string }[]>(); + resolveStepsForContextMock + .mockImplementationOnce(() => warmupDeferred.promise) + .mockResolvedValue([{ type: "tool", name: "shell" }]); + + // When gateway_start begins warmup and before_tool_call fires before warmup completes + register(api.api); + const gatewayStartPromise = runGatewayStart(api); + await Promise.resolve(); + const beforeToolCallPromise = runBeforeToolCall(api); + await Promise.resolve(); + await Promise.resolve(); + + // Then evaluation does not begin until the warmup promise resolves + expect(clientMocks.evaluationEvaluate).not.toHaveBeenCalled(); + + warmupDeferred.resolve([{ type: 
"tool", name: "shell" }]); + await gatewayStartPromise; + await beforeToolCallPromise; + + expect(clientMocks.evaluationEvaluate).toHaveBeenCalledOnce(); + expect(api.info).toHaveBeenCalledWith( + expect.stringContaining("waiting_for_gateway_boot_warmup=true"), + ); + }); + it("deduplicates concurrent syncs for the same source agent", async () => { // Given two concurrent tool calls sharing the same source agent and sync promise const api = createMockApi({ @@ -309,6 +374,47 @@ describe("agent-control plugin logging and blocking", () => { expect(clientMocks.evaluationEvaluate).toHaveBeenCalledTimes(2); }); + it("resyncs immediately when steps change during an in-flight sync", async () => { + // Given one tool call changes the step catalog while another sync is still in flight + const api = createMockApi({ + serverUrl: "http://localhost:8000", + }); + const syncDeferred = createDeferred(); + clientMocks.agentsInit + .mockImplementationOnce(() => syncDeferred.promise) + .mockResolvedValueOnce(undefined); + resolveStepsForContextMock + .mockResolvedValueOnce([{ type: "tool", name: "shell" }]) + .mockResolvedValueOnce([ + { type: "tool", name: "shell" }, + { type: "tool", name: "grep" }, + ]); + + // When a second tool call updates the steps before the first sync completes + register(api.api); + const first = runBeforeToolCall(api); + await Promise.resolve(); + await Promise.resolve(); + const second = runBeforeToolCall(api, { toolName: "grep" }); + await Promise.resolve(); + await Promise.resolve(); + + expect(clientMocks.agentsInit).toHaveBeenCalledTimes(1); + + syncDeferred.resolve(undefined); + await Promise.all([first, second]); + + // Then the plugin immediately performs a second sync using the newer step catalog + expect(clientMocks.agentsInit).toHaveBeenCalledTimes(2); + expect(clientMocks.agentsInit.mock.calls[1]?.[0]).toMatchObject({ + steps: [ + { type: "tool", name: "shell" }, + { type: "tool", name: "grep" }, + ], + }); + 
expect(clientMocks.evaluationEvaluate).toHaveBeenCalledTimes(2); + }); + it("skips resyncing when the step catalog has not changed", async () => { // Given a source agent whose step catalog is unchanged across two tool calls const api = createMockApi({ @@ -334,9 +440,36 @@ describe("agent-control plugin logging and blocking", () => { clientMocks.evaluationEvaluate.mockResolvedValueOnce({ isSafe: false, matches: [ - { action: "deny", controlName: "alpha" }, - { action: "deny", controlName: "alpha" }, - { action: "deny", controlName: "beta" }, + { + action: "deny", + controlExecutionId: "exec-alpha-1", + controlId: 1, + controlName: "alpha", + result: { + matched: true, + confidence: 0.9, + }, + }, + { + action: "deny", + controlExecutionId: "exec-alpha-2", + controlId: 2, + controlName: "alpha", + result: { + matched: true, + confidence: 0.8, + }, + }, + { + action: "deny", + controlExecutionId: "exec-beta-1", + controlId: 3, + controlName: "beta", + result: { + matched: true, + confidence: 0.7, + }, + }, ], errors: null, }); @@ -373,4 +506,282 @@ describe("agent-control plugin logging and blocking", () => { expect.stringContaining("reason=[agent-control] blocked by policy evaluation"), ); }); + + it("emits control execution events by default when observability is not configured", async () => { + // Given observability is left unset and the synced agent returns one rendered control + const api = createMockApi({ + serverUrl: "http://localhost:8000", + }); + + clientMocks.agentsInit.mockResolvedValueOnce({ + controls: [ + { + id: 7, + name: "deny-shell", + control: { + action: { + decision: "deny", + }, + condition: { + selector: { + path: "input.command", + }, + evaluator: { + name: "regex", + config: { + pattern: "rm -rf", + }, + }, + }, + enabled: true, + execution: "server", + }, + }, + ], + }); + clientMocks.evaluationEvaluate.mockResolvedValueOnce({ + isSafe: false, + confidence: 0.5, + matches: [ + { + action: "deny", + controlExecutionId: "exec-match", + 
controlId: 7, + controlName: "deny-shell", + result: { + matched: true, + confidence: 0.99, + metadata: { + policy_source: "server", + }, + }, + }, + ], + errors: [ + { + action: "observe", + controlExecutionId: "exec-error", + controlId: 8, + controlName: "audit-shell", + result: { + matched: false, + confidence: 0.1, + error: "timeout", + }, + }, + ], + nonMatches: [ + { + action: "observe", + controlExecutionId: "exec-non-match", + controlId: 9, + controlName: "allow-shell", + result: { + matched: false, + confidence: 0.2, + }, + }, + ], + }); + + // When the tool call is evaluated for a named source agent + register(api.api); + await runBeforeToolCall(api, {}, { agentId: "worker-1" }); + + // Then the plugin sends one observability batch with per-control events + expect(clientMocks.ingestEvents).toHaveBeenCalledTimes(1); + const request = clientMocks.ingestEvents.mock.calls[0]?.[0]; + expect(request.events).toHaveLength(3); + + const [matchedEvent, errorEvent, nonMatchEvent] = request.events; + expect(matchedEvent).toMatchObject({ + action: "deny", + agentName: "openclaw-agent:worker-1", + appliesTo: "tool_call", + checkStage: "pre", + controlExecutionId: "exec-match", + controlId: 7, + controlName: "deny-shell", + evaluatorName: "regex", + matched: true, + selectorPath: "input.command", + }); + expect(matchedEvent.metadata).toMatchObject({ + policy_source: "server", + primary_evaluator: "regex", + primary_selector_path: "input.command", + leaf_count: 1, + all_evaluators: ["regex"], + all_selector_paths: ["input.command"], + openclaw_step_name: "shell", + openclaw_step_type: "tool", + openclaw_source_agent_id: "worker-1", + openclaw_plugin_id: "agent-control-openclaw-plugin", + openclaw_run_id: "run-1", + openclaw_tool_call_id: "call-1", + }); + expect(matchedEvent.traceId).toMatch(/^[a-f0-9]{32}$/); + expect(matchedEvent.spanId).toMatch(/^[a-f0-9]{16}$/); + + expect(errorEvent).toMatchObject({ + action: "observe", + controlExecutionId: "exec-error", + 
controlId: 8, + controlName: "audit-shell", + errorMessage: "timeout", + matched: false, + }); + expect(errorEvent.traceId).toBe(matchedEvent.traceId); + expect(errorEvent.spanId).toBe(matchedEvent.spanId); + + expect(nonMatchEvent).toMatchObject({ + action: "observe", + controlExecutionId: "exec-non-match", + controlId: 9, + controlName: "allow-shell", + errorMessage: null, + matched: false, + }); + expect(nonMatchEvent.traceId).toBe(matchedEvent.traceId); + expect(nonMatchEvent.spanId).toBe(matchedEvent.spanId); + }); + + it("does not fail the tool call when default observability ingestion fails", async () => { + // Given observability is left unset but the ingest request fails + const api = createMockApi({ + serverUrl: "http://localhost:8000", + }); + + clientMocks.evaluationEvaluate.mockResolvedValueOnce({ + isSafe: true, + confidence: 1, + nonMatches: [ + { + action: "observe", + controlExecutionId: "exec-non-match", + controlId: 9, + controlName: "allow-shell", + result: { + matched: false, + confidence: 0.2, + }, + }, + ], + }); + clientMocks.ingestEvents.mockRejectedValueOnce(new Error("observability offline")); + + // When the plugin evaluates the tool call + register(api.api); + const result = await runBeforeToolCall(api); + await Promise.resolve(); + await Promise.resolve(); + + // Then the tool call is still allowed and the ingest failure is only logged + expect(result).toBeUndefined(); + expect(api.warn).toHaveBeenCalledWith( + expect.stringContaining("observability_ingest failed"), + ); + }); + + it("does not emit control execution events when observability is explicitly disabled", async () => { + // Given observability is explicitly disabled in plugin configuration + const api = createMockApi({ + serverUrl: "http://localhost:8000", + observabilityEnabled: false, + }); + + clientMocks.evaluationEvaluate.mockResolvedValueOnce({ + isSafe: true, + confidence: 1, + nonMatches: [ + { + action: "observe", + controlExecutionId: "exec-non-match", + controlId: 
9, + controlName: "allow-shell", + result: { + matched: false, + confidence: 0.2, + }, + }, + ], + }); + + // When the plugin evaluates a tool call + register(api.api); + const result = await runBeforeToolCall(api); + await Promise.resolve(); + await Promise.resolve(); + + // Then the tool call still proceeds and no observability batch is sent + expect(result).toBeUndefined(); + expect(clientMocks.ingestEvents).not.toHaveBeenCalled(); + }); + + it("allows the tool call when fail-open sync fails", async () => { + // Given fail-open mode and a step-resolution failure before evaluation + const api = createMockApi({ + serverUrl: "http://localhost:8000", + }); + + resolveStepsForContextMock.mockRejectedValueOnce(new Error("resolver exploded")); + + // When the plugin attempts to evaluate a tool call + register(api.api); + const result = await runBeforeToolCall(api); + + // Then the tool call is allowed to continue without evaluation + expect(result).toBeUndefined(); + expect(clientMocks.evaluationEvaluate).not.toHaveBeenCalled(); + expect(api.warn).toHaveBeenCalledWith(expect.stringContaining("unable to sync")); + expect(api.warn).not.toHaveBeenCalledWith(expect.stringContaining("blocked tool=shell")); + }); + + it("blocks the tool call when fail-closed evaluation throws", async () => { + // Given fail-closed mode and an evaluation request that throws + const api = createMockApi({ + serverUrl: "http://localhost:8000", + failClosed: true, + }); + + clientMocks.evaluationEvaluate.mockRejectedValueOnce(new Error("eval exploded")); + + // When the plugin evaluates a tool call + register(api.api); + const result = await runBeforeToolCall(api); + + // Then the tool call is blocked by the evaluation failure fallback + expect(result).toEqual({ + block: true, + blockReason: USER_BLOCK_MESSAGE, + }); + expect(api.warn.mock.calls.map(([message]) => String(message))).toEqual( + expect.arrayContaining([ + expect.stringContaining("evaluation failed"), + 
expect.stringContaining("reason=evaluation_failed fail_closed=true"), + ]), + ); + }); + + it("allows the tool call when fail-open evaluation throws", async () => { + // Given fail-open mode and an evaluation request that throws + const api = createMockApi({ + serverUrl: "http://localhost:8000", + }); + + clientMocks.evaluationEvaluate.mockRejectedValueOnce(new Error("eval exploded")); + + // When the plugin evaluates a tool call + register(api.api); + const result = await runBeforeToolCall(api); + + // Then the tool call is allowed and only the failure warning is emitted + expect(result).toBeUndefined(); + expect(api.warn).toHaveBeenCalledWith( + expect.stringContaining("evaluation failed for agent=default tool=shell"), + ); + expect(api.warn).not.toHaveBeenCalledWith( + expect.stringContaining("reason=evaluation_failed fail_closed=true"), + ); + }); }); diff --git a/test/observability.test.ts b/test/observability.test.ts new file mode 100644 index 0000000..7c756a3 --- /dev/null +++ b/test/observability.test.ts @@ -0,0 +1,398 @@ +import { trace } from "@opentelemetry/api"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import type { ControlExecutionEvent } from "agent-control"; +import { + buildControlExecutionEvents, + buildControlObservabilityIndex, + emitControlExecutionEvents, + resolveTraceContext, +} from "../src/observability.ts"; + +afterEach(() => { + vi.restoreAllMocks(); +}); + +describe("observability helpers", () => { + it("derives observability identity in leaf evaluation order", () => { + // Given two rendered controls with composite and leaf conditions + const index = buildControlObservabilityIndex([ + { + id: 1, + name: "composite", + control: { + action: { + decision: "observe", + }, + condition: { + and: [ + { + selector: { + path: "input", + }, + evaluator: { + name: "regex", + config: { + pattern: "alpha", + }, + }, + }, + { + not: { + selector: { + path: "context.user.role", + }, + evaluator: { + name: "list", + config: { + 
values: ["admin"], + }, + }, + }, + }, + ], + }, + enabled: true, + execution: "server", + }, + }, + ]); + + // When the cached observability metadata is read back + const identity = index.get(1); + + // Then the first leaf becomes the representative identity and all leaves are tracked + expect(identity).toEqual({ + selectorPath: "input", + evaluatorName: "regex", + leafCount: 2, + allEvaluators: ["regex", "list"], + allSelectorPaths: ["input", "context.user.role"], + }); + }); + + it("skips unrendered controls and traverses or branches", () => { + // Given one rendered control with an or branch and one unrendered template control + const index = buildControlObservabilityIndex([ + { + id: 1, + name: "or-control", + control: { + action: { + decision: "observe", + }, + condition: { + or: [ + { + selector: { + path: "input.command", + }, + evaluator: { + name: "regex", + config: { + pattern: "npm", + }, + }, + }, + { + selector: { + path: "context.user.role", + }, + evaluator: { + name: "equals", + config: { + value: "admin", + }, + }, + }, + ], + }, + enabled: true, + execution: "server", + }, + }, + { + id: 2, + name: "template-control", + control: { + action: { + decision: "observe", + }, + } as never, + }, + ]); + + // When the index is built from the returned controls + const identity = index.get(1); + + // Then only rendered controls are indexed and both or leaves are captured + expect(index.size).toBe(1); + expect(identity).toEqual({ + selectorPath: "input.command", + evaluatorName: "regex", + leafCount: 2, + allEvaluators: ["regex", "equals"], + allSelectorPaths: ["input.command", "context.user.role"], + }); + }); + + it("builds control execution events with category-specific error handling", () => { + // Given a cached control identity and a mixed evaluation response + const events = buildControlExecutionEvents({ + evaluation: { + isSafe: false, + confidence: 0.5, + matches: [ + { + action: "deny", + controlExecutionId: "exec-match", + controlId: 1, + 
controlName: "deny-shell", + result: { + matched: true, + confidence: 0.9, + metadata: { + outcome: "match", + }, + }, + }, + ], + errors: [ + { + action: "observe", + controlExecutionId: "exec-error", + controlId: 2, + controlName: "audit-shell", + result: { + matched: false, + confidence: 0.1, + error: "timeout", + }, + }, + ], + nonMatches: [ + { + action: "observe", + controlExecutionId: "exec-non-match", + controlId: 3, + controlName: "allow-shell", + result: { + matched: false, + confidence: 0.2, + error: "ignored", + }, + }, + ], + }, + agentName: "openclaw-agent:worker-1", + stepName: "shell", + stepType: "tool", + checkStage: "pre", + traceContext: { + traceId: "a".repeat(32), + spanId: "b".repeat(16), + }, + controlObservabilityById: new Map([ + [ + 1, + { + selectorPath: "input.command", + evaluatorName: "regex", + leafCount: 1, + allEvaluators: ["regex"], + allSelectorPaths: ["input.command"], + }, + ], + ]), + sourceAgentId: "worker-1", + pluginId: "agent-control-openclaw-plugin", + runId: "run-1", + toolCallId: "call-1", + }); + + // When the events are built + const [matchEvent, errorEvent, nonMatchEvent] = events; + + // Then each category maps to the correct matched/error fields + expect(matchEvent).toMatchObject({ + matched: true, + errorMessage: null, + evaluatorName: "regex", + selectorPath: "input.command", + traceId: "a".repeat(32), + spanId: "b".repeat(16), + }); + expect(matchEvent.metadata).toMatchObject({ + outcome: "match", + primary_evaluator: "regex", + openclaw_step_name: "shell", + openclaw_tool_call_id: "call-1", + }); + + expect(errorEvent).toMatchObject({ + matched: false, + errorMessage: "timeout", + evaluatorName: null, + selectorPath: null, + }); + expect(nonMatchEvent).toMatchObject({ + matched: false, + errorMessage: null, + }); + }); + + it("falls back to generated trace identifiers when no active span exists", () => { + // Given no active OpenTelemetry span + const traceContext = resolveTraceContext(); + + // When trace 
context is resolved + expect(traceContext.traceId).toMatch(/^[a-f0-9]{32}$/); + expect(traceContext.spanId).toMatch(/^[a-f0-9]{16}$/); + }); + + it("reuses the active OpenTelemetry span context when it is valid", () => { + // Given a valid active span context from OpenTelemetry + vi.spyOn(trace, "getActiveSpan").mockReturnValue({ + spanContext: () => ({ + traceId: "1".repeat(32), + spanId: "2".repeat(16), + traceFlags: 1, + }), + } as never); + vi.spyOn(trace, "isSpanContextValid").mockReturnValue(true); + + // When trace context is resolved + const traceContext = resolveTraceContext(); + + // Then the active span identifiers are preserved + expect(traceContext).toEqual({ + traceId: "1".repeat(32), + spanId: "2".repeat(16), + }); + }); + + it("logs ingestion failures without throwing", async () => { + // Given one event and an ingest client that fails + const logger = { + info: vi.fn(), + debug: vi.fn(), + warn: vi.fn(), + block: vi.fn(), + }; + const ingestEvents = vi.fn().mockRejectedValue(new Error("boom")); + const events: ControlExecutionEvent[] = [ + { + action: "observe", + agentName: "openclaw-agent:worker-1", + appliesTo: "tool_call", + checkStage: "pre", + confidence: 1, + controlId: 1, + controlName: "allow-shell", + matched: false, + spanId: "b".repeat(16), + traceId: "a".repeat(32), + }, + ]; + + // When the fire-and-forget ingest path runs + emitControlExecutionEvents({ + client: { + observability: { + ingestEvents, + }, + } as never, + logger, + events, + agentName: "openclaw-agent:worker-1", + stepName: "shell", + }); + await Promise.resolve(); + await Promise.resolve(); + + // Then the error is logged and the caller is not forced to await or catch + expect(ingestEvents).toHaveBeenCalledWith({ events }); + expect(logger.warn).toHaveBeenCalledWith( + expect.stringContaining("observability_ingest failed"), + ); + }); + + it("returns early when there are no events to ingest", () => { + // Given an empty event batch + const logger = { + info: vi.fn(), + 
debug: vi.fn(), + warn: vi.fn(), + block: vi.fn(), + }; + const ingestEvents = vi.fn(); + + // When the fire-and-forget ingest path runs + emitControlExecutionEvents({ + client: { + observability: { + ingestEvents, + }, + } as never, + logger, + events: [], + agentName: "openclaw-agent:worker-1", + stepName: "shell", + }); + + // Then no ingest call or log is emitted + expect(ingestEvents).not.toHaveBeenCalled(); + expect(logger.warn).not.toHaveBeenCalled(); + expect(logger.debug).not.toHaveBeenCalled(); + }); + + it("warns when observability ingestion only partially enqueues events", async () => { + // Given one event and an ingest client that partially accepts the batch + const logger = { + info: vi.fn(), + debug: vi.fn(), + warn: vi.fn(), + block: vi.fn(), + }; + const events: ControlExecutionEvent[] = [ + { + action: "observe", + agentName: "openclaw-agent:worker-1", + appliesTo: "tool_call", + checkStage: "pre", + confidence: 1, + controlId: 1, + controlName: "allow-shell", + matched: false, + spanId: "b".repeat(16), + traceId: "a".repeat(32), + }, + ]; + const ingestEvents = vi.fn().mockResolvedValue({ + received: 1, + enqueued: 0, + dropped: 1, + status: "partial", + }); + + // When the fire-and-forget ingest path runs + emitControlExecutionEvents({ + client: { + observability: { + ingestEvents, + }, + } as never, + logger, + events, + agentName: "openclaw-agent:worker-1", + stepName: "shell", + }); + await Promise.resolve(); + await Promise.resolve(); + + // Then the partial enqueue result is warned instead of logged as success + expect(logger.warn).toHaveBeenCalledWith( + expect.stringContaining("observability_ingest partial"), + ); + expect(logger.debug).not.toHaveBeenCalled(); + }); +}); diff --git a/test/session-context.test.ts b/test/session-context.test.ts index 8485dad..714e480 100644 --- a/test/session-context.test.ts +++ b/test/session-context.test.ts @@ -50,6 +50,7 @@ describe("buildEvaluationContext", () => { steps: [], stepsHash: "hash-1", 
lastSyncedStepsHash: null, + controlObservabilityById: new Map(), syncPromise: null, }, event: { @@ -103,6 +104,7 @@ describe("buildEvaluationContext", () => { steps: [{ type: "tool" as const, name: "shell" }], stepsHash: "hash-1", lastSyncedStepsHash: "hash-0", + controlObservabilityById: new Map(), syncPromise: null, }, event: {}, @@ -158,6 +160,7 @@ describe("buildEvaluationContext", () => { steps: [], stepsHash: "hash-1", lastSyncedStepsHash: null, + controlObservabilityById: new Map(), syncPromise: null, }, event: {},