Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified .gitignore
Binary file not shown.
32 changes: 32 additions & 0 deletions libs/schemas/graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
Pydantic schemas for Scene Graph nodes and edges.
"""
from enum import Enum
from typing import Optional
from pydantic import BaseModel


class NodeType(str, Enum):
PERSON = "Person"
ZONE = "Zone"
OBJECT = "Object"


class EdgeType(str, Enum):
INSIDE = "INSIDE"
NEAR = "NEAR"
INTERACTING_WITH = "INTERACTING_WITH"
ENTERED_FROM = "ENTERED_FROM"


class GraphNode(BaseModel):
id: str # e.g. "Person #3", "Keypad_01"
node_type: NodeType
label: Optional[str] = None # human-readable label


class GraphEdge(BaseModel):
source: str # node id
target: str # node id
edge_type: EdgeType
distance_px: Optional[float] = None # for NEAR edges
Binary file added requirements.txt
Binary file not shown.
1 change: 0 additions & 1 deletion services/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@

21 changes: 0 additions & 21 deletions services/reasoning/__init__.py
Original file line number Diff line number Diff line change
@@ -1,21 +0,0 @@
"""
services/reasoning — VLM captioning and LLM reasoning layer.

Set ``VLM_PROVIDER=mock`` to use the deterministic mock implementations
that require no GPU or external API.
"""
from services.reasoning.mock_vlm import (
MockVLMCaptioner,
MockLLMReasoner,
ReasoningOutput,
get_vlm_captioner,
get_llm_reasoner,
)

__all__ = [
"MockVLMCaptioner",
"MockLLMReasoner",
"ReasoningOutput",
"get_vlm_captioner",
"get_llm_reasoner",
]
38 changes: 25 additions & 13 deletions services/reasoning/prompts.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
def build_reasoning_prompt(graph_text: str) -> str:
"""
Prompt builders for LLM-based surveillance reasoning.
"""

prompt = f"""
You are an AI surveillance reasoning system.

Analyze the following scene graph and identify:
# ---------------------------------------------------------------------------
# Scene Graph Prompt Integration
# ---------------------------------------------------------------------------

- suspicious behavior
- restricted zone violations
- unusual interactions
- possible threats
from services.reasoning.scene_graph import SceneGraph

Scene Graph:
{graph_text}

Provide a short reasoning summary.
"""
def build_reasoning_prompt(event_description: str, scene_graph: SceneGraph) -> str:
"""
Combine a scene graph snapshot with a natural-language event description
into a single structured prompt for LLM reasoning.

Keeps total context compact and well under model context limits.
"""
graph_context = scene_graph.to_prompt_str()

prompt = f"""{graph_context}

Event description:
{event_description}

