From eca63371fa63d8e70804d68a6ffe469132828de8 Mon Sep 17 00:00:00 2001
From: rootflo-hardik
Date: Tue, 17 Mar 2026 11:52:15 +0530
Subject: [PATCH] env variable for cache disable via WAVEFRONT_CACHE_ENABLED

---
 .../server/apps/floware/floware/channels.py   |  5 ++
 .../db_repo_module/cache/cache_manager.py     | 64 +++++++++++++++----
 2 files changed, 55 insertions(+), 14 deletions(-)

diff --git a/wavefront/server/apps/floware/floware/channels.py b/wavefront/server/apps/floware/floware/channels.py
index a8b5be4f..b826f99f 100644
--- a/wavefront/server/apps/floware/floware/channels.py
+++ b/wavefront/server/apps/floware/floware/channels.py
@@ -18,6 +18,11 @@ async def start_redis_listener(
     queue = asyncio.Queue()
 
     pubsub = cache_manager.subscribe(channels=[REDIS_API_SERVICE_UPDATES_CHANNEL])
+
+    if pubsub is None:
+        logger.info('Cache is disabled — skipping Redis pubsub listener')
+        return
+
     logger.info('Subscribed to Redis channel: %s', REDIS_API_SERVICE_UPDATES_CHANNEL)
 
     # Capture the running loop from the main thread
diff --git a/wavefront/server/modules/db_repo_module/db_repo_module/cache/cache_manager.py b/wavefront/server/modules/db_repo_module/db_repo_module/cache/cache_manager.py
index 14501e24..4e5162bf 100644
--- a/wavefront/server/modules/db_repo_module/db_repo_module/cache/cache_manager.py
+++ b/wavefront/server/modules/db_repo_module/db_repo_module/cache/cache_manager.py
@@ -32,23 +32,32 @@ def __init__(
         self.initial_backoff = initial_backoff
         self.max_backoff = max_backoff
 
-        self.pool = self._create_connection_pool(
-            connection_timeout=connection_timeout,
-            socket_timeout=socket_timeout,
-            socket_keepalive=socket_keepalive,
-            pool_size=pool_size,
+        self.cache_enabled = (
+            os.getenv('WAVEFRONT_CACHE_ENABLED', 'true').lower() == 'true'
         )
-        self.redis = self._create_redis_connection()
+        if self.cache_enabled:
+            self.pool = self._create_connection_pool(
+                connection_timeout=connection_timeout,
+                socket_timeout=socket_timeout,
+                socket_keepalive=socket_keepalive,
+                pool_size=pool_size,
+            )
 
-        # Test the connection immediately - fail fast if Redis is unreachable
-        try:
-            self.redis.ping()
-            logger.info('Connected to Redis with redis ping')
-        except (ConnectionError, TimeoutError, RedisError) as e:
-            logger.error(f'Failed to connect to Redis during initialization: {e}')
-            logger.error('Server will not start without Redis connectivity')
-            raise RuntimeError(f'Redis connection test failed: {e}') from e
+            self.redis = self._create_redis_connection()
+
+            # Test the connection immediately - fail fast if Redis is unreachable
+            try:
+                self.redis.ping()
+                logger.info('Connected to Redis with redis ping')
+            except (ConnectionError, TimeoutError, RedisError) as e:
+                logger.error(f'Failed to connect to Redis during initialization: {e}')
+                logger.error('Server will not start without Redis connectivity')
+                raise RuntimeError(f'Redis connection test failed: {e}') from e
+        else:
+            self.pool = None
+            self.redis = None
+            logger.info('Cache is disabled via WAVEFRONT_CACHE_ENABLED env variable')
 
     def _create_connection_pool(
         self,
@@ -80,6 +89,9 @@ def _create_redis_connection(self) -> Redis:
         return Redis(connection_pool=self.pool)
 
     def _checking_redis_connection(self):
+        if not self.cache_enabled:
+            return False
+
         try:
             self.redis.ping()
             return True
@@ -100,6 +112,9 @@ def add(
         expiry: int = 3600,
         nx: bool = False,
     ) -> bool:
+        if not self.cache_enabled:
+            return False
+
         try:
             logger.info(f'Adding key: {key} to cache with expiry: {expiry} seconds')
             return bool(
@@ -115,6 +130,9 @@ def add(
         retry=retry_if_exception_type((RedisError, ConnectionError, TimeoutError)),
    )
    def get_str(self, key: str, default: Any = None) -> Optional[str]:
+        if not self.cache_enabled:
+            return default
+
         try:
             value = self.redis.get(f'{self.namespace}/{key}')
             return value if value is not None else default
@@ -124,10 +142,16 @@ def get_str(self, key: str, default: Any = None) -> Optional[str]:
             raise
 
     def get_int(self, key: str, default: int = 0) -> int:
+        if not self.cache_enabled:
+            return default
+
         value = self.get_str(key, default)
         return int(value) if value is not None else default
 
     def remove(self, key: str) -> bool:
+        if not self.cache_enabled:
+            return False
+
         try:
             return bool(self.redis.delete(f'{self.namespace}/{key}'))
         except (RedisError, ConnectionError, TimeoutError) as e:
@@ -136,6 +160,9 @@ def remove(self, key: str) -> bool:
 
     def invalidate_query(self, pattern: str) -> int:
         """Remove all keys matching the given pattern"""
+        if not self.cache_enabled:
+            return 0
+
         try:
             # Get all keys matching the pattern
             search_pattern = f'{self.namespace}/{pattern}'
@@ -170,6 +197,9 @@ def publish(self, channel: str, message: str) -> int:
         Raises:
             RedisError: If publishing fails
         """
+        if not self.cache_enabled:
+            return 0
+
         try:
             full_channel = f'{self.namespace}/{channel}'
             logger.info(f'Publishing message to channel: {full_channel}')
@@ -203,6 +233,9 @@ def subscribe(
                 if message['type'] == 'message':
                     print(f"Received: {message['data']}")
         """
+        if not self.cache_enabled:
+            return None
+
         try:
             pubsub = self.redis.pubsub()
 
@@ -225,6 +258,9 @@ def subscribe(
             raise
 
     def close(self):
+        if not self.cache_enabled:
+            return
+
         try:
             self.pool.disconnect()
             logger.info('Redis connection pool closed successfully')