Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions wavefront/server/apps/floware/floware/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ async def start_redis_listener(
queue = asyncio.Queue()

pubsub = cache_manager.subscribe(channels=[REDIS_API_SERVICE_UPDATES_CHANNEL])

if pubsub is None:
logger.info('Cache is disabled — skipping Redis pubsub listener')
return

logger.info('Subscribed to Redis channel: %s', REDIS_API_SERVICE_UPDATES_CHANNEL)

# Capture the running loop from the main thread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,32 @@ def __init__(
self.initial_backoff = initial_backoff
self.max_backoff = max_backoff

self.pool = self._create_connection_pool(
connection_timeout=connection_timeout,
socket_timeout=socket_timeout,
socket_keepalive=socket_keepalive,
pool_size=pool_size,
self.cache_enabled = (
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DId you test this? Is it working fine with cache disabled ?

os.getenv('WAVEFRONT_CACHE_ENABLED', 'true').lower() == 'true'
)

self.redis = self._create_redis_connection()
if self.cache_enabled:
self.pool = self._create_connection_pool(
connection_timeout=connection_timeout,
socket_timeout=socket_timeout,
socket_keepalive=socket_keepalive,
pool_size=pool_size,
)

# Test the connection immediately - fail fast if Redis is unreachable
try:
self.redis.ping()
logger.info('Connected to Redis with redis ping')
except (ConnectionError, TimeoutError, RedisError) as e:
logger.error(f'Failed to connect to Redis during initialization: {e}')
logger.error('Server will not start without Redis connectivity')
raise RuntimeError(f'Redis connection test failed: {e}') from e
self.redis = self._create_redis_connection()

# Test the connection immediately - fail fast if Redis is unreachable
try:
self.redis.ping()
logger.info('Connected to Redis with redis ping')
except (ConnectionError, TimeoutError, RedisError) as e:
logger.error(f'Failed to connect to Redis during initialization: {e}')
logger.error('Server will not start without Redis connectivity')
raise RuntimeError(f'Redis connection test failed: {e}') from e
else:
self.pool = None
self.redis = None
logger.info('Cache is disabled via WAVEFRONT_CACHE_ENABLED env variable')

def _create_connection_pool(
self,
Expand Down Expand Up @@ -80,6 +89,9 @@ def _create_redis_connection(self) -> Redis:
return Redis(connection_pool=self.pool)

def _checking_redis_connection(self):
if not self.cache_enabled:
return False

try:
self.redis.ping()
return True
Expand All @@ -100,6 +112,9 @@ def add(
expiry: int = 3600,
nx: bool = False,
) -> bool:
if not self.cache_enabled:
return False

try:
logger.info(f'Adding key: {key} to cache with expiry: {expiry} seconds')
return bool(
Expand All @@ -115,6 +130,9 @@ def add(
retry=retry_if_exception_type((RedisError, ConnectionError, TimeoutError)),
)
def get_str(self, key: str, default: Any = None) -> Optional[str]:
if not self.cache_enabled:
return default

try:
value = self.redis.get(f'{self.namespace}/{key}')
return value if value is not None else default
Expand All @@ -124,10 +142,16 @@ def get_str(self, key: str, default: Any = None) -> Optional[str]:
raise

def get_int(self, key: str, default: int = 0) -> int:
if not self.cache_enabled:
return default

value = self.get_str(key, default)
return int(value) if value is not None else default

def remove(self, key: str) -> bool:
if not self.cache_enabled:
return False

try:
return bool(self.redis.delete(f'{self.namespace}/{key}'))
except (RedisError, ConnectionError, TimeoutError) as e:
Expand All @@ -136,6 +160,9 @@ def remove(self, key: str) -> bool:

def invalidate_query(self, pattern: str) -> int:
"""Remove all keys matching the given pattern"""
if not self.cache_enabled:
return 0

try:
# Get all keys matching the pattern
search_pattern = f'{self.namespace}/{pattern}'
Expand Down Expand Up @@ -170,6 +197,9 @@ def publish(self, channel: str, message: str) -> int:
Raises:
RedisError: If publishing fails
"""
if not self.cache_enabled:
return 0

Comment on lines +200 to +202
Copy link
Copy Markdown

@coderabbitai coderabbitai bot Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== CacheManager publish/subscribe call sites ==="
rg -n -C3 '\bcache_manager\.(publish|subscribe)\s*\(' wavefront

echo
echo "=== Redis API update channel usage ==="
rg -n -C3 'REDIS_API_SERVICE_UPDATES_CHANNEL|process_message\s*\(' wavefront

echo
echo "=== CommonCache / subscribe contract ==="
rg -n -C3 '\bclass CommonCache\b|\bdef subscribe\s*\(' wavefront

echo
echo "=== Publisher / processor sources ==="
fd 'api_change_publisher\.py$' wavefront -x sed -n '1,220p' {}
fd 'api_change_processor\.py$' wavefront -x sed -n '1,260p' {}

Repository: rootflo/wavefront

Length of output: 11393


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Full publish and subscribe methods ==="
sed -n '195,250p' wavefront/server/modules/db_repo_module/db_repo_module/cache/cache_manager.py

echo
echo "=== Check return type hints for subscribe method ==="
rg -n 'def subscribe' wavefront/server/modules/db_repo_module/db_repo_module/cache/cache_manager.py -A 50 | head -60

echo
echo "=== Any other usages of cache_manager.publish/subscribe beyond API updates ==="
rg -n '\.publish\(|\.subscribe\(' wavefront --type py | grep -v 'test' | grep -v '.pyc'

Repository: rootflo/wavefront

Length of output: 5441


Update subscribe() and publish() docstrings to document behavior when cache is disabled.

The cache_enabled flag at lines 200–202 and 211–237 disables more than key/value caching. The REDIS_API_SERVICE_UPDATES_CHANNEL carries API service updates (create/update/delete operations that reload/remove services in the proxy router), which is critical infrastructure—not cache invalidation.

  • publish() becomes a no-op (returns 0) when cache is off, silencing service updates across the system.
  • subscribe() returns None instead of a PubSub object, but the docstring only documents the normal case and omits this behavior. Callers must check for None explicitly (as floware/channels.py line 22 does).

Either document the optional None return in both methods' docstrings, or reconsider whether disabling pub/sub is the right behavior when cache is disabled. If the current design is intentional, make the contract explicit in the type hints and docstrings.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/db_repo_module/db_repo_module/cache/cache_manager.py`
around lines 200 - 202, The subscribe() and publish() docstrings currently omit
the behavior when cache_enabled is False (publish returning 0 and subscribe
returning None), so update both docstrings and accompanying type hints to
document and reflect this contract: in cache_manager.py, amend the publish()
docstring to state that when self.cache_enabled is False it is a no-op and
returns 0, and amend the subscribe() docstring and signature to indicate it may
return None (Optional[PubSub]) when cache is disabled; reference the
cache_enabled flag, REDIS_API_SERVICE_UPDATES_CHANNEL, and the
subscribe()/publish() methods so callers (e.g., floware/channels.py) know to
check for None or a 0 result.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rootflo-hardik Lets not disable publish and subscribe

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

try:
full_channel = f'{self.namespace}/{channel}'
logger.info(f'Publishing message to channel: {full_channel}')
Expand Down Expand Up @@ -203,6 +233,9 @@ def subscribe(
if message['type'] == 'message':
print(f"Received: {message['data']}")
"""
if not self.cache_enabled:
return None

try:
pubsub = self.redis.pubsub()

Expand All @@ -225,6 +258,9 @@ def subscribe(
raise

def close(self):
if not self.cache_enabled:
return

try:
self.pool.disconnect()
logger.info('Redis connection pool closed successfully')
Expand Down
Loading