Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified .gitignore
Binary file not shown.
29 changes: 26 additions & 3 deletions services/memory/action_classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ def classify_action(
obj: TrackedObject,
prev_obj: TrackedObject | None,
known_zone_entries: dict[int, set[str]], # track_id → set of zones already entered
zone_entry_counts: dict[int, dict[str, int]] | None = None,
last_repeated_approach: dict[int, float] | None = None,
current_time_ms: float = 0.0,
) -> ActionHint:
"""
Infer an ActionHint for this frame based on tracker state and history.
Expand All @@ -43,9 +46,29 @@ def classify_action(
if obj.zones_present:
zone = obj.zones_present[0]
entered = known_zone_entries.setdefault(obj.track_id, set())
if zone not in entered:
entered.add(zone)
return ActionHint.ZONE_ENTRY

# Check if we just entered the zone this frame
just_entered = False
if prev_obj is None or not prev_obj.zones_present or zone not in prev_obj.zones_present:
just_entered = True

if just_entered:
if zone_entry_counts is not None:
counts = zone_entry_counts.setdefault(obj.track_id, {})
counts[zone] = counts.get(zone, 0) + 1

if counts[zone] >= 2:
if last_repeated_approach is not None:
last_time = last_repeated_approach.get(obj.track_id, 0.0)
if last_time == 0.0 or (current_time_ms - last_time) > 10000.0: # 10 second cooldown
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Use an inclusive cooldown boundary (>=) for exact 10s re-trigger.

Line 62 currently uses > 10000.0, so an event at exactly 10,000 ms is still suppressed. That makes the effective cooldown slightly longer than configured.

Proposed fix
-                        if last_time == 0.0 or (current_time_ms - last_time) > 10000.0:  # 10 second cooldown
+                        if last_time == 0.0 or (current_time_ms - last_time) >= 10000.0:  # 10 second cooldown
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if last_time == 0.0 or (current_time_ms - last_time) > 10000.0: # 10 second cooldown
if last_time == 0.0 or (current_time_ms - last_time) >= 10000.0: # 10 second cooldown
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@services/memory/action_classifier.py` at line 62, Update the cooldown
condition in the check that uses last_time and current_time_ms (the if with
"last_time == 0.0 or (current_time_ms - last_time) > 10000.0") to use an
inclusive boundary so events at exactly 10,000 ms can re-trigger; replace the
strict greater-than with a greater-than-or-equal-to comparison on
(current_time_ms - last_time) to make the 10s cooldown inclusive.

last_repeated_approach[obj.track_id] = current_time_ms
return ActionHint.REPEATED_APPROACH
else:
return ActionHint.REPEATED_APPROACH

if zone not in entered:
entered.add(zone)
return ActionHint.ZONE_ENTRY

# ── Lingering ─────────────────────────────────────────────────────────
if obj.zones_present and obj.dwell_time_seconds > LINGERING_THRESHOLD_SEC:
Expand Down
36 changes: 4 additions & 32 deletions services/memory/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@

import json
import logging
import time
from typing import Optional

import numpy as np

from libs.observability.metrics import redis_write_latency
from libs.schemas.memory import ActionHint, TrackEvent, TrackSequence
from libs.schemas.tracking import TrackLifecycleEvent, TrackState
from services.tracking.cross_camera_reid import CrossCameraReID
from services.memory.baseline import ZoneBaseline
Expand All @@ -47,9 +47,6 @@
TRACK_TTL_SECONDS = 86_400 # 24 h — keep per-track state for a full day
EVENT_TTL_SECONDS = 86_400

# ── MemoryStore constants ─────────────────────────────────────────────────────
MAX_EVENTS_PER_TRACK = 50 # ring-buffer cap per track_id


class MemoryService:
"""
Expand Down Expand Up @@ -257,33 +254,10 @@ def _append_event(
json.dumps(evts),
)


# ── MemoryStore ───────────────────────────────────────────────────────────────
MAX_EVENTS_PER_TRACK = 100

class MemoryStore:
"""
Lightweight ring-buffer event store for per-track behavioural sequences.

Stores ``TrackEvent`` objects (Phase 3 schema) in Redis lists capped at
``MAX_EVENTS_PER_TRACK`` entries. Designed for the action-classifier →
VLM/LLM reasoning pipeline.

Redis key schema
----------------
- ``seq:{camera_id}:{track_id}`` → JSON list of TrackEvent dicts
- ``zones:{camera_id}:{track_id}`` → Redis set of zone names visited
- ``zone_count:{camera_id}:{track_id}:{zone}`` → integer entry count
- ``active:{camera_id}`` → Redis set of active track_ids

Parameters
----------
redis_client:
Connected ``redis.Redis`` (or FakeRedis for tests).
camera_id:
Default camera identifier used when none is supplied per-event.
"""

