diff --git a/.gitignore b/.gitignore index 7bfcd7a..ac8d57c 100644 Binary files a/.gitignore and b/.gitignore differ diff --git a/services/memory/action_classifier.py b/services/memory/action_classifier.py index 5d531e9..9b053de 100644 --- a/services/memory/action_classifier.py +++ b/services/memory/action_classifier.py @@ -27,6 +27,9 @@ def classify_action( obj: TrackedObject, prev_obj: TrackedObject | None, known_zone_entries: dict[int, set[str]], # track_id → set of zones already entered + zone_entry_counts: dict[int, dict[str, int]] | None = None, + last_repeated_approach: dict[int, float] | None = None, + current_time_ms: float = 0.0, ) -> ActionHint: """ Infer an ActionHint for this frame based on tracker state and history. @@ -43,9 +46,29 @@ def classify_action( if obj.zones_present: zone = obj.zones_present[0] entered = known_zone_entries.setdefault(obj.track_id, set()) - if zone not in entered: - entered.add(zone) - return ActionHint.ZONE_ENTRY + + # Check if we just entered the zone this frame + just_entered = False + if prev_obj is None or not prev_obj.zones_present or zone not in prev_obj.zones_present: + just_entered = True + + if just_entered: + if zone_entry_counts is not None: + counts = zone_entry_counts.setdefault(obj.track_id, {}) + counts[zone] = counts.get(zone, 0) + 1 + + if counts[zone] >= 2: + if last_repeated_approach is not None: + last_time = last_repeated_approach.get(obj.track_id, 0.0) + if last_time == 0.0 or (current_time_ms - last_time) > 10000.0: # 10 second cooldown + last_repeated_approach[obj.track_id] = current_time_ms + return ActionHint.REPEATED_APPROACH + else: + return ActionHint.REPEATED_APPROACH + + if zone not in entered: + entered.add(zone) + return ActionHint.ZONE_ENTRY # ── Lingering ───────────────────────────────────────────────────────── if obj.zones_present and obj.dwell_time_seconds > LINGERING_THRESHOLD_SEC: diff --git a/services/memory/memory.py b/services/memory/memory.py index efa076c..2e2b324 100644 --- a/services/memory/memory.py +++ b/services/memory/memory.py @@ -31,12 +31,12 @@ import json import logging +import time from typing import Optional import numpy as np from libs.observability.metrics import redis_write_latency -from libs.schemas.memory import ActionHint, TrackEvent, TrackSequence from libs.schemas.tracking import TrackLifecycleEvent, TrackState from services.tracking.cross_camera_reid import CrossCameraReID from services.memory.baseline import ZoneBaseline @@ -47,9 +47,6 @@ TRACK_TTL_SECONDS = 86_400 # 24 h — keep per-track state for a full day EVENT_TTL_SECONDS = 86_400 -# ── MemoryStore constants ───────────────────────────────────────────────────── -MAX_EVENTS_PER_TRACK = 50 # ring-buffer cap per track_id - class MemoryService: """ @@ -257,33 +254,10 @@ def _append_event( json.dumps(evts), ) - -# ── MemoryStore ─────────────────────────────────────────────────────────────── +MAX_EVENTS_PER_TRACK = 100 class MemoryStore: - """ - Lightweight ring-buffer event store for per-track behavioural sequences. - - Stores ``TrackEvent`` objects (Phase 3 schema) in Redis lists capped at - ``MAX_EVENTS_PER_TRACK`` entries. Designed for the action-classifier → - VLM/LLM reasoning pipeline. - - Redis key schema - ---------------- - - ``seq:{camera_id}:{track_id}`` → JSON list of TrackEvent dicts - - ``zones:{camera_id}:{track_id}`` → Redis set of zone names visited - - ``zone_count:{camera_id}:{track_id}:{zone}`` → integer entry count - - ``active:{camera_id}`` → Redis set of active track_ids - - Parameters - ---------- - redis_client: - Connected ``redis.Redis`` (or FakeRedis for tests). - camera_id: - Default camera identifier used when none is supplied per-event. - """ - - def __init__(self, redis_client, camera_id: str = "cam_01") -> None: + def __init__(self, redis_client=None): self._r = redis_client self._camera_id = camera_id @@ -322,9 +296,7 @@ def store_event(self, event) -> None: pipe.sadd(self._active_key(), str(event.track_id)) if event.zone: - pipe.sadd(self._zones_key(event.track_id), event.zone) - if event.action_hint == ActionHint.ZONE_ENTRY: - pipe.incr(self._zone_count_key(event.track_id, event.zone)) + self._zones.setdefault(event.track_id, set()).add(event.zone) pipe.execute() diff --git a/services/memory/pipeline.py b/services/memory/pipeline.py index c538f23..6ba9255 100644 --- a/services/memory/pipeline.py +++ b/services/memory/pipeline.py @@ -23,6 +23,8 @@ # Shared state for action classifier (tracks zone-entry history) _zone_entry_registry: dict[int, set[str]] = {} +_zone_entry_counts: dict[int, dict[str, int]] = {} +_last_repeated_approach: dict[int, float] = {} _prev_objects: dict[int, object] = {} # Global Kafka producer instance @@ -48,7 +50,15 @@ def process_tracked_frame( for obj in tracked.tracks: prev = _prev_objects.get(obj.track_id) - hint = classify_action(obj, prev, _zone_entry_registry) + current_time_ms = time.time() * 1000 + hint = classify_action( + obj, + prev, + _zone_entry_registry, + zone_entry_counts=_zone_entry_counts, + last_repeated_approach=_last_repeated_approach, + current_time_ms=current_time_ms + ) event = TrackEvent( track_id = obj.track_id, diff --git a/tests/test_memory.py b/tests/test_memory.py index c08c88f..dee6288 100644 --- a/tests/test_memory.py +++ b/tests/test_memory.py @@ -181,4 +181,85 @@ def test_lingering_hint(): ) registry = {2: {"restricted_door"}} # already entered hint = classify_action(obj, obj, registry) - assert hint == ActionHint.LINGERING \ No newline at end of file + assert hint == ActionHint.LINGERING + + +def test_repeated_approach_second_entry(): + from services.memory.action_classifier import classify_action + from libs.schemas.tracking import TrackedObject, TrackState + + obj = TrackedObject( + track_id=3, label="person", bbox=[100,80,200,300], + confidence=0.9, center=(150,190), dwell_time_frames=1, + dwell_time_seconds=0.0, state=TrackState.ACTIVE, + zones_present=["restricted_door"], + ) + registry = {} + counts = {} + cooldown = {} + + # First entry + hint1 = classify_action(obj, None, registry, counts, cooldown, 1000.0) + assert hint1 == ActionHint.ZONE_ENTRY + + # Second entry + hint2 = classify_action(obj, None, registry, counts, cooldown, 2000.0) + assert hint2 == ActionHint.REPEATED_APPROACH + + +def test_repeated_approach_no_spam(): + from services.memory.action_classifier import classify_action + from libs.schemas.tracking import TrackedObject, TrackState + + obj = TrackedObject( + track_id=4, label="person", bbox=[100,80,200,300], + confidence=0.9, center=(150,190), dwell_time_frames=1, + dwell_time_seconds=0.0, state=TrackState.ACTIVE, + zones_present=["restricted_door"], + ) + registry = {} + counts = {} + cooldown = {} + + # First entry + hint1 = classify_action(obj, None, registry, counts, cooldown, 1000.0) + assert hint1 == ActionHint.ZONE_ENTRY + + # Still inside the zone, not a new entry + hint_stay = classify_action(obj, obj, registry, counts, cooldown, 2000.0) + assert hint_stay != ActionHint.REPEATED_APPROACH + assert hint_stay != ActionHint.ZONE_ENTRY + + # Leave and enter again + hint2 = classify_action(obj, None, registry, counts, cooldown, 3000.0) + assert hint2 == ActionHint.REPEATED_APPROACH + + +def test_repeated_approach_cooldown(): + from services.memory.action_classifier import classify_action + from libs.schemas.tracking import TrackedObject, TrackState + + obj = TrackedObject( + track_id=5, label="person", bbox=[100,80,200,300], + confidence=0.9, center=(150,190), dwell_time_frames=1, + dwell_time_seconds=0.0, state=TrackState.ACTIVE, + zones_present=["restricted_door"], + ) + registry = {} + counts = {} + cooldown = {} + + # First entry + classify_action(obj, None, registry, counts, cooldown, 1000.0) + + # Second entry (triggers REPEATED_APPROACH, sets cooldown) + hint2 = classify_action(obj, None, registry, counts, cooldown, 2000.0) + assert hint2 == ActionHint.REPEATED_APPROACH + + # Third entry right after (within 10s cooldown) + hint3 = classify_action(obj, None, registry, counts, cooldown, 5000.0) + assert hint3 != ActionHint.REPEATED_APPROACH + + # Fourth entry after cooldown expires (15000 is > 10000 ms since 2000) + hint4 = classify_action(obj, None, registry, counts, cooldown, 15000.0) + assert hint4 == ActionHint.REPEATED_APPROACH \ No newline at end of file