Based on the scene graph and event above, analyze whether this activity is suspicious.
Consider spatial relationships, zone access, and object interactions.
Be concise and structured in your response."""

return prompt
return prompt
270 changes: 125 additions & 145 deletions services/reasoning/scene_graph.py
Original file line number Diff line number Diff line change
@@ -1,147 +1,127 @@
# services/reasoning/scene_graph.py

import math
"""
Scene Graph Manager for surveillance reasoning.
Builds a dynamic NetworkX MultiDiGraph from a TrackedFrame
and serializes it into an LLM-ready prompt snippet.
"""
from __future__ import annotations

import networkx as nx

from libs.observability.metrics import reasoning_triggers_total
from services.detection.zones import DEFAULT_ZONES

INTERACTION_OBJECTS = [
"backpack",
"handbag",
"cell phone",
"laptop"
]
class SceneGraphBuilder:
def serialize_graph(self):

serialized = []

for source, target, data in self.graph.edges(data=True):

relation = data.get("relation")

edge_text = f"{source} -> [{relation}] -> {target}"

if "distance" in data:

edge_text += f" (distance={data['distance']})"

serialized.append(edge_text)

return "\n".join(serialized)

def __init__(self, det_frame):

self.det_frame = det_frame
self.graph = nx.MultiDiGraph()

def calculate_distance(self, center1, center2):

x1, y1 = center1
x2, y2 = center2

return math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)

def build_graph(self):

reasoning_triggers_total.inc()

self.graph.clear()

# Add zone nodes
for zone in DEFAULT_ZONES:

self.graph.add_node(
zone.name,
type="zone"
)

detections = self.det_frame.detections

# Add detection nodes
for idx, det in enumerate(detections):

node_name = f"{det.label}_{idx}"

self.graph.add_node(
node_name,
type=det.label,
center=det.center
)

# Zone relationships
for zone_name in det.zones_present:

self.graph.add_edge(
node_name,
zone_name,
relation="INSIDE"
)

# Object relationships
for i in range(len(detections)):

for j in range(i + 1, len(detections)):

det1 = detections[i]
det2 = detections[j]

node1 = f"{det1.label}_{i}"
node2 = f"{det2.label}_{j}"

dist = self.calculate_distance(
det1.center,
det2.center
)

# NEAR relationship
if dist < 150:

self.graph.add_edge(
node1,
node2,
relation="NEAR",
distance=round(dist, 2)
)

person_node = None
object_node = None

# Interaction detection
if (
det1.label == "person"
and det2.label in INTERACTION_OBJECTS
):

person_node = node1
object_node = node2

elif (
det2.label == "person"
and det1.label in INTERACTION_OBJECTS
):

person_node = node2
object_node = node1

# INTERACTING_WITH
if person_node and dist < 60:

self.graph.add_edge(
person_node,
object_node,
relation="INTERACTING_WITH"
)

# HOLDING
elif person_node and 60 <= dist < 80:

self.graph.add_edge(
person_node,
object_node,
relation="HOLDING"
)

return self.graph
from typing import Optional
from libs.schemas.graph import GraphNode, GraphEdge, NodeType, EdgeType


class SceneGraph:
"""Wraps a NetworkX MultiDiGraph with helper methods for surveillance scenes."""

MAX_PROMPT_TOKENS = 300

def __init__(self, timestamp: float = 0.0):
self.timestamp = timestamp
self.graph: nx.MultiDiGraph = nx.MultiDiGraph()

def add_node(self, node: GraphNode) -> None:
self.graph.add_node(
node.id,
node_type=node.node_type.value,
label=node.label or node.id,
)

def add_edge(self, edge: GraphEdge) -> None:
attrs = {"edge_type": edge.edge_type.value}
if edge.distance_px is not None:
attrs["distance_px"] = edge.distance_px
self.graph.add_edge(edge.source, edge.target, **attrs)

@classmethod
def from_tracked_frame(cls, frame) -> "SceneGraph":
sg = cls(timestamp=getattr(frame, "timestamp", 0.0))

# Add zones (skip if missing id)
for zone in getattr(frame, "zones", []):
zone_id = zone.get("id")
if zone_id:
sg.add_node(GraphNode(id=zone_id, node_type=NodeType.ZONE))

# Add objects and their belongs_to relationships
for obj in getattr(frame, "objects", []):
obj_id = obj.get("id")
if not obj_id:
continue
sg.add_node(GraphNode(id=obj_id, node_type=NodeType.OBJECT))
belongs_to = obj.get("belongs_to")
if belongs_to:
sg.add_node(GraphNode(id=belongs_to, node_type=NodeType.ZONE))
sg.add_edge(GraphEdge(
source=obj_id,
target=belongs_to,
edge_type=EdgeType.INSIDE,
))

# Add persons and their relationships
for person in getattr(frame, "persons", []):
pid = person.get("id")
if not pid:
continue
sg.add_node(GraphNode(id=pid, node_type=NodeType.PERSON))

# INSIDE relationship
zone = person.get("zone")
if zone:
sg.add_node(GraphNode(id=zone, node_type=NodeType.ZONE))
sg.add_edge(GraphEdge(
source=pid,
target=zone,
edge_type=EdgeType.INSIDE,
))

# NEAR relationships
for nearby in person.get("nearby_objects", []):
obj_id = nearby.get("id")
if not obj_id:
continue
dist = nearby.get("distance_px")
sg.add_node(GraphNode(id=obj_id, node_type=NodeType.OBJECT))
sg.add_edge(GraphEdge(
source=pid,
target=obj_id,
edge_type=EdgeType.NEAR,
distance_px=dist,
))

# INTERACTING_WITH relationships
for obj_id in person.get("interacting_with", []):
if not obj_id:
continue
sg.add_node(GraphNode(id=obj_id, node_type=NodeType.OBJECT))
sg.add_edge(GraphEdge(
source=pid,
target=obj_id,
edge_type=EdgeType.INTERACTING_WITH,
))

return sg

def to_prompt_str(self) -> str:
lines = [f"Scene graph at t={self.timestamp:.1f}s:", ""]

for src, dst, data in self.graph.edges(data=True):
edge_type = data.get("edge_type", "RELATED_TO")
dist = data.get("distance_px")
if edge_type == EdgeType.NEAR.value and dist is not None:
edge_label = f"{edge_type}({int(dist)}px)"
else:
edge_label = edge_type
lines.append(f"{src} \u2192 {edge_label} \u2192 {dst}")

serialized = "\n".join(lines)
# Rough trim if over 200 words (≈260 tokens, safe under 300)
words = serialized.split()
if len(words) > 200:
serialized = " ".join(words[:200]) + "\n[...truncated]"
return serialized

def node_count(self) -> int:
return self.graph.number_of_nodes()

def edge_count(self) -> int:
return self.graph.number_of_edges()
Empty file added tests/__init__.py
Empty file.
Loading
Loading