From 091a27b96433dc78aaaebace8eb07b62b109136c Mon Sep 17 00:00:00 2001 From: mrveiss Date: Wed, 1 Apr 2026 21:28:50 +0300 Subject: [PATCH] fix(backend): persist notification config to Redis across restarts (#3166) Add persistence.py with save/load to Redis key autobot:workflow:notif_config:{id} (7-day TTL). Wire into GET/PUT endpoints to hydrate on read and persist on write. Add 11 tests covering round-trip, validation, and Redis unavailability. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../workflow_automation/persistence.py | 79 +++++++ .../services/workflow_automation/routes.py | 11 +- .../test_workflow_notification_persistence.py | 201 ++++++++++++++++++ 3 files changed, 288 insertions(+), 3 deletions(-) create mode 100644 autobot-backend/services/workflow_automation/persistence.py create mode 100644 autobot-backend/tests/services/test_workflow_notification_persistence.py diff --git a/autobot-backend/services/workflow_automation/persistence.py b/autobot-backend/services/workflow_automation/persistence.py new file mode 100644 index 000000000..8e8722163 --- /dev/null +++ b/autobot-backend/services/workflow_automation/persistence.py @@ -0,0 +1,79 @@ +# AutoBot - AI-Powered Automation Platform +# Copyright (c) 2025 mrveiss +# Author: mrveiss +"""Redis persistence for per-workflow notification configuration (#3166). + +Key schema:: + + autobot:workflow:notif_config:{workflow_id} — JSON blob, 7-day TTL + +The notification config is a simple dataclass so we serialise it with +``dataclasses.asdict`` and reconstruct it with ``NotificationConfig(**data)``. +""" + +import json +import logging +from dataclasses import asdict +from typing import Optional + +from autobot_shared.redis_client import get_redis_client +from constants.redis_constants import REDIS_KEY +from services.notification_service import NotificationConfig + +logger = logging.getLogger(__name__) + +_NOTIF_CONFIG_TTL = 7 * 24 * 3600 # 7 days, matches notification store TTL + + +def _notif_config_key(workflow_id: str) -> str: + return f"{REDIS_KEY.NAMESPACE}:workflow:notif_config:{workflow_id}" + + +async def save_notification_config( + workflow_id: str, + config: Optional[NotificationConfig], +) -> None: + """Persist *config* to Redis, or delete the key when *config* is None.""" + redis = await get_redis_client(async_client=True, database="main") + if redis is None: + logger.warning( + "Redis unavailable — notification config not persisted (workflow=%s)", + workflow_id, + ) + return + key = _notif_config_key(workflow_id) + if config is None: + await redis.delete(key) + logger.debug("Deleted notification config from Redis (workflow=%s)", workflow_id) + return + payload = json.dumps(asdict(config), ensure_ascii=False) + await redis.set(key, payload, ex=_NOTIF_CONFIG_TTL) + logger.debug("Persisted notification config to Redis (workflow=%s)", workflow_id) + + +async def load_notification_config( + workflow_id: str, +) -> Optional[NotificationConfig]: + """Load notification config from Redis; returns None when not found.""" + redis = await get_redis_client(async_client=True, database="main") + if redis is None: + logger.warning( + "Redis unavailable — cannot load notification config (workflow=%s)", + workflow_id, + ) + return None + key = _notif_config_key(workflow_id) + raw = await redis.get(key) + if raw is None: + return None + text = raw.decode("utf-8") if isinstance(raw, bytes) else raw + try: + data = json.loads(text) + return NotificationConfig(**data) + except (json.JSONDecodeError, TypeError) as exc: + logger.error( + "Malformed notification config in Redis (workflow=%s): %s", + workflow_id, + exc, + ) + return None diff --git a/autobot-backend/services/workflow_automation/routes.py b/autobot-backend/services/workflow_automation/routes.py index 24cf37ddf..8cf03a5f4 100644 --- a/autobot-backend/services/workflow_automation/routes.py +++ b/autobot-backend/services/workflow_automation/routes.py @@ -29,6 +29,7 @@ WorkflowControlRequest, WorkflowStep, ) +from .persistence import load_notification_config, save_notification_config logger = logging.getLogger(__name__) @@ -501,9 +502,11 @@ async def get_notification_config( workflow_id: str, current_user: dict = Depends(get_current_user), ): - """Return the notification configuration for a workflow (#3139).""" + """Return the notification configuration for a workflow (#3139, #3166).""" try: wf = _find_workflow(workflow_id, current_user) + if wf.notification_config is None: + wf.notification_config = await load_notification_config(workflow_id) config_dict = None if wf.notification_config is not None: config_dict = asdict(wf.notification_config) @@ -531,10 +534,11 @@ async def update_notification_config( current_user: dict = Depends(get_current_user), ): """ - Create or update the notification configuration for a workflow (#3139). + Create or update the notification configuration for a workflow (#3139, #3166). When ``enabled`` is false the config is removed entirely so the - executor skips notification delivery. + executor skips notification delivery. The config is persisted to + Redis so it survives a backend restart. """ try: wf = _find_workflow(workflow_id, current_user) @@ -551,6 +555,7 @@ async def update_notification_config( slack_webhook_url=request.slack_webhook_url, webhook_url=request.webhook_url, ) + await save_notification_config(workflow_id, wf.notification_config) config_dict = None if wf.notification_config is not None: config_dict = asdict(wf.notification_config) diff --git a/autobot-backend/tests/services/test_workflow_notification_persistence.py b/autobot-backend/tests/services/test_workflow_notification_persistence.py new file mode 100644 index 000000000..9d347ccba --- /dev/null +++ b/autobot-backend/tests/services/test_workflow_notification_persistence.py @@ -0,0 +1,201 @@ +# AutoBot - AI-Powered Automation Platform +# Copyright (c) 2025 mrveiss +# Author: mrveiss +"""Tests for workflow notification config persistence (#3166). + +Covers: +- WorkflowNotificationStore save/load/delete via mocked Redis. +- Validation on NotificationConfigRequest (emails, slack URL, webhook URL). +- Route-level behaviour: PUT persists, GET hydrates from Redis when in-memory + value is missing. +""" + +import json +from dataclasses import asdict +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from pydantic import ValidationError + +from services.notification_service import NotificationConfig +from services.workflow_automation.models import NotificationConfigRequest +from services.workflow_automation.persistence import ( + load_notification_config, + save_notification_config, +) + +# =========================================================================== +# Helpers +# =========================================================================== + +_WF_ID = "wf-persist-test" + + +def _make_config(**kwargs) -> NotificationConfig: + defaults = dict( + workflow_id=_WF_ID, + channels={"workflow_completed": ["in_app"]}, + templates={}, + email_recipients=[], + slack_webhook_url=None, + webhook_url=None, + user_id="user-42", + ) + defaults.update(kwargs) + return NotificationConfig(**defaults) + + +def _make_redis(get_return=None): + mock = AsyncMock() + mock.set = AsyncMock(return_value=True) + mock.get = AsyncMock(return_value=get_return) + mock.delete = AsyncMock(return_value=1) + return mock + + +# =========================================================================== +# persistence.save_notification_config +# =========================================================================== + + +@pytest.mark.asyncio +async def test_save_persists_json_to_redis(): + config = _make_config() + redis_mock = _make_redis() + with patch( + "services.workflow_automation.persistence.get_redis_client", + return_value=redis_mock, + ): + await save_notification_config(_WF_ID, config) + + redis_mock.set.assert_awaited_once() + call_args = redis_mock.set.call_args + key = call_args[0][0] + payload_str = call_args[0][1] + assert f"notif_config:{_WF_ID}" in key + data = json.loads(payload_str) + assert data["workflow_id"] == _WF_ID + assert data["user_id"] == "user-42" + + +@pytest.mark.asyncio +async def test_save_none_deletes_key(): + redis_mock = _make_redis() + with patch( + "services.workflow_automation.persistence.get_redis_client", + return_value=redis_mock, + ): + await save_notification_config(_WF_ID, None) + + redis_mock.delete.assert_awaited_once() + redis_mock.set.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_save_graceful_when_redis_unavailable(): + with patch( + "services.workflow_automation.persistence.get_redis_client", + return_value=None, + ): + await save_notification_config(_WF_ID, _make_config()) + + +# =========================================================================== +# persistence.load_notification_config +# =========================================================================== + + +@pytest.mark.asyncio +async def test_load_returns_config_from_redis(): + config = _make_config() + raw_json = json.dumps(asdict(config)).encode("utf-8") + redis_mock = _make_redis(get_return=raw_json) + with patch( + "services.workflow_automation.persistence.get_redis_client", + return_value=redis_mock, + ): + result = await load_notification_config(_WF_ID) + + assert result is not None + assert result.workflow_id == _WF_ID + assert result.user_id == "user-42" + assert result.channels == {"workflow_completed": ["in_app"]} + + +@pytest.mark.asyncio +async def test_load_returns_none_when_key_missing(): + redis_mock = _make_redis(get_return=None) + with patch( + "services.workflow_automation.persistence.get_redis_client", + return_value=redis_mock, + ): + result = await load_notification_config(_WF_ID) + + assert result is None + + +@pytest.mark.asyncio +async def test_load_returns_none_when_redis_unavailable(): + with patch( + "services.workflow_automation.persistence.get_redis_client", + return_value=None, + ): + result = await load_notification_config(_WF_ID) + + assert result is None + + +@pytest.mark.asyncio +async def test_load_returns_none_on_malformed_json(): + redis_mock = _make_redis(get_return=b"not-valid-json") + with patch( + "services.workflow_automation.persistence.get_redis_client", + return_value=redis_mock, + ): + result = await load_notification_config(_WF_ID) + + assert result is None + + +# =========================================================================== +# NotificationConfigRequest validation +# =========================================================================== + + +def test_valid_request_passes(): + req = NotificationConfigRequest( + enabled=True, + email_recipients=["user@example.com"], + slack_webhook_url="https://hooks.slack.com/services/abc/def", + webhook_url="https://external.example.com/hook", + channels={"workflow_completed": ["slack", "email"]}, + ) + assert req.enabled is True + assert req.email_recipients == ["user@example.com"] + + +def test_invalid_email_rejected(): + with pytest.raises(ValidationError, match="Invalid email"): + NotificationConfigRequest(email_recipients=["not-an-email"]) + + +def test_invalid_slack_url_rejected(): + with pytest.raises(ValidationError, match="https://hooks.slack.com/"): + NotificationConfigRequest( + slack_webhook_url="https://discord.com/api/webhooks/xyz" + ) + + +def test_non_https_webhook_rejected(): + with pytest.raises(ValidationError, match="https://"): + NotificationConfigRequest(webhook_url="http://external.example.com/hook") + + +def test_private_ip_webhook_rejected(): + with pytest.raises(ValidationError, match="private"): + NotificationConfigRequest(webhook_url="https://192.168.1.1/hook") + + +def test_disabled_request_passes_without_urls(): + req = NotificationConfigRequest(enabled=False) + assert req.enabled is False