diff --git a/.gitignore b/.gitignore index ac8d57c..e69de29 100644 Binary files a/.gitignore and b/.gitignore differ diff --git a/libs/schemas/graph.py b/libs/schemas/graph.py new file mode 100644 index 0000000..d92b396 --- /dev/null +++ b/libs/schemas/graph.py @@ -0,0 +1,32 @@ +""" +Pydantic schemas for Scene Graph nodes and edges. +""" +from enum import Enum +from typing import Optional +from pydantic import BaseModel + + +class NodeType(str, Enum): + PERSON = "Person" + ZONE = "Zone" + OBJECT = "Object" + + +class EdgeType(str, Enum): + INSIDE = "INSIDE" + NEAR = "NEAR" + INTERACTING_WITH = "INTERACTING_WITH" + ENTERED_FROM = "ENTERED_FROM" + + +class GraphNode(BaseModel): + id: str # e.g. "Person #3", "Keypad_01" + node_type: NodeType + label: Optional[str] = None # human-readable label + + +class GraphEdge(BaseModel): + source: str # node id + target: str # node id + edge_type: EdgeType + distance_px: Optional[float] = None # for NEAR edges diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..adade78 Binary files /dev/null and b/requirements.txt differ diff --git a/services/__init__.py b/services/__init__.py index 8b13789..e69de29 100644 --- a/services/__init__.py +++ b/services/__init__.py @@ -1 +0,0 @@ - diff --git a/services/reasoning/__init__.py b/services/reasoning/__init__.py index 762a966..e69de29 100644 --- a/services/reasoning/__init__.py +++ b/services/reasoning/__init__.py @@ -1,21 +0,0 @@ -""" -services/reasoning — VLM captioning and LLM reasoning layer. - -Set ``VLM_PROVIDER=mock`` to use the deterministic mock implementations -that require no GPU or external API. -""" -from services.reasoning.mock_vlm import ( - MockVLMCaptioner, - MockLLMReasoner, - ReasoningOutput, - get_vlm_captioner, - get_llm_reasoner, -) - -__all__ = [ - "MockVLMCaptioner", - "MockLLMReasoner", - "ReasoningOutput", - "get_vlm_captioner", - "get_llm_reasoner", -] diff --git a/services/reasoning/prompts.py b/services/reasoning/prompts.py index 8721fbc..48fefcc 100644 --- a/services/reasoning/prompts.py +++ b/services/reasoning/prompts.py @@ -1,19 +1,31 @@ -def build_reasoning_prompt(graph_text: str) -> str: +""" +Prompt builders for LLM-based surveillance reasoning. +""" - prompt = f""" -You are an AI surveillance reasoning system. -Analyze the following scene graph and identify: +# --------------------------------------------------------------------------- +# Scene Graph Prompt Integration +# --------------------------------------------------------------------------- -- suspicious behavior -- restricted zone violations -- unusual interactions -- possible threats +from services.reasoning.scene_graph import SceneGraph -Scene Graph: -{graph_text} -Provide a short reasoning summary. -""" +def build_reasoning_prompt(event_description: str, scene_graph: SceneGraph) -> str: + """ + Combine a scene graph snapshot with a natural-language event description + into a single structured prompt for LLM reasoning. + + Keeps total context compact and well under model context limits. + """ + graph_context = scene_graph.to_prompt_str() + + prompt = f"""{graph_context} + +Event description: +{event_description} + +Based on the scene graph and event above, analyze whether this activity is suspicious. +Consider spatial relationships, zone access, and object interactions. +Be concise and structured in your response.""" - return prompt \ No newline at end of file + return prompt diff --git a/services/reasoning/scene_graph.py b/services/reasoning/scene_graph.py index 348b507..a3dac82 100644 --- a/services/reasoning/scene_graph.py +++ b/services/reasoning/scene_graph.py @@ -1,147 +1,127 @@ -# services/reasoning/scene_graph.py - -import math +""" +Scene Graph Manager for surveillance reasoning. +Builds a dynamic NetworkX MultiDiGraph from a TrackedFrame +and serializes it into an LLM-ready prompt snippet. +""" +from __future__ import annotations import networkx as nx - -from libs.observability.metrics import reasoning_triggers_total -from services.detection.zones import DEFAULT_ZONES - -INTERACTION_OBJECTS = [ - "backpack", - "handbag", - "cell phone", - "laptop" -] -class SceneGraphBuilder: - def serialize_graph(self): - - serialized = [] - - for source, target, data in self.graph.edges(data=True): - - relation = data.get("relation") - - edge_text = f"{source} -> [{relation}] -> {target}" - - if "distance" in data: - - edge_text += f" (distance={data['distance']})" - - serialized.append(edge_text) - - return "\n".join(serialized) - - def __init__(self, det_frame): - - self.det_frame = det_frame - self.graph = nx.MultiDiGraph() - - def calculate_distance(self, center1, center2): - - x1, y1 = center1 - x2, y2 = center2 - - return math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2) - - def build_graph(self): - - reasoning_triggers_total.inc() - - self.graph.clear() - - # Add zone nodes - for zone in DEFAULT_ZONES: - - self.graph.add_node( - zone.name, - type="zone" - ) - - detections = self.det_frame.detections - - # Add detection nodes - for idx, det in enumerate(detections): - - node_name = f"{det.label}_{idx}" - - self.graph.add_node( - node_name, - type=det.label, - center=det.center - ) - - # Zone relationships - for zone_name in det.zones_present: - - self.graph.add_edge( - node_name, - zone_name, - relation="INSIDE" - ) - - # Object relationships - for i in range(len(detections)): - - for j in range(i + 1, len(detections)): - - det1 = detections[i] - det2 = detections[j] - - node1 = f"{det1.label}_{i}" - node2 = f"{det2.label}_{j}" - - dist = self.calculate_distance( - det1.center, - det2.center - ) - - # NEAR relationship - if dist < 150: - - self.graph.add_edge( - node1, - node2, - relation="NEAR", - distance=round(dist, 2) - ) - - person_node = None - object_node = None - - # Interaction detection - if ( - det1.label == "person" - and det2.label in INTERACTION_OBJECTS - ): - - person_node = node1 - object_node = node2 - - elif ( - det2.label == "person" - and det1.label in INTERACTION_OBJECTS - ): - - person_node = node2 - object_node = node1 - - # INTERACTING_WITH - if person_node and dist < 60: - - self.graph.add_edge( - person_node, - object_node, - relation="INTERACTING_WITH" - ) - - # HOLDING - elif person_node and 60 <= dist < 80: - - self.graph.add_edge( - person_node, - object_node, - relation="HOLDING" - ) - - return self.graph \ No newline at end of file +from typing import Optional +from libs.schemas.graph import GraphNode, GraphEdge, NodeType, EdgeType + + +class SceneGraph: + """Wraps a NetworkX MultiDiGraph with helper methods for surveillance scenes.""" + + MAX_PROMPT_TOKENS = 300 + + def __init__(self, timestamp: float = 0.0): + self.timestamp = timestamp + self.graph: nx.MultiDiGraph = nx.MultiDiGraph() + + def add_node(self, node: GraphNode) -> None: + self.graph.add_node( + node.id, + node_type=node.node_type.value, + label=node.label or node.id, + ) + + def add_edge(self, edge: GraphEdge) -> None: + attrs = {"edge_type": edge.edge_type.value} + if edge.distance_px is not None: + attrs["distance_px"] = edge.distance_px + self.graph.add_edge(edge.source, edge.target, **attrs) + + @classmethod + def from_tracked_frame(cls, frame) -> "SceneGraph": + sg = cls(timestamp=getattr(frame, "timestamp", 0.0)) + + # Add zones (skip if missing id) + for zone in getattr(frame, "zones", []): + zone_id = zone.get("id") + if zone_id: + sg.add_node(GraphNode(id=zone_id, node_type=NodeType.ZONE)) + + # Add objects and their belongs_to relationships + for obj in getattr(frame, "objects", []): + obj_id = obj.get("id") + if not obj_id: + continue + sg.add_node(GraphNode(id=obj_id, node_type=NodeType.OBJECT)) + belongs_to = obj.get("belongs_to") + if belongs_to: + sg.add_node(GraphNode(id=belongs_to, node_type=NodeType.ZONE)) + sg.add_edge(GraphEdge( + source=obj_id, + target=belongs_to, + edge_type=EdgeType.INSIDE, + )) + + # Add persons and their relationships + for person in getattr(frame, "persons", []): + pid = person.get("id") + if not pid: + continue + sg.add_node(GraphNode(id=pid, node_type=NodeType.PERSON)) + + # INSIDE relationship + zone = person.get("zone") + if zone: + sg.add_node(GraphNode(id=zone, node_type=NodeType.ZONE)) + sg.add_edge(GraphEdge( + source=pid, + target=zone, + edge_type=EdgeType.INSIDE, + )) + + # NEAR relationships + for nearby in person.get("nearby_objects", []): + obj_id = nearby.get("id") + if not obj_id: + continue + dist = nearby.get("distance_px") + sg.add_node(GraphNode(id=obj_id, node_type=NodeType.OBJECT)) + sg.add_edge(GraphEdge( + source=pid, + target=obj_id, + edge_type=EdgeType.NEAR, + distance_px=dist, + )) + + # INTERACTING_WITH relationships + for obj_id in person.get("interacting_with", []): + if not obj_id: + continue + sg.add_node(GraphNode(id=obj_id, node_type=NodeType.OBJECT)) + sg.add_edge(GraphEdge( + source=pid, + target=obj_id, + edge_type=EdgeType.INTERACTING_WITH, + )) + + return sg + + def to_prompt_str(self) -> str: + lines = [f"Scene graph at t={self.timestamp:.1f}s:", ""] + + for src, dst, data in self.graph.edges(data=True): + edge_type = data.get("edge_type", "RELATED_TO") + dist = data.get("distance_px") + if edge_type == EdgeType.NEAR.value and dist is not None: + edge_label = f"{edge_type}({int(dist)}px)" + else: + edge_label = edge_type + lines.append(f"{src} \u2192 {edge_label} \u2192 {dst}") + + serialized = "\n".join(lines) + # Rough trim if over 200 words (≈260 tokens, safe under 300) + words = serialized.split() + if len(words) > 200: + serialized = " ".join(words[:200]) + "\n[...truncated]" + return serialized + + def node_count(self) -> int: + return self.graph.number_of_nodes() + + def edge_count(self) -> int: + return self.graph.number_of_edges() diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_scene_graph.py b/tests/test_scene_graph.py index 6c9cc3f..2fadcfc 100644 --- a/tests/test_scene_graph.py +++ b/tests/test_scene_graph.py @@ -1,128 +1,137 @@ -# tests/test_scene_graph.py - - - - - -from services.detection.detection import Detection, DetectionFrame -from services.reasoning.scene_graph import SceneGraphBuilder - - -def test_scene_graph_build(): - - - person = Detection( - label="person", - bbox=[600, 250, 700, 430], - confidence=0.96, - center=(650, 340), - zones_present=[ - "restricted_door", - "keypad_area" - ] - ) - - phone = Detection( - label="cell phone", - bbox=[660, 330, 690, 360], - confidence=0.90, - center=(675, 345), - zones_present=[ - "restricted_door", - "keypad_area" - ] - ) - - # Backpack close to person - - backpack = Detection( - label="backpack", - bbox=[610, 320, 680, 410], - confidence=0.88, - center=(645, 365), - zones_present=[ - "restricted_door", - "keypad_area" - ] - ) - - person2 = Detection( - label="person", - bbox=[100, 100, 220, 350], - confidence=0.93, - center=(160, 225), - zones_present=[ - "safe_corridor" - ] - ) - - det_frame = DetectionFrame( - frame_id=25, - detections=[ - person, - phone, - backpack, - person2 - ], - timestamp_ms=5000 - ) - - builder = SceneGraphBuilder(det_frame) - - graph = builder.build_graph() - - assert graph.has_node("person_0") - - assert graph.has_node("cell phone_1") - - assert graph.has_node("backpack_2") - - assert graph.has_node("person_3") - - assert graph.has_node("restricted_door") - - assert graph.has_node("keypad_area") - - assert graph.has_node("safe_corridor") - - assert graph.has_edge( - "person_0", - "restricted_door" - ) - - assert graph.has_edge( - "person_0", - "keypad_area" - ) - - assert graph.has_edge( - "person_3", - "safe_corridor" - ) - - - - assert graph.has_edge( - "person_0", - "cell phone_1" - ) - - assert graph.has_edge( - "person_0", - "backpack_2" +""" +Unit tests for SceneGraph builder and serializer. +""" +import pytest +from types import SimpleNamespace +from services.reasoning.scene_graph import SceneGraph +from libs.schemas.graph import GraphNode, GraphEdge, NodeType, EdgeType + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def make_frame(timestamp=22.4, persons=None, objects=None, zones=None): + return SimpleNamespace( + timestamp=timestamp, + persons=persons or [], + objects=objects or [], + zones=zones or [], ) - # Print graph nodes - - print("\nGRAPH NODES:\n") - - for node, data in graph.nodes(data=True): - print(node, data) - - - # Print graph edges - print("\nGRAPH EDGES:\n") +# --------------------------------------------------------------------------- +# Tests: manual graph construction +# --------------------------------------------------------------------------- + +class TestSceneGraphManual: + + def test_add_person_node(self): + sg = SceneGraph(timestamp=1.0) + sg.add_node(GraphNode(id="Person #1", node_type=NodeType.PERSON)) + assert sg.node_count() == 1 + + def test_add_edge(self): + sg = SceneGraph() + sg.add_node(GraphNode(id="Person #1", node_type=NodeType.PERSON)) + sg.add_node(GraphNode(id="Zone_A", node_type=NodeType.ZONE)) + sg.add_edge(GraphEdge(source="Person #1", target="Zone_A", edge_type=EdgeType.INSIDE)) + assert sg.edge_count() == 1 + + def test_near_edge_with_distance(self): + sg = SceneGraph() + sg.add_node(GraphNode(id="Person #3", node_type=NodeType.PERSON)) + sg.add_node(GraphNode(id="Keypad_01", node_type=NodeType.OBJECT)) + sg.add_edge(GraphEdge( + source="Person #3", target="Keypad_01", + edge_type=EdgeType.NEAR, distance_px=38.0, + )) + data = sg.graph.edges["Person #3", "Keypad_01", 0] + assert data["distance_px"] == 38.0 + + +# --------------------------------------------------------------------------- +# Tests: from_tracked_frame +# --------------------------------------------------------------------------- + +class TestFromTrackedFrame: + + def test_empty_frame(self): + frame = make_frame() + sg = SceneGraph.from_tracked_frame(frame) + assert sg.node_count() == 0 + assert sg.edge_count() == 0 + + def test_person_inside_zone(self): + frame = make_frame(persons=[ + {"id": "Person #3", "zone": "restricted_door", + "nearby_objects": [], "interacting_with": []} + ]) + sg = SceneGraph.from_tracked_frame(frame) + assert sg.node_count() == 2 # person + zone + assert sg.edge_count() == 1 # INSIDE + + def test_full_scene(self): + frame = make_frame( + timestamp=22.4, + persons=[{ + "id": "Person #3", + "zone": "restricted_door", + "nearby_objects": [{"id": "Keypad_01", "distance_px": 38}], + "interacting_with": ["Keypad_01"], + }], + objects=[{"id": "Keypad_01", "type": "keypad", "belongs_to": "restricted_door"}], + ) + sg = SceneGraph.from_tracked_frame(frame) + # Nodes: Person #3, restricted_door, Keypad_01 ? 3 + assert sg.node_count() == 3 + # Edges: INSIDE(person?zone), NEAR, INTERACTING_WITH, INSIDE(keypad?door) ? 4 + assert sg.edge_count() == 4 + + def test_timestamp_stored(self): + frame = make_frame(timestamp=99.9) + sg = SceneGraph.from_tracked_frame(frame) + assert sg.timestamp == 99.9 + + +# --------------------------------------------------------------------------- +# Tests: serialization +# --------------------------------------------------------------------------- + +class TestPromptSerialization: + + def _full_sg(self): + frame = make_frame( + timestamp=22.4, + persons=[{ + "id": "Person #3", + "zone": "restricted_door", + "nearby_objects": [{"id": "Keypad_01", "distance_px": 38}], + "interacting_with": ["Keypad_01"], + }], + ) + return SceneGraph.from_tracked_frame(frame) + + def test_header_present(self): + sg = self._full_sg() + prompt = sg.to_prompt_str() + assert "t=22.4s" in prompt + + def test_near_distance_in_prompt(self): + sg = self._full_sg() + prompt = sg.to_prompt_str() + assert "NEAR(38px)" in prompt + + def test_under_token_budget(self): + sg = self._full_sg() + prompt = sg.to_prompt_str() + # Rough token estimate: words * 1.3 + estimated_tokens = len(prompt.split()) * 1.3 + assert estimated_tokens < 300, f"Prompt too long: ~{estimated_tokens:.0f} tokens" + + def test_empty_graph_serialization(self): + sg = SceneGraph(timestamp=0.0) + prompt = sg.to_prompt_str() + assert "t=0.0s" in prompt + assert "?" not in prompt # no edges - for source, target, data in graph.edges(data=True): - print(f"{source} ---> {target} | {data}")