Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions evaluators/contrib/atr/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Developer convenience targets for the ATR evaluator package.
# All tools run via `uv run --with ...` so no local installs are required.
# NOTE(review): recipe lines must be tab-indented in the real file; the
# rendering here has stripped leading whitespace -- confirm before applying.
.PHONY: help test lint lint-fix typecheck build

# List the available targets.
help:
@echo "Agent Control Evaluator - ATR Threat Rules - Makefile commands"
@echo " make test - run pytest"
@echo " make lint - run ruff check"
@echo " make lint-fix - run ruff check --fix"
@echo " make typecheck - run mypy"
@echo " make build - build package"

# Run the test suite with coverage written to the repo-root XML report.
test:
uv run --with pytest --with pytest-asyncio --with pytest-cov pytest tests --cov=src --cov-report=xml:../../../coverage-evaluators-atr.xml -q

# Lint using the repo-root ruff configuration.
lint:
uv run --with ruff ruff check --config ../../../pyproject.toml src/

# Lint and auto-fix what ruff can repair safely.
lint-fix:
uv run --with ruff ruff check --config ../../../pyproject.toml --fix src/

# Type-check using the repo-root mypy configuration.
typecheck:
uv run --with mypy mypy --config-file ../../../pyproject.toml src/

# Build sdist/wheel via uv (hatchling backend, see pyproject.toml).
build:
uv build
47 changes: 47 additions & 0 deletions evaluators/contrib/atr/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# ATR Threat Rules Evaluator for Agent Control