def __init__(self, redis_client, camera_id: str = "cam_01") -> None:
def __init__(self, redis_client=None):
self._r = redis_client
self._camera_id = camera_id

Expand Down Expand Up @@ -322,9 +296,7 @@ def store_event(self, event) -> None:
pipe.sadd(self._active_key(), str(event.track_id))

if event.zone:
pipe.sadd(self._zones_key(event.track_id), event.zone)
if event.action_hint == ActionHint.ZONE_ENTRY:
pipe.incr(self._zone_count_key(event.track_id, event.zone))
self._zones.setdefault(event.track_id, set()).add(event.zone)

pipe.execute()

Expand Down
12 changes: 11 additions & 1 deletion services/memory/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

# Shared state for action classifier (tracks zone-entry history)
_zone_entry_registry: dict[int, set[str]] = {}
_zone_entry_counts: dict[int, dict[str, int]] = {}
_last_repeated_approach: dict[int, float] = {}
_prev_objects: dict[int, object] = {}

# Global Kafka producer instance
Expand All @@ -48,7 +50,15 @@ def process_tracked_frame(

for obj in tracked.tracks:
prev = _prev_objects.get(obj.track_id)
hint = classify_action(obj, prev, _zone_entry_registry)
current_time_ms = time.time() * 1000
hint = classify_action(
obj,
prev,
_zone_entry_registry,
zone_entry_counts=_zone_entry_counts,
last_repeated_approach=_last_repeated_approach,
current_time_ms=current_time_ms
)

event = TrackEvent(
track_id = obj.track_id,
Expand Down
83 changes: 82 additions & 1 deletion tests/test_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,85 @@ def test_lingering_hint():
)
registry = {2: {"restricted_door"}} # already entered
hint = classify_action(obj, obj, registry)
assert hint == ActionHint.LINGERING
assert hint == ActionHint.LINGERING


def test_repeated_approach_second_entry():
from services.memory.action_classifier import classify_action
from libs.schemas.tracking import TrackedObject, TrackState

obj = TrackedObject(
track_id=3, label="person", bbox=[100,80,200,300],
confidence=0.9, center=(150,190), dwell_time_frames=1,
dwell_time_seconds=0.0, state=TrackState.ACTIVE,
zones_present=["restricted_door"],
)
registry = {}
counts = {}
cooldown = {}

# First entry
hint1 = classify_action(obj, None, registry, counts, cooldown, 1000.0)
assert hint1 == ActionHint.ZONE_ENTRY

# Second entry
hint2 = classify_action(obj, None, registry, counts, cooldown, 2000.0)
assert hint2 == ActionHint.REPEATED_APPROACH


def test_repeated_approach_no_spam():
from services.memory.action_classifier import classify_action
from libs.schemas.tracking import TrackedObject, TrackState

obj = TrackedObject(
track_id=4, label="person", bbox=[100,80,200,300],
confidence=0.9, center=(150,190), dwell_time_frames=1,
dwell_time_seconds=0.0, state=TrackState.ACTIVE,
zones_present=["restricted_door"],
)
registry = {}
counts = {}
cooldown = {}

# First entry
hint1 = classify_action(obj, None, registry, counts, cooldown, 1000.0)
assert hint1 == ActionHint.ZONE_ENTRY

# Still inside the zone, not a new entry
hint_stay = classify_action(obj, obj, registry, counts, cooldown, 2000.0)
assert hint_stay != ActionHint.REPEATED_APPROACH
assert hint_stay != ActionHint.ZONE_ENTRY

# Leave and enter again
hint2 = classify_action(obj, None, registry, counts, cooldown, 3000.0)
assert hint2 == ActionHint.REPEATED_APPROACH


def test_repeated_approach_cooldown():
from services.memory.action_classifier import classify_action
from libs.schemas.tracking import TrackedObject, TrackState

obj = TrackedObject(
track_id=5, label="person", bbox=[100,80,200,300],
confidence=0.9, center=(150,190), dwell_time_frames=1,
dwell_time_seconds=0.0, state=TrackState.ACTIVE,
zones_present=["restricted_door"],
)
registry = {}
counts = {}
cooldown = {}

# First entry
classify_action(obj, None, registry, counts, cooldown, 1000.0)

# Second entry (triggers REPEATED_APPROACH, sets cooldown)
hint2 = classify_action(obj, None, registry, counts, cooldown, 2000.0)
assert hint2 == ActionHint.REPEATED_APPROACH

# Third entry right after (within 10s cooldown)
hint3 = classify_action(obj, None, registry, counts, cooldown, 5000.0)
assert hint3 != ActionHint.REPEATED_APPROACH

# Fourth entry after cooldown expires (15000 is > 10000 ms since 2000)
hint4 = classify_action(obj, None, registry, counts, cooldown, 15000.0)
assert hint4 == ActionHint.REPEATED_APPROACH
Loading