Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 80 additions & 58 deletions services/detection/detection.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
"""
detector.py YOLOv8/v9 frame-level object detection.
detection.py YOLOv8/v9 frame-level object detection.

Usage (CLI):
python detector.py --source data/sample_videos/sample.mp4
python detector.py --source 0 # webcam
python detection.py --source data/sample_videos/sample.mp4
python detection.py --source 0 # webcam

Usage (API):
from services.detection.detector import Detector
from services.detection.detection import Detector
detector = Detector()
results = detector.detect(frame)
"""
from __future__ import annotations
import argparse
import logging
from pathlib import Path

import cv2
import numpy as np
Expand All @@ -26,13 +25,20 @@
logger = logging.getLogger(__name__)


# ─── Detector Class ──────────────────────────────────────────────────────────

class Detector:
"""Wraps a YOLO model for frame-by-frame inference."""
"""YOLOv8/v9 wrapper for frame-level object detection.

Runs inference on individual BGR frames and returns structured
DetectionFrameSchema objects with bounding boxes, labels, confidence
scores, and zone memberships.

Attributes:
PERSON_CLASS_ID: YOLO class index for 'person'.
TARGET_LABELS: Set of object labels to retain from YOLO output.
"""

PERSON_CLASS_ID = 0 # COCO class ID for 'person'
TARGET_LABELS = { # labels to pass downstream (filter noise)
PERSON_CLASS_ID = 0
TARGET_LABELS = {
"person", "backpack", "handbag", "cell phone", "laptop"
}

Expand All @@ -42,24 +48,34 @@ def __init__(
confidence_threshold: float = 0.45,
device: str = "cpu",
) -> None:
"""Initialize the Detector with a YOLO model.

Args:
model_name: Name or path of the YOLO model file.
confidence_threshold: Minimum confidence score to keep a detection.
device: Inference device, e.g. 'cpu' or 'cuda'.
"""
logger.info(f"Loading YOLO model: {model_name} on {device}")
self.model = YOLO(model_name)
self.conf = confidence_threshold
self.device = device

def detect(self, frame: np.ndarray, frame_id: int = 0) -> DetectionFrame:
"""
Run YOLO inference on a single BGR frame.
def detect(self, frame: np.ndarray, frame_id: int = 0) -> DetectionFrameSchema:
"""Run YOLO inference on a single BGR frame.

Args:
frame: BGR image as numpy array (H, W, 3).
frame: BGR image as numpy array (H, W, 3).
frame_id: Frame index for downstream tracking.

Returns:
DetectionFrame with all detected objects and zone memberships.
DetectionFrameSchema with all detected objects and zone memberships.

Example:
detector = Detector()
det_frame = detector.detect(frame, frame_id=42)
"""
results = self.model(frame, conf=self.conf, device=self.device, verbose=False)
detections: list[Detection] = []
detections: list[DetectionSchema] = []

for box, conf, cls_id in zip(
results[0].boxes.xyxy.cpu().numpy(),
Expand All @@ -73,38 +89,47 @@ def detect(self, frame: np.ndarray, frame_id: int = 0) -> DetectionFrame:
x1, y1, x2, y2 = box.tolist()
cx, cy = (x1 + x2) / 2, (y1 + y2) / 2

zones = [z.name for z in get_zones_for_point(cx, cy)]
_ = [z.name for z in get_zones_for_point(cx, cy)]

detections.append(Detection(
detections.append(DetectionSchema(
label=label,
bbox=[x1, y1, x2, y2],
bbox=BoundingBox(x1=x1, y1=y1, x2=x2, y2=y2),
confidence=float(conf),
center=(cx, cy),
zones_present=zones,
class_id=int(cls_id),
))

return DetectionFrame(
return DetectionFrameSchema(
frame_id=frame_id,
detections=detections,
timestamp_ms=cv2.getTickCount() / cv2.getTickFrequency() * 1000,
)


# ─── Rendering ────────────────────────────────────────────────────────────────

# Colour tuple used when drawing each detection label. Values are handed
# directly to the cv2 drawing calls on BGR frames. Every label is listed
# exactly once: a dict literal silently keeps only the last value for a
# repeated key, so duplicated entries would hide which colour is in effect.
LABEL_COLORS: dict[str, tuple[int, int, int]] = {
    "person": (0, 120, 255),
    "backpack": (255, 165, 0),
    "handbag": (255, 165, 0),
    "cell phone": (0, 200, 200),
    "laptop": (200, 0, 200),
}

def draw_detections(frame: np.ndarray, det_frame: DetectionFrame) -> np.ndarray:
"""Draw bounding boxes, labels, and zone overlays onto frame."""

def draw_detections(frame: np.ndarray, det_frame: DetectionFrameSchema) -> np.ndarray:
"""Draw bounding boxes, labels, and zone overlays onto a BGR frame.

Args:
frame: Original BGR image as numpy array (H, W, 3).
det_frame: DetectionFrameSchema containing all detected objects.

Returns:
Annotated BGR frame with boxes, labels, zones, and HUD overlay.

Example:
annotated = draw_detections(frame, det_frame)
cv2.imshow("Output", annotated)
"""
out = frame.copy()

# Draw zone polygons
for zone in DEFAULT_ZONES:
pts = zone.as_array().reshape((-1, 1, 2))
overlay = out.copy()
Expand All @@ -114,32 +139,41 @@ def draw_detections(frame: np.ndarray, det_frame: DetectionFrame) -> np.ndarray:
cv2.putText(out, zone.name, zone.polygon[0],
cv2.FONT_HERSHEY_SIMPLEX, 0.5, zone.color_bgr, 1)

# Draw detections
for det in det_frame.detections:
x1, y1, x2, y2 = [int(v) for v in det.bbox]
x1, y1, x2, y2 = int(det.bbox.x1), int(det.bbox.y1), int(det.bbox.x2), int(det.bbox.y2)
cx, cy = det.bbox.center
color = LABEL_COLORS.get(det.label, (200, 200, 200))
cv2.rectangle(out, (x1, y1), (x2, y2), color, 2)

label_text = f"{det.label} {det.confidence:.2f}"
if det.zones_present:
label_text += f" [{', '.join(det.zones_present)}]"

cv2.putText(out, label_text, (x1, y1 - 8),
cv2.FONT_HERSHEY_SIMPLEX, 0.55, color, 2)

# Centroid dot
cv2.circle(out, (int(det.center[0]), int(det.center[1])), 4, color, -1)
cv2.circle(out, (int(cx), int(cy)), 4, color, -1)

# HUD
cv2.putText(out, f"Frame: {det_frame.frame_id} | Detections: {len(det_frame.detections)}",
(10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.65, (255, 255, 255), 2)
cv2.putText(
out,
f"Frame: {det_frame.frame_id} | Detections: {len(det_frame.detections)}",
(10, 28),
cv2.FONT_HERSHEY_SIMPLEX,
0.65,
(255, 255, 255),
2,
)

return out


# ─── CLI Entry Point ─────────────────────────────────────────────────────────

def main() -> None:
"""CLI entry point for running the detection demo on video or webcam.

Parses arguments, initializes the Detector, and runs the inference loop.
Optionally writes annotated output to a video file.

Example:
python detection.py --source data/sample_videos/sample.mp4 --output out.mp4
"""
parser = argparse.ArgumentParser(description="Run Agentic Vision detection demo")
parser.add_argument("--source", default="0", help="Video file path or camera index")
parser.add_argument("--model", default="yolov8n.pt", help="YOLO model name")
Expand All @@ -155,7 +189,7 @@ def main() -> None:
raise RuntimeError(f"Cannot open source: {source}")

fps = cap.get(cv2.CAP_PROP_FPS) or 30
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
logger.info(f"Stream: {width}x{height} @ {fps:.1f} FPS")

Expand All @@ -171,21 +205,9 @@ def main() -> None:
break

det_frame = detector.detect(frame, frame_id=frame_id)
builder = SceneGraphBuilder(det_frame)

builder.build_graph()
graph_text = builder.serialize_graph()

if frame_id % 30 == 0 and graph_text:
prompt = build_reasoning_prompt(graph_text)
print("\nLLM PROMPT:\n")
print(prompt)



annotated = draw_detections(frame, det_frame)
annotated = draw_detections(frame, det_frame)

cv2.imshow("Agentic Vision Detection", annotated)
cv2.imshow("Agentic Vision Detection", annotated)
if writer:
writer.write(annotated)

Expand Down
1 change: 0 additions & 1 deletion services/memory/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import json
import logging
import time
from typing import Optional

import numpy as np
Expand Down
Loading
Loading