Regex-based AI agent threat detection using [ATR (Agent Threat Rules)](https://agentthreatrule.org) community rules.

## Features

- 20 bundled rules covering OWASP Agentic Top 10 categories
- Pure regex detection -- no API keys, no external calls
- Sub-5ms evaluation time
- Configurable severity threshold and category filtering
- Auto-discovered via Python entry points

## Categories

| Category | Rules | Description |
|----------|-------|-------------|
| prompt-injection | 5 | Direct, indirect, jailbreak, system override, multi-turn |
| agent-manipulation | 2 | Cross-agent attacks, goal hijacking |
| context-exfiltration | 2 | Data exfil via tools, context window leaks |
| privilege-escalation | 2 | Unauthorized escalation, role assumption |
| tool-poisoning | 5 | Tool definition poisoning, hidden instructions, credentials, reverse shell |
| skill-compromise | 1 | Malicious skill installation |
| excessive-autonomy | 2 | Unauthorized actions, safety bypass |
| data-poisoning | 1 | Training data poisoning |

## Configuration

```python
from agent_control_evaluator_atr.threat_rules import ATRConfig

config = ATRConfig(
min_severity="medium", # "low", "medium", "high", "critical"
block_on_match=True, # matched=True when threat detected
categories=[], # empty = all categories
on_error="allow", # "allow" (fail-open) or "deny" (fail-closed)
)
```

## Installation

```bash
uv pip install -e evaluators/contrib/atr
```

## License

Apache-2.0. ATR rules are MIT-licensed.
42 changes: 42 additions & 0 deletions evaluators/contrib/atr/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
[project]
name = "agent-control-evaluator-atr"
version = "0.1.0"
description = "ATR (Agent Threat Rules) evaluator for agent-control"
readme = "README.md"
requires-python = ">=3.12"
license = { text = "Apache-2.0" }
authors = [{ name = "ATR Community" }]
dependencies = [
"agent-control-evaluators>=3.0.0",
"agent-control-models>=3.0.0",
]

[project.optional-dependencies]
dev = [
"pytest>=8.0.0",
"pytest-asyncio>=0.23.0",
"pytest-cov>=4.0.0",
"ruff>=0.1.0",
"mypy>=1.8.0",
]

[project.entry-points."agent_control.evaluators"]
"atr.threat_rules" = "agent_control_evaluator_atr.threat_rules:ATREvaluator"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/agent_control_evaluator_atr"]

[tool.ruff]
line-length = 100
target-version = "py312"

[tool.ruff.lint]
select = ["E", "F", "I"]

[tool.uv.sources]
agent-control-evaluators = { path = "../../builtin", editable = true }
agent-control-models = { path = "../../../models", editable = true }
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Intentionally empty export list: this module re-exports nothing.
__all__: list[str] = []
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Package entry point: re-exports the evaluator and its configuration model."""

from .config import ATRConfig
from .evaluator import ATREvaluator

__all__ = ["ATREvaluator", "ATRConfig"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from __future__ import annotations

from typing import Literal

from agent_control_evaluators import EvaluatorConfig
from pydantic import Field


class ATRConfig(EvaluatorConfig):
    """Configuration for ATR (Agent Threat Rules) evaluator.

    Attributes:
        min_severity: Minimum severity level to match ("low", "medium", "high", "critical")
        block_on_match: Whether to set matched=True when a threat is detected
        categories: Category filter; empty list means all categories
        on_error: Error policy ("allow" = fail-open, "deny" = fail-closed)
    """

    # Rules whose severity ranks below this are dropped at evaluator load time.
    min_severity: Literal["low", "medium", "high", "critical"] = "medium"
    # When False, detections are still reported in result metadata but
    # matched stays False (observe-only mode).
    block_on_match: bool = True
    # default_factory avoids a shared mutable default list across instances.
    categories: list[str] = Field(default_factory=list)
    # "allow": internal errors yield matched=False; "deny": matched=True.
    on_error: Literal["allow", "deny"] = "allow"
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
from __future__ import annotations

import json
import re
from pathlib import Path
from typing import Any

from agent_control_evaluators import (
Evaluator,
EvaluatorMetadata,
register_evaluator,
)
from agent_control_models import EvaluatorResult

from .config import ATRConfig

# Ordinal ranking of rule severities; used to filter rules against
# ATRConfig.min_severity and to compare severities across findings.
_SEVERITY_ORDER: dict[str, int] = {
    "low": 0,
    "medium": 1,
    "high": 2,
    "critical": 3,
}

# Heuristic confidence reported for a match, keyed by the rule's severity.
_SEVERITY_CONFIDENCE: dict[str, float] = {
    "low": 0.6,
    "medium": 0.75,
    "high": 0.9,
    "critical": 0.99,
}

# Bundled rule definitions shipped alongside this module.
_RULES_PATH = Path(__file__).parent / "rules.json"


def _load_rules(path: Path) -> list[dict[str, Any]]:
"""Load ATR rules from the bundled JSON file."""
with path.open(encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, list):
raise ValueError(f"Expected list of rules, got {type(data).__name__}")
return data


def _coerce_to_string(data: Any) -> str:
"""Convert arbitrary input data to a string for pattern matching."""
if data is None:
return ""
if isinstance(data, str):
return data
if isinstance(data, dict):
# Scan all common content fields, not just the first match
parts = []
for key in ("content", "input", "output", "text", "message"):
if key in data and data[key] is not None:
parts.append(str(data[key]))
if parts:
return "\n".join(parts)
# Fall back to JSON serialization
try:
return json.dumps(data, ensure_ascii=False, sort_keys=True, default=str)
except TypeError:
return str(data)
if isinstance(data, (int, float, bool)):
return str(data)
if isinstance(data, (list, tuple)):
try:
return json.dumps(data, ensure_ascii=False, default=str)
except TypeError:
return str(data)
return str(data)


@register_evaluator
class ATREvaluator(Evaluator[ATRConfig]):
    """ATR (Agent Threat Rules) evaluator.

    Regex-based AI agent threat detection using community rules.
    Rules are loaded and compiled once at construction time; evaluation is
    pure in-process regex matching -- no external API calls or keys required.
    """

    metadata = EvaluatorMetadata(
        name="atr.threat_rules",
        version="0.1.0",
        description="Regex-based AI agent threat detection using ATR community rules",
        requires_api_key=False,
        timeout_ms=5000,
    )

    config_model = ATRConfig

    @classmethod
    def is_available(cls) -> bool:
        """Available whenever the bundled rules file exists -- no optional deps."""
        return _RULES_PATH.exists()

    def __init__(self, config: ATRConfig) -> None:
        """Load, filter, and compile the bundled rules eagerly.

        Filtering by min_severity and categories happens here so that
        evaluate() does no file I/O and no per-call filtering.

        Raises:
            ValueError: If the bundled rules file is not a JSON list.
            OSError: If the rules file cannot be read.
        """
        super().__init__(config)
        self.config = config

        raw_rules = _load_rules(_RULES_PATH)

        # Unknown min_severity values fall back to "medium" (1); rules with
        # unrecognized severities rank 0 and pass only when min_severity="low".
        min_level = _SEVERITY_ORDER.get(self.config.min_severity, 1)
        allowed_categories = set(self.config.categories) if self.config.categories else None

        self._compiled_rules: list[dict[str, Any]] = []
        for rule in raw_rules:
            # str() guards against malformed rule files where severity is not
            # a string (e.g. a number); such values rank lowest and filter out.
            severity = str(rule.get("severity", "medium")).lower()
            if _SEVERITY_ORDER.get(severity, 0) < min_level:
                continue

            category = rule.get("category", "")
            if allowed_categories and category not in allowed_categories:
                continue

            compiled_patterns: list[dict[str, Any]] = []
            for p in rule.get("patterns", []):
                try:
                    compiled_patterns.append({
                        "regex": re.compile(p["pattern"], re.IGNORECASE),
                        "description": p.get("description", ""),
                    })
                except (KeyError, TypeError, re.error):
                    # Skip invalid or malformed pattern entries (missing
                    # "pattern" key, non-string pattern, bad regex) rather
                    # than failing the whole rule set.
                    continue

            # A rule with zero valid patterns can never match; drop it.
            if compiled_patterns:
                self._compiled_rules.append({
                    "id": rule.get("id", "unknown"),
                    "title": rule.get("title", ""),
                    "severity": severity,
                    "category": category,
                    "confidence": _SEVERITY_CONFIDENCE.get(severity, 0.75),
                    "patterns": compiled_patterns,
                })

    async def evaluate(self, data: Any) -> EvaluatorResult:  # noqa: D401
        """Evaluate input data against ATR threat rules.

        Args:
            data: Arbitrary input; coerced to text via _coerce_to_string.

        Returns:
            EvaluatorResult; on a match, metadata carries all findings.
            Internal failures are routed through the on_error policy
            instead of propagating.
        """
        if data is None:
            return EvaluatorResult(matched=False, confidence=1.0, message="No data")

        try:
            text = _coerce_to_string(data)
        except Exception as e:  # noqa: BLE001
            return self._error_result(f"Failed to coerce input: {e}")

        if not text:
            return EvaluatorResult(matched=False, confidence=1.0, message="Empty input")

        try:
            return self._match_rules(text)
        except Exception as e:  # noqa: BLE001
            return self._error_result(f"ATR evaluation error: {e}")

    def _match_rules(self, text: str) -> EvaluatorResult:
        """Run all compiled rules against the text and return all matches."""
        all_findings: list[dict[str, Any]] = []
        max_confidence = 0.0

        for rule in self._compiled_rules:
            for pattern_entry in rule["patterns"]:
                regex: re.Pattern[str] = pattern_entry["regex"]
                match = regex.search(text)
                if match:
                    all_findings.append({
                        "rule_id": rule["id"],
                        "title": rule["title"],
                        "severity": rule["severity"],
                        "category": rule["category"],
                        # Truncate to keep result metadata bounded.
                        "matched_text": match.group()[:200],
                        "pattern_description": pattern_entry["description"],
                    })
                    max_confidence = max(max_confidence, rule["confidence"])
                    break  # one match per rule is enough; keep scanning other rules

        if not all_findings:
            return EvaluatorResult(
                matched=False,
                confidence=1.0,
                message="ATR: No threats detected",
            )

        # BUG FIX: previously this reported the FIRST finding's severity as
        # "max_severity" (findings are in rule-file order, not severity
        # order). Compute the true maximum across all findings instead.
        max_severity = max(
            (f["severity"] for f in all_findings),
            key=lambda s: _SEVERITY_ORDER.get(s, 0),
        )
        first = all_findings[0]
        return EvaluatorResult(
            matched=self.config.block_on_match,
            confidence=max_confidence,
            message=f"ATR: {len(all_findings)} threat(s) detected",
            metadata={
                "findings": all_findings,
                "count": len(all_findings),
                "max_severity": max_severity,
                # Keep backward-compatible single-match fields
                "rule_id": first["rule_id"],
                "title": first["title"],
                "severity": first["severity"],
                "category": first["category"],
                "matched_text": first["matched_text"],
                "pattern_description": first["pattern_description"],
            },
        )

    def _error_result(self, error_detail: str) -> EvaluatorResult:
        """Build an error result respecting the on_error policy."""
        if self.config.on_error == "deny":
            # fail-closed: matched=True, error=None (to satisfy model validator)
            return EvaluatorResult(
                matched=True,
                confidence=0.0,
                message=f"ATR evaluation error (fail-closed): {error_detail}",
                metadata={"error": error_detail, "fallback_action": "deny"},
            )
        # fail-open: matched=False, error set
        return EvaluatorResult(
            matched=False,
            confidence=0.0,
            message=f"ATR evaluation error: {error_detail}",
            metadata={"error": error_detail, "fallback_action": "allow"},
            error=error_detail,
        )

    async def aclose(self) -> None:
        """No resources to clean up."""
Loading