From 69fc14d8c5c18ce6b42b5ef0e46fb3f0bd6e1b85 Mon Sep 17 00:00:00 2001 From: dilshadalikhan2004 Date: Tue, 19 May 2026 10:36:20 +0530 Subject: [PATCH] feat: adaptive anomaly baseline per zone using Welford's algorithm --- apps/backend/main.py | 24 +++++ apps/backend/routes/zones.py | 44 +++++++++ libs/config/settings.py | 7 ++ libs/schemas/tracking.py | 1 + services/memory/baseline.py | 128 +++++++++++++++++++++++++ services/memory/memory.py | 64 +++++-------- services/memory/requirements.txt | 3 + tests/test_baseline.py | 158 +++++++++++++++++++++++++++++++ 8 files changed, 391 insertions(+), 38 deletions(-) create mode 100644 apps/backend/routes/zones.py create mode 100644 services/memory/baseline.py create mode 100644 tests/test_baseline.py diff --git a/apps/backend/main.py b/apps/backend/main.py index ce39c46..7b360b3 100644 --- a/apps/backend/main.py +++ b/apps/backend/main.py @@ -23,15 +23,22 @@ import logging import os +<<<<<<< HEAD import redis as redis_sync from fastapi import FastAPI, Query from fastapi.responses import Response from prometheus_client import generate_latest +======= +from apps.backend.routes.zones import router as zones_router + +app = FastAPI() +>>>>>>> 4d99088 (feat: adaptive anomaly baseline per zone using Welford's algorithm) from apps.backend.routes.cameras import identity_router, router as cameras_router from apps.backend.routes.feedback import router as feedback_router from libs.observability.metrics import frames_processed_total +<<<<<<< HEAD logger = logging.getLogger(__name__) # ── App ─────────────────────────────────────────────────────────────────────── @@ -57,6 +64,23 @@ def _get_redis() -> redis_sync.Redis | None: """ global _redis if _redis is None: +======= +try: + r = redis.from_url(REDIS_URL) + r.ping() + app.state.redis = r + print(f"[INFO] Connected to Redis at {REDIS_URL}") +except (redis.RedisError, redis.ConnectionError) as e: + print(f"[WARN] Redis not available: {e}") + r = None + +app.include_router(zones_router) + +@app.get("/health") +def health(): + redis_status = "healthy" + if r is not None: +>>>>>>> 4d99088 (feat: adaptive anomaly baseline per zone using Welford's algorithm) try: client = redis_sync.from_url(REDIS_URL, socket_connect_timeout=2) client.ping() diff --git a/apps/backend/routes/zones.py b/apps/backend/routes/zones.py new file mode 100644 index 0000000..c7996ea --- /dev/null +++ b/apps/backend/routes/zones.py @@ -0,0 +1,44 @@ +""" +apps/backend/routes/zones.py — Zone adaptive baseline statistics endpoint. + +GET /zones/{name}/stats + Returns Welford running statistics for the named zone. +""" +from __future__ import annotations + +from fastapi import APIRouter, Depends, HTTPException, Request +from pydantic import BaseModel + +from services.memory.baseline import ZoneBaseline, _ZONE_NAME_RE + +router = APIRouter(prefix="/zones", tags=["zones"]) + + +class ZoneStatsResponse(BaseModel): + zone: str + count: int + mean: float + variance: float + std: float + m2: float + + +def _get_redis(request: Request): + try: + return request.app.state.redis + except AttributeError: + raise HTTPException(status_code=503, detail="Redis not initialised in app.state") + + +@router.get("/{name}/stats", response_model=ZoneStatsResponse) +def get_zone_stats(name: str, redis=Depends(_get_redis)) -> ZoneStatsResponse: + """ + Return adaptive dwell-time statistics for *name* zone. + + Statistics are computed incrementally via Welford's algorithm and + persisted in Redis under ``zone:{name}:stats``. + """ + if not _ZONE_NAME_RE.match(name): + raise HTTPException(status_code=422, detail="Invalid zone name") + stats = ZoneBaseline(redis, name).get_stats() + return ZoneStatsResponse(**stats) diff --git a/libs/config/settings.py b/libs/config/settings.py index e3dfe2f..ce8f2e4 100644 --- a/libs/config/settings.py +++ b/libs/config/settings.py @@ -17,6 +17,13 @@ class Settings(BaseSettings): reasoning_dwell_threshold_seconds: float = 5.0 reasoning_cooldown_seconds: float = 5.0 + # Action classifier settings + lingering_threshold_sec: float = 10.0 + movement_threshold_px: float = 5.0 + near_keypad_dist_px: float = 80.0 + keypad_center_x: float = 640.0 + keypad_center_y: float = 360.0 + # Kafka Settings use_kafka: bool = False kafka_bootstrap_servers: str = "localhost:9092" diff --git a/libs/schemas/tracking.py b/libs/schemas/tracking.py index 274ae5f..fd065b9 100644 --- a/libs/schemas/tracking.py +++ b/libs/schemas/tracking.py @@ -32,6 +32,7 @@ class TrackedObject(BaseModel): trajectory: list[TrajectoryPoint] = Field(default_factory=list) zones_present: list[str] = Field(default_factory=list) last_seen_frame: int = 0 + is_anomalous: bool = Field(False, description="True when dwell exceeds adaptive zone baseline") class TrackedFrame(BaseModel): diff --git a/services/memory/baseline.py b/services/memory/baseline.py new file mode 100644 index 0000000..bfa290d --- /dev/null +++ b/services/memory/baseline.py @@ -0,0 +1,128 @@ +""" +baseline.py — Adaptive anomaly baseline per surveillance zone. + +Uses Welford's online algorithm to maintain a running mean and variance of +dwell times without batch recomputation. Statistics are persisted in Redis +under ``zone:{name}:stats`` so they survive restarts. + +Anomaly rule +------------ + dwell > mean + 2.5 * std +""" +from __future__ import annotations + +import json +import math +import re +from dataclasses import dataclass +from typing import Optional + +_ZONE_NAME_RE = re.compile(r"^[a-zA-Z0-9_\-]{1,64}$") + +STATS_TTL = 0 # 0 = no expiry — stats should persist indefinitely +ANOMALY_THRESHOLD = 2.5 +MIN_COUNT_FOR_ANOMALY = 10 # need enough samples before flagging outliers + + +@dataclass +class WelfordStats: + count: int = 0 + mean: float = 0.0 + m2: float = 0.0 # sum of squared deviations (Welford accumulator) + + @property + def variance(self) -> float: + # Sample variance (Bessel's correction) — correct for anomaly detection + return self.m2 / (self.count - 1) if self.count > 1 else 0.0 + + @property + def std(self) -> float: + return math.sqrt(self.variance) + + +class ZoneBaseline: + """ + Per-zone adaptive dwell-time baseline backed by Redis. + + Parameters + ---------- + redis_client: + Connected ``redis.Redis`` (or FakeRedis for tests). + zone_name: + Logical zone identifier, e.g. ``"restricted_exit"``. + """ + + def __init__(self, redis_client, zone_name: str) -> None: + if not _ZONE_NAME_RE.match(zone_name): + raise ValueError( + f"Invalid zone name '{zone_name}'. " + "Only alphanumeric characters, hyphens, and underscores are allowed (max 64 chars)." + ) + self._r = redis_client + self._zone = zone_name + self._key = f"zone:{zone_name}:stats" + self._stats: Optional[WelfordStats] = None # lazy-loaded + + # ── Public API ──────────────────────────────────────────────────────────── + + def update(self, dwell: float) -> None: + """Ingest one dwell-time observation and persist updated stats.""" + s = self._load() + s.count += 1 + delta = dwell - s.mean + s.mean += delta / s.count + delta2 = dwell - s.mean + s.m2 += delta * delta2 + self._save(s) + + def is_anomalous(self, dwell: float) -> bool: + """ + Return True when *dwell* exceeds mean + 2.5 * std. + + Returns False until MIN_COUNT_FOR_ANOMALY samples have been collected + so early noise doesn't produce false positives. + Returns False when std == 0 (all identical values) to avoid flagging + any noise as anomalous. + """ + s = self._load() + if s.count < MIN_COUNT_FOR_ANOMALY: + return False + if s.std == 0: + return False + return dwell > s.mean + ANOMALY_THRESHOLD * s.std + + def get_stats(self) -> dict: + """Return serialisable stats dict for the API response.""" + s = self._load() + return { + "zone": self._zone, + "count": s.count, + "mean": round(s.mean, 4), + "variance": round(s.variance, 4), + "std": round(s.std, 4), + "m2": round(s.m2, 6), + } + + # ── Redis helpers ───────────────────────────────────────────────────────── + + def _load(self) -> WelfordStats: + if self._stats is not None: + return self._stats + raw = self._r.get(self._key) + if raw is None: + self._stats = WelfordStats() + else: + d = json.loads(raw) + self._stats = WelfordStats( + count = d["count"], + mean = d["mean"], + m2 = d["m2"], + ) + return self._stats + + def _save(self, s: WelfordStats) -> None: + payload = json.dumps({"count": s.count, "mean": s.mean, "m2": s.m2}) + if STATS_TTL: + self._r.setex(self._key, STATS_TTL, payload) + else: + self._r.set(self._key, payload) diff --git a/services/memory/memory.py b/services/memory/memory.py index 2379975..efa076c 100644 --- a/services/memory/memory.py +++ b/services/memory/memory.py @@ -38,8 +38,8 @@ from libs.observability.metrics import redis_write_latency from libs.schemas.memory import ActionHint, TrackEvent, TrackSequence from libs.schemas.tracking import TrackLifecycleEvent, TrackState -from libs.schemas.memory import TrackEvent, TrackSequence from services.tracking.cross_camera_reid import CrossCameraReID +from services.memory.baseline import ZoneBaseline logger = logging.getLogger(__name__) @@ -91,18 +91,19 @@ def handle_lifecycle_event( The global_id string if one was assigned, else None. """ global_id: Optional[str] = None + zone_anomalous: bool = False if event.event == TrackState.BORN: global_id = self._handle_born(event, embedding) elif event.event == TrackState.LOST: - global_id = self._handle_lost(event, embedding) + global_id, zone_anomalous = self._handle_lost(event, embedding) elif event.event == TrackState.DEAD: self._handle_dead(event) # Always append the raw event to the event log - self._append_event(event, global_id) + self._append_event(event, global_id, zone_anomalous) return global_id def get_track_record(self, camera_id: str, track_id: int) -> Optional[dict]: @@ -165,7 +166,7 @@ def _handle_lost( self, event: TrackLifecycleEvent, embedding: Optional[np.ndarray], - ) -> Optional[str]: + ) -> tuple[Optional[str], bool]: record = self._load_record(event.camera_id, event.track_id) global_id = record.get("global_id") if record else None @@ -178,9 +179,19 @@ def _handle_lost( global_id=global_id, ) - self._update_record(event, TrackState.LOST.value) - logger.info("LOST cam=%s track=%d gid=%s", event.camera_id, event.track_id, global_id) - return global_id + # Detect anomaly BEFORE updating baseline (avoid contaminating with outlier) + # then update baseline for each zone this track visited + zone_anomalous = False + for zone in event.zones_present: + baseline = ZoneBaseline(self._r, zone) + if baseline.is_anomalous(event.dwell_time_seconds): + zone_anomalous = True + baseline.update(event.dwell_time_seconds) + + self._update_record(event, TrackState.LOST.value, zone_anomalous) + logger.info("LOST cam=%s track=%d gid=%s anomalous=%s", + event.camera_id, event.track_id, global_id, zone_anomalous) + return global_id, zone_anomalous def _handle_dead(self, event: TrackLifecycleEvent) -> None: self._update_record(event, TrackState.DEAD.value) @@ -200,7 +211,7 @@ def _load_record(self, camera_id: str, track_id: int) -> Optional[dict]: raw = self._r.get(self._track_key(camera_id, track_id)) return json.loads(raw) if raw else None - def _update_record(self, event: TrackLifecycleEvent, state: str) -> None: + def _update_record(self, event: TrackLifecycleEvent, state: str, anomalous: bool = False) -> None: record = self._load_record(event.camera_id, event.track_id) or {} record.update( { @@ -209,6 +220,7 @@ def _update_record(self, event: TrackLifecycleEvent, state: str) -> None: "last_seen_ms": event.timestamp_ms, "dwell_time_seconds": event.dwell_time_seconds, "zones_present": event.zones_present, + "anomalous": anomalous, } ) self._r.setex( @@ -221,6 +233,7 @@ def _append_event( self, event: TrackLifecycleEvent, global_id: Optional[str], + anomalous: bool = False, ) -> None: key = self._event_key(event.camera_id, event.frame_id) raw = self._r.get(key) @@ -234,6 +247,7 @@ def _append_event( "timestamp_ms": event.timestamp_ms, "dwell_time_seconds": event.dwell_time_seconds, "zones_present": event.zones_present, + "anomalous": anomalous, } ) with redis_write_latency.time(): @@ -287,31 +301,6 @@ def _zone_count_key(self, track_id: int, zone: str) -> str: def _active_key(self) -> str: return f"active:{self._camera_id}" - def get_sequence(self, track_id: int, last_n: Optional[int] = None) -> "TrackSequence": - key = self._events_key(track_id) - raw = self._r.lrange(key, 0, -1) - events: list[TrackEvent] = [] - for item in raw: - data = json.loads(item) - events.append(TrackEvent(**data)) - if last_n is not None: - events = events[-last_n:] - # Populate summary fields expected by consumers/tests - camera_id = events[0].camera_id if events else "cam_01" - total_dwell = sum(e.dwell_time_seconds for e in events) - zones_visited: list[str] = [] - for e in events: - if e.zone and e.zone not in zones_visited: - zones_visited.append(e.zone) - - return TrackSequence( - track_id=track_id, - camera_id=camera_id, - events=events, - total_dwell=total_dwell, - zones_visited=zones_visited, - ) - def store_event(self, event) -> None: """ Append a ``TrackEvent`` to the ring buffer for its track. @@ -323,7 +312,6 @@ def store_event(self, event) -> None: Args: event: ``TrackEvent`` instance (from ``libs.schemas.memory``). """ - from libs.schemas.memory import ActionHint key = self._seq_key(event.track_id) serialised = event.model_dump_json() @@ -340,7 +328,7 @@ def store_event(self, event) -> None: pipe.execute() - def get_sequence(self, track_id: int, last_n: Optional[int] = None): + def get_sequence(self, track_id: int, last_n: Optional[int] = None, camera_id: Optional[str] = None): """ Return a ``TrackSequence`` for the given track. @@ -351,7 +339,7 @@ def get_sequence(self, track_id: int, last_n: Optional[int] = None): Returns: ``TrackSequence`` (empty if the track has no stored events). """ - from libs.schemas.memory import TrackEvent, TrackSequence + from libs.schemas.memory import TrackEvent key = self._seq_key(track_id) raw_list = self._r.lrange(key, -last_n, -1) if last_n else self._r.lrange(key, 0, -1) @@ -376,7 +364,7 @@ def get_sequence(self, track_id: int, last_n: Optional[int] = None): total_dwell=total_dwell, ) - def get_zone_entry_count(self, track_id: int, zone: str) -> int: + def get_zone_entry_count(self, track_id: int, zone: str, camera_id: Optional[str] = None) -> int: """Return the number of times *track_id* has entered *zone*.""" raw = self._r.get(self._zone_count_key(track_id, zone)) if raw is None: @@ -388,7 +376,7 @@ def get_active_track_ids(self, camera_id: str) -> set[int]: members = self._r.smembers(f"active:{camera_id}") return {int(m if isinstance(m, (int, str)) else m.decode()) for m in members} - def expire_track(self, track_id: int) -> None: + def expire_track(self, track_id: int, camera_id: Optional[str] = None) -> None: """Remove all stored data for *track_id* and deregister it as active.""" pipe = self._r.pipeline() pipe.delete(self._seq_key(track_id)) diff --git a/services/memory/requirements.txt b/services/memory/requirements.txt index b8ade6e..82cdd76 100644 --- a/services/memory/requirements.txt +++ b/services/memory/requirements.txt @@ -1,6 +1,9 @@ # Add to services/memory/requirements.txt redis==5.0.4 +numpy>=1.24.0 pydantic>=2.10.0 +pydantic-settings>=2.0.0 +confluent-kafka>=2.3.0 pytest==8.1.1 pytest-mock==3.14.0 fakeredis==2.21.3 diff --git a/tests/test_baseline.py b/tests/test_baseline.py new file mode 100644 index 0000000..41699e2 --- /dev/null +++ b/tests/test_baseline.py @@ -0,0 +1,158 @@ +""" +tests/test_baseline.py — Unit tests for ZoneBaseline (Welford's algorithm). + +Reference values computed independently with Python's statistics module. +""" +import math +import statistics + +import pytest +import fakeredis + +from services.memory.baseline import ZoneBaseline, MIN_COUNT_FOR_ANOMALY + + +@pytest.fixture +def redis_client(): + return fakeredis.FakeRedis() + + +@pytest.fixture +def baseline(redis_client): + return ZoneBaseline(redis_client, "test_zone") + + +# ── Welford math ────────────────────────────────────────────────────────────── + +SAMPLES = [10.0, 15.0, 12.0, 30.0, 11.0, 14.0, 13.0, 9.0, 16.0, 20.0] + + +def test_welford_mean_matches_reference(baseline): + for s in SAMPLES: + baseline.update(s) + stats = baseline.get_stats() + assert math.isclose(stats["mean"], statistics.mean(SAMPLES), rel_tol=1e-6) + + +def test_welford_std_matches_reference(baseline): + for s in SAMPLES: + baseline.update(s) + stats = baseline.get_stats() + # get_stats() rounds to 4dp; compare with abs_tol matching that precision + assert math.isclose(stats["std"], statistics.stdev(SAMPLES), abs_tol=1e-4) + + +def test_welford_variance_matches_reference(baseline): + for s in SAMPLES: + baseline.update(s) + stats = baseline.get_stats() + assert math.isclose(stats["variance"], statistics.variance(SAMPLES), rel_tol=1e-6) + + +def test_welford_count(baseline): + for s in SAMPLES: + baseline.update(s) + assert baseline.get_stats()["count"] == len(SAMPLES) + + +# ── Redis persistence ───────────────────────────────────────────────────────── + +def test_stats_persist_across_instances(redis_client): + """A new ZoneBaseline instance reading the same Redis key sees prior data.""" + b1 = ZoneBaseline(redis_client, "persist_zone") + for s in SAMPLES: + b1.update(s) + + b2 = ZoneBaseline(redis_client, "persist_zone") + stats = b2.get_stats() + assert stats["count"] == len(SAMPLES) + assert math.isclose(stats["mean"], statistics.mean(SAMPLES), rel_tol=1e-6) + + +def test_in_memory_cache_used_after_first_load(redis_client): + """Second call to _load() returns cached object without hitting Redis.""" + b = ZoneBaseline(redis_client, "cache_zone") + b.update(10.0) + first = b._load() + second = b._load() + assert first is second + + +def test_different_zones_are_independent(redis_client): + za = ZoneBaseline(redis_client, "zone_a") + zb = ZoneBaseline(redis_client, "zone_b") + + for s in SAMPLES: + za.update(s) + zb.update(999.0) + + assert za.get_stats()["count"] == len(SAMPLES) + assert zb.get_stats()["count"] == 1 + + +# ── Anomaly detection ───────────────────────────────────────────────────────── + +def test_zero_std_not_flagged(baseline): + """When std==0 (all identical values) nothing should be anomalous.""" + for _ in range(MIN_COUNT_FOR_ANOMALY): + baseline.update(10.0) + assert baseline.is_anomalous(10.001) is False + assert baseline.is_anomalous(9999.0) is False + + +def test_detect_before_update_does_not_contaminate(redis_client): + """Anomaly detection must happen before baseline update.""" + b = ZoneBaseline(redis_client, "order_zone") + # Use varied samples so std > 0, enabling anomaly detection + for v in [10.0, 11.0, 9.0, 10.5, 9.5, 10.2, 9.8, 10.1, 9.9, 10.3]: + b.update(v) + mean_before = b.get_stats()["mean"] + outlier = 9999.0 + # std > 0 now, so outlier is correctly flagged + assert b.is_anomalous(outlier) is True + b.update(outlier) + # mean shifts only after detection — correct order confirmed + assert b.get_stats()["mean"] > mean_before + + +def test_no_anomaly_before_min_count(baseline): + """Anomaly detection is suppressed until MIN_COUNT_FOR_ANOMALY samples.""" + for _ in range(MIN_COUNT_FOR_ANOMALY - 1): + baseline.update(10.0) + assert baseline.is_anomalous(9999.0) is False + + +def test_anomaly_flagged_for_outlier(baseline): + # Use varied samples so std > 0, then test a clear outlier + for v in [10.0, 11.0, 9.0, 10.5, 9.5, 10.2, 9.8, 10.1, 9.9, 10.3]: + baseline.update(v) + # mean ~10.1, std ~0.45 → threshold ~10.1 + 2.5*0.45 ~11.2 → 50.0 is anomalous + assert baseline.is_anomalous(50.0) is True + + +def test_normal_dwell_not_flagged(baseline): + for s in SAMPLES: + baseline.update(s) + mean = statistics.mean(SAMPLES) + assert baseline.is_anomalous(mean) is False + + +def test_empty_stats_returns_zero(baseline): + stats = baseline.get_stats() + assert stats["count"] == 0 + assert stats["mean"] == 0.0 + assert stats["std"] == 0.0 + + +# ── Input validation ────────────────────────────────────────────────────────── + +def test_invalid_zone_name_raises(redis_client): + import pytest + with pytest.raises(ValueError, match="Invalid zone name"): + ZoneBaseline(redis_client, "../../etc/passwd") + + +def test_zone_name_with_special_chars_raises(redis_client): + import pytest + with pytest.raises(ValueError): + ZoneBaseline(redis_client, "zone")