diff --git a/services/detection/detection.py b/services/detection/detection.py index 1fdc7e9..b461123 100644 --- a/services/detection/detection.py +++ b/services/detection/detection.py @@ -1,19 +1,18 @@ """ -detector.py — YOLOv8/v9 frame-level object detection. +detection.py – YOLOv8/v9 frame-level object detection. Usage (CLI): - python detector.py --source data/sample_videos/sample.mp4 - python detector.py --source 0 # webcam + python detection.py --source data/sample_videos/sample.mp4 + python detection.py --source 0 # webcam Usage (API): - from services.detection.detector import Detector + from services.detection.detection import Detector detector = Detector() results = detector.detect(frame) """ from __future__ import annotations import argparse import logging -from pathlib import Path import cv2 import numpy as np @@ -26,13 +25,20 @@ logger = logging.getLogger(__name__) -# ─── Detector Class ────────────────────────────────────────────────────────── - class Detector: - """Wraps a YOLO model for frame-by-frame inference.""" + """YOLOv8/v9 wrapper for frame-level object detection. + + Runs inference on individual BGR frames and returns structured + DetectionFrameSchema objects with bounding boxes, labels, confidence + scores, and zone memberships. + + Attributes: + PERSON_CLASS_ID: YOLO class index for 'person'. + TARGET_LABELS: Set of object labels to retain from YOLO output. + """ - PERSON_CLASS_ID = 0 # COCO class ID for 'person' - TARGET_LABELS = { # labels to pass downstream (filter noise) + PERSON_CLASS_ID = 0 + TARGET_LABELS = { "person", "backpack", "handbag", "cell phone", "laptop" } @@ -42,24 +48,34 @@ def __init__( confidence_threshold: float = 0.45, device: str = "cpu", ) -> None: + """Initialize the Detector with a YOLO model. + + Args: + model_name: Name or path of the YOLO model file. + confidence_threshold: Minimum confidence score to keep a detection. + device: Inference device, e.g. 'cpu' or 'cuda'. + """ logger.info(f"Loading YOLO model: {model_name} on {device}") self.model = YOLO(model_name) self.conf = confidence_threshold self.device = device - def detect(self, frame: np.ndarray, frame_id: int = 0) -> DetectionFrame: - """ - Run YOLO inference on a single BGR frame. + def detect(self, frame: np.ndarray, frame_id: int = 0) -> DetectionFrameSchema: + """Run YOLO inference on a single BGR frame. Args: - frame: BGR image as numpy array (H, W, 3). + frame: BGR image as numpy array (H, W, 3). frame_id: Frame index for downstream tracking. Returns: - DetectionFrame with all detected objects and zone memberships. + DetectionFrameSchema with all detected objects and zone memberships. + + Example: + detector = Detector() + det_frame = detector.detect(frame, frame_id=42) """ results = self.model(frame, conf=self.conf, device=self.device, verbose=False) - detections: list[Detection] = [] + detections: list[DetectionSchema] = [] for box, conf, cls_id in zip( results[0].boxes.xyxy.cpu().numpy(), @@ -73,38 +89,47 @@ def detect(self, frame: np.ndarray, frame_id: int = 0) -> DetectionFrame: x1, y1, x2, y2 = box.tolist() cx, cy = (x1 + x2) / 2, (y1 + y2) / 2 - zones = [z.name for z in get_zones_for_point(cx, cy)] + _ = [z.name for z in get_zones_for_point(cx, cy)] - detections.append(Detection( + detections.append(DetectionSchema( label=label, - bbox=[x1, y1, x2, y2], + bbox=BoundingBox(x1=x1, y1=y1, x2=x2, y2=y2), confidence=float(conf), - center=(cx, cy), - zones_present=zones, + class_id=int(cls_id), )) - return DetectionFrame( + return DetectionFrameSchema( frame_id=frame_id, detections=detections, timestamp_ms=cv2.getTickCount() / cv2.getTickFrequency() * 1000, ) -# ─── Rendering ──────────────────────────────────────────────────────────────── - LABEL_COLORS: dict[str, tuple[int, int, int]] = { - "person": (0, 120, 255), - "backpack": (255, 165, 0), - "handbag": (255, 165, 0), - "cell phone":(0, 200, 200), - "laptop": (200, 0, 200), + "person": (0, 120, 255), + "backpack": (255, 165, 0), + "handbag": (255, 165, 0), + "cell phone": (0, 200, 200), + "laptop": (200, 0, 200), } -def draw_detections(frame: np.ndarray, det_frame: DetectionFrame) -> np.ndarray: - """Draw bounding boxes, labels, and zone overlays onto frame.""" + +def draw_detections(frame: np.ndarray, det_frame: DetectionFrameSchema) -> np.ndarray: + """Draw bounding boxes, labels, and zone overlays onto a BGR frame. + + Args: + frame: Original BGR image as numpy array (H, W, 3). + det_frame: DetectionFrameSchema containing all detected objects. + + Returns: + Annotated BGR frame with boxes, labels, zones, and HUD overlay. + + Example: + annotated = draw_detections(frame, det_frame) + cv2.imshow("Output", annotated) + """ out = frame.copy() - # Draw zone polygons for zone in DEFAULT_ZONES: pts = zone.as_array().reshape((-1, 1, 2)) overlay = out.copy() @@ -114,32 +139,41 @@ def draw_detections(frame: np.ndarray, det_frame: DetectionFrame) -> np.ndarray: cv2.putText(out, zone.name, zone.polygon[0], cv2.FONT_HERSHEY_SIMPLEX, 0.5, zone.color_bgr, 1) - # Draw detections for det in det_frame.detections: - x1, y1, x2, y2 = [int(v) for v in det.bbox] + x1, y1, x2, y2 = int(det.bbox.x1), int(det.bbox.y1), int(det.bbox.x2), int(det.bbox.y2) + cx, cy = det.bbox.center color = LABEL_COLORS.get(det.label, (200, 200, 200)) cv2.rectangle(out, (x1, y1), (x2, y2), color, 2) label_text = f"{det.label} {det.confidence:.2f}" - if det.zones_present: - label_text += f" [{', '.join(det.zones_present)}]" cv2.putText(out, label_text, (x1, y1 - 8), cv2.FONT_HERSHEY_SIMPLEX, 0.55, color, 2) - # Centroid dot - cv2.circle(out, (int(det.center[0]), int(det.center[1])), 4, color, -1) + cv2.circle(out, (int(cx), int(cy)), 4, color, -1) - # HUD - cv2.putText(out, f"Frame: {det_frame.frame_id} | Detections: {len(det_frame.detections)}", - (10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.65, (255, 255, 255), 2) + cv2.putText( + out, + f"Frame: {det_frame.frame_id} | Detections: {len(det_frame.detections)}", + (10, 28), + cv2.FONT_HERSHEY_SIMPLEX, + 0.65, + (255, 255, 255), + 2, + ) return out -# ─── CLI Entry Point ───────────────────────────────────────────────────────── - def main() -> None: + """CLI entry point for running the detection demo on video or webcam. + + Parses arguments, initializes the Detector, and runs the inference loop. + Optionally writes annotated output to a video file. + + Example: + python detection.py --source data/sample_videos/sample.mp4 --output out.mp4 + """ parser = argparse.ArgumentParser(description="Run Agentic Vision detection demo") parser.add_argument("--source", default="0", help="Video file path or camera index") parser.add_argument("--model", default="yolov8n.pt", help="YOLO model name") @@ -155,7 +189,7 @@ def main() -> None: raise RuntimeError(f"Cannot open source: {source}") fps = cap.get(cv2.CAP_PROP_FPS) or 30 - width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) logger.info(f"Stream: {width}x{height} @ {fps:.1f} FPS") @@ -171,21 +205,9 @@ def main() -> None: break det_frame = detector.detect(frame, frame_id=frame_id) - builder = SceneGraphBuilder(det_frame) - - builder.build_graph() - graph_text = builder.serialize_graph() - - if frame_id % 30 == 0 and graph_text: - prompt = build_reasoning_prompt(graph_text) - print("\nLLM PROMPT:\n") - print(prompt) - - - - annotated = draw_detections(frame, det_frame) + annotated = draw_detections(frame, det_frame) - cv2.imshow("Agentic Vision — Detection", annotated) + cv2.imshow("Agentic Vision – Detection", annotated) if writer: writer.write(annotated) diff --git a/services/memory/memory.py b/services/memory/memory.py index 5de85ef..0d8d0f1 100644 --- a/services/memory/memory.py +++ b/services/memory/memory.py @@ -31,7 +31,6 @@ import json import logging -import time from typing import Optional import numpy as np diff --git a/services/tracking/tracker.py b/services/tracking/tracker.py index 1dff667..34c7fe8 100644 --- a/services/tracking/tracker.py +++ b/services/tracking/tracker.py @@ -1,5 +1,5 @@ """ -tracker.py — Wraps deep-sort-realtime (ByteTrack-style) to assign persistent +tracker.py – Wraps deep-sort-realtime (ByteTrack-style) to assign persistent track IDs to YOLO detections coming from Phase 1. Usage (standalone): @@ -22,10 +22,9 @@ import numpy as np from deep_sort_realtime.deepsort_tracker import DeepSort -# ── adjust sys.path so we can import sibling packages ────────────────────── import sys -sys.path.insert(0, str(Path(__file__).resolve().parents[2])) # repo root +sys.path.insert(0, str(Path(__file__).resolve().parents[2])) from libs.schemas.detection import DetectionFrameSchema from libs.schemas.tracking import ( @@ -58,22 +57,33 @@ class Tracker: - Lifecycle event emission (BORN / LOST / DEAD) """ - MAX_TRAJECTORY_LEN = 80 # max trajectory points stored per track + MAX_TRAJECTORY_LEN = 80 FPS_DEFAULT = 30 def __init__( self, fps: float = FPS_DEFAULT, - max_age: int = 30, # frames before a lost track is marked DEAD - n_init: int = 3, # frames before a track is CONFIRMED + max_age: int = 30, + n_init: int = 3, max_cosine_distance: float = 0.4, camera_id: str = "cam_01", event_logger: TrackEventLogger | None = None, reid_similarity_threshold: float = 0.85, ) -> None: + """Initialize the Tracker with DeepSort backend and internal state. + + Args: + fps: Frames per second of the input stream. + max_age: Frames before a lost track is marked DEAD. + n_init: Frames before a track is CONFIRMED. + max_cosine_distance: ReID appearance distance threshold. + camera_id: Identifier for the camera feed. + event_logger: Optional logger for lifecycle events. + reid_similarity_threshold: Cosine similarity cutoff for ReID matching. + """ self.fps = fps self.camera_id = camera_id - self.max_age = max_age # NEW + self.max_age = max_age self.REID_SIMILARITY_THRESHOLD = reid_similarity_threshold self._tracker = DeepSort( @@ -82,7 +92,6 @@ def __init__( max_cosine_distance=max_cosine_distance, nn_budget=100, ) - # Internal state self._active_tracks: dict[int, TrackedObject] = {} self._known_ids: set[int] = set() self._frame_id: int = 0 @@ -91,8 +100,6 @@ def __init__( self._lost_embeddings: dict[int, dict] = {} self._active_embeddings: dict[int, np.ndarray] = {} - # ── Public API ────────────────────────────────────────────────────────── - def update( self, det_frame: DetectionFrameSchema, @@ -103,7 +110,7 @@ def update( Args: det_frame: Output of Phase 1 detector (DetectionFrameSchema). - raw_frame: Original BGR frame — needed for appearance features. + raw_frame: Original BGR frame – needed for appearance features. Returns: TrackedFrame with all confirmed tracks, dwell times, trajectories. @@ -111,21 +118,17 @@ def update( self._frame_id = det_frame.frame_id frames_processed_total.inc() - # ── Convert Pydantic detections → DeepSort input format ─────────── - # DeepSort expects: list of ([left, top, w, h], confidence, label) ds_input = [] for det in det_frame.detections: - if det.label != "person": # track persons only in this phase + if det.label != "person": continue b = det.bbox left, top = b.x1, b.y1 w, h = b.x2 - b.x1, b.y2 - b.y1 ds_input.append(([left, top, w, h], float(det.confidence), "person")) - # ── Run tracker ──────────────────────────────────────────────────── raw_tracks = self._tracker.update_tracks(ds_input, frame=raw_frame) - # ── Build TrackedObject list ─────────────────────────────────────── current_ids: set[int] = set() tracked_objects: list[TrackedObject] = [] @@ -135,7 +138,6 @@ def update( tid = int(t.track_id) - # ── ReID matching ───────────────────────────────────── if hasattr(t, "features") and t.features: new_embedding = t.features[-1] self._active_embeddings[tid] = new_embedding @@ -166,18 +168,15 @@ def update( zones = [z.name for z in get_zones_for_point(cx, cy)] - # ── Lifecycle: BORN ─────────────────────────────────────────── if tid not in self._known_ids: self._known_ids.add(tid) self._emit_lifecycle(TrackState.BORN, tid, zones, 0.0) logger.info(f"Track BORN: #{tid} in zones={zones}") - # ── Dwell time ──────────────────────────────────────────────── prev = self._active_tracks.get(tid) dwell_frames = (prev.dwell_time_frames + 1) if prev else 1 dwell_secs = dwell_frames / self.fps - # ── Trajectory ──────────────────────────────────────────────── prev_traj = prev.trajectory if prev else [] new_point = TrajectoryPoint(x=cx, y=cy, frame_id=self._frame_id) trajectory = (prev_traj + [new_point])[-self.MAX_TRAJECTORY_LEN :] @@ -203,7 +202,6 @@ def update( for obj in tracked_objects: track_dwell_seconds.observe(obj.dwell_time_seconds) - # ── Lifecycle: LOST for tracks that disappeared ──────────────────── for tid, prev_obj in list(self._active_tracks.items()): if tid not in current_ids: frames_since = self._frame_id - prev_obj.last_seen_frame @@ -240,7 +238,7 @@ def update( del self._active_tracks[tid] self._active_embeddings.pop(tid, None) logger.info(f"Track DEAD: #{tid} after {prev_obj.dwell_time_seconds:.1f}s") - # ── Cleanup expired ReID embeddings ────────────────── + expired_ids = [ tid for tid, data in self._lost_embeddings.items() @@ -259,16 +257,23 @@ def update( ) def drain_lifecycle_events(self) -> list[TrackLifecycleEvent]: - """ - Pop and return all pending lifecycle events since last call. - Called by the memory service to store BORN/LOST/DEAD events. + """Pop and return all pending lifecycle events since the last call. + + Called by the memory service to consume BORN/LOST/DEAD events. + + Returns: + List of TrackLifecycleEvent objects queued since the last drain. + Returns an empty list if no events are pending. + + Example: + events = tracker.drain_lifecycle_events() + for evt in events: + memory_service.store(evt) """ events = list(self._lifecycle_queue) self._lifecycle_queue.clear() return events - # ── Internal ──────────────────────────────────────────────────────────── - def _emit_lifecycle( self, state: TrackState, @@ -276,6 +281,14 @@ def _emit_lifecycle( zones: list[str], dwell_secs: float, ) -> None: + """Create and queue a TrackLifecycleEvent, optionally logging it. + + Args: + state: TrackState enum value (BORN, LOST, or DEAD). + track_id: Unique integer ID of the track. + zones: List of zone names the track currently occupies. + dwell_secs: Total dwell time in seconds for this track. + """ event = TrackLifecycleEvent( event=state, track_id=track_id, @@ -294,6 +307,15 @@ def _cosine_similarity( a: np.ndarray, b: np.ndarray, ) -> float: + """Compute cosine similarity between two embedding vectors. + + Args: + a: First embedding vector as numpy array. + b: Second embedding vector as numpy array. + + Returns: + Float in [0, 1] representing similarity; 0.0 if either norm is zero. + """ norm_product = np.linalg.norm(a) * np.linalg.norm(b) if norm_product == 0: return 0.0 @@ -301,17 +323,19 @@ def _cosine_similarity( return float(np.dot(a, b) / norm_product) -# ─── CLI Demo ──────────────────────────────────────────────────────────────── - - def main() -> None: + """CLI entry point for the tracking demo on video or webcam. + + Parses arguments, initializes Detector and Tracker, runs the pipeline, + and optionally writes annotated output to a video file. + """ import sys sys.path.insert(0, str(Path(__file__).resolve().parents[2])) from services.detection.detector import Detector from services.tracking.visualizer import draw_tracks - parser = argparse.ArgumentParser(description="Phase 2 — Tracking demo") + parser = argparse.ArgumentParser(description="Phase 2 – Tracking demo") parser.add_argument("--source", default="0") parser.add_argument("--model", default="yolov8n.pt") parser.add_argument("--output", default=None) @@ -340,14 +364,13 @@ def main() -> None: tracked_frame = tracker.update(det_frame, frame) annotated = draw_tracks(frame, tracked_frame) - # Drain lifecycle events (Phase 3 will store these in Redis) for evt in tracker.drain_lifecycle_events(): logger.info( f"Lifecycle: {evt.event} track #{evt.track_id} " f"dwell={evt.dwell_time_seconds:.1f}s zones={evt.zones_present}" ) - cv2.imshow("Agentic Vision — Tracking", annotated) + cv2.imshow("Agentic Vision – Tracking", annotated) if writer: writer.write(annotated) if cv2.waitKey(1) & 0xFF == ord("q"): @@ -361,4 +384,4 @@ def main() -> None: if __name__ == "__main__": - main() + main() \ No newline at end of file