diff --git a/wavefront/server/apps/floware/floware/di/application_container.py b/wavefront/server/apps/floware/floware/di/application_container.py
index 729d084b..60d2232c 100644
--- a/wavefront/server/apps/floware/floware/di/application_container.py
+++ b/wavefront/server/apps/floware/floware/di/application_container.py
@@ -16,9 +16,6 @@ class ApplicationContainer(containers.DeclarativeContainer):
     user_repository = providers.Dependency()
     task_repository = providers.Dependency()
 
-    insights_service = providers.Dependency()
-    pvo_repository = providers.Dependency()
-
     notification_repository = providers.Dependency()
     notification_user_repository = providers.Dependency()
     config_repository = providers.Dependency()
diff --git a/wavefront/server/apps/floware/floware/server.py b/wavefront/server/apps/floware/floware/server.py
index b6827ca7..619d31f6 100644
--- a/wavefront/server/apps/floware/floware/server.py
+++ b/wavefront/server/apps/floware/floware/server.py
@@ -33,8 +33,6 @@ from fastapi.responses import JSONResponse
 from gold_module.controllers.router import gold_router
 from gold_module.gold_container import GoldContainer
 
-from insights_module.controllers.router import insights_router
-from insights_module.insights_container import InsightsContainer
 
 from knowledge_base_module.controllers.knowledge_base_controller import (
     knowledge_base_router,
@@ -115,12 +113,6 @@ user_module_container = UserContainer(
     db_client=db_repo_container.db_client, cache_manager=db_repo_container.cache_manager
 )
 
-insights_container = InsightsContainer(
-    notification_repository=db_repo_container.notification_repository,
-    cache_manager=db_repo_container.cache_manager,
-)
-
-
 application_container = ApplicationContainer(
     db_client=db_repo_container.db_client,
     cloud_storage_manager=common_container.cloud_storage_manager,
@@ -128,8 +120,6 @@
     oauth_credential_repository=db_repo_container.oauth_credential_repository,
     user_repository=db_repo_container.user_repository,
     task_repository=db_repo_container.task_repository,
-    insights_service=insights_container.insights_service,
-    pvo_repository=insights_container.pvo_repository,
     notification_repository=db_repo_container.notification_repository,
     notification_user_repository=db_repo_container.notification_user_repository,
     config_repository=db_repo_container.config_repository,
@@ -376,7 +366,6 @@ async def metrics(request: Request):
 app.include_router(notification_router, prefix='/floware')
 app.include_router(user_management_router, prefix='/floware')
 app.include_router(superset_controller, prefix='/floware')
-app.include_router(insights_router, prefix='/floware')
 app.include_router(knowledge_base_router, prefix='/floware')
 app.include_router(kb_document_router, prefix='/floware')
 app.include_router(rag_retrieval_router, prefix='/floware')
@@ -452,7 +441,6 @@ async def global_exception_handler(request: Request, exc: Exception):
     packages=[
         'auth_module.controllers',
         'plugins_module.controllers',
-        'insights_module.controllers',
         'user_management_module.controllers',
         'user_management_module.authorization',
         'plugins_module.controllers',
@@ -465,18 +453,12 @@ async def global_exception_handler(request: Request, exc: Exception):
         'auth_module.controllers',
         'user_management_module.authorization',
         'user_management_module.controllers',
-        'insights_module.controllers',
         'plugins_module.services',
         'plugins_module.controllers',
         'llm_inference_config_module.controllers',
     ],
 )
 
-insights_container.wire(
-    modules=[__name__],
-    packages=['insights_module.controllers'],
-)
-
 gold_container.wire(
     modules=[__name__],
     packages=['gold_module.controllers'],
@@ -488,7 +470,6 @@ async def global_exception_handler(request: Request, exc: Exception):
         'auth_module.controllers',
         'user_management_module.controllers',
         'user_management_module.authorization',
-        'insights_module.controllers',
         'floware.controllers',
         'knowledge_base_module.controllers',
         'gold_module.controllers',
diff --git a/wavefront/server/apps/floware/pyproject.toml b/wavefront/server/apps/floware/pyproject.toml
index 9f00fa27..7c705295 100644
--- a/wavefront/server/apps/floware/pyproject.toml
+++ b/wavefront/server/apps/floware/pyproject.toml
@@ -12,7 +12,6 @@ dependencies = [
     "common-module",
     "db-repo-module",
     "gold-module",
-    "insights-module",
     "knowledge-base-module",
     "user-management-module",
     "plugins-module",
@@ -38,7 +37,6 @@ auth-module = { workspace = true }
 common-module = { workspace = true }
 db-repo-module = { workspace = true }
 gold-module = { workspace = true }
-insights-module = { workspace = true }
 knowledge-base-module = { workspace = true }
 user-management-module = { workspace = true }
 plugins-module = {workspace = true}
diff --git a/wavefront/server/docker/floware.Dockerfile b/wavefront/server/docker/floware.Dockerfile
index a47141e5..15d1fe1c 100644
--- a/wavefront/server/docker/floware.Dockerfile
+++ b/wavefront/server/docker/floware.Dockerfile
@@ -17,7 +17,6 @@ COPY wavefront/server/modules/auth_module /app/modules/auth_module
 COPY wavefront/server/modules/common_module /app/modules/common_module
 COPY wavefront/server/modules/db_repo_module /app/modules/db_repo_module
 COPY wavefront/server/modules/gold_module /app/modules/gold_module
-COPY wavefront/server/modules/insights_module /app/modules/insights_module
 COPY wavefront/server/modules/knowledge_base_module /app/modules/knowledge_base_module
 COPY wavefront/server/modules/user_management_module /app/modules/user_management_module
 COPY wavefront/server/modules/llm_inference_config_module /app/modules/llm_inference_config_module
diff --git a/wavefront/server/modules/insights_module/insights_module/controllers/dynamic_query_controller.py b/wavefront/server/modules/insights_module/insights_module/controllers/dynamic_query_controller.py
deleted file mode 100644
index 3227b6e7..00000000
--- a/wavefront/server/modules/insights_module/insights_module/controllers/dynamic_query_controller.py
+++ /dev/null
@@ -1,99 +0,0 @@
-from auth_module.auth_container import AuthContainer
-from common_module.common_container import CommonContainer
-from common_module.response_formatter import ResponseFormatter
-from db_repo_module.models.resource import ResourceScope
-from db_repo_module.models.role import Role
-from db_repo_module.repositories.sql_alchemy_repository import SQLAlchemyRepository
-from dependency_injector.wiring import inject
-from dependency_injector.wiring import Provide
-from fastapi import Depends
-from fastapi import Query
-from fastapi import Request
-from fastapi import status
-from fastapi.responses import JSONResponse
-from fastapi.routing import APIRouter
-from insights_module.insights_container import InsightsContainer
-from insights_module.service.dynamic_query_service import DynamicQueryService
-from insights_module.utils.helper import fetch_data_filters
-from user_management_module.user_container import UserContainer
-from user_management_module.services.user_service import UserService
-
-
-dynamic_query_router = APIRouter()
-
-
-@inject
-async def check_admin(
-    role_id: str,
-    role_repositroy: SQLAlchemyRepository[Role] = Depends(
Provide(AuthContainer.role_repository) - ), -) -> bool: - role = await role_repositroy.find_one(id=role_id) - if not role: - return False - return role.name == 'admin' - - -@dynamic_query_router.get('/dynamic-queries/{query_id}') -@inject -async def execute_dynamic_query( - request: Request, - query_id: str, - response_formatter: ResponseFormatter = Depends( - Provide[CommonContainer.response_formatter] - ), - user_service: UserService = Depends(Provide[UserContainer.user_service]), - dynamic_query_service: DynamicQueryService = Depends( - Provide[InsightsContainer.dynamic_query_service] - ), - filter: str | None = Query(None, alias='$filter'), - start_date: str | None = None, - end_date: str | None = None, - limit: str | None = None, - offset: str | None = None, - force: str | None = None, -): - user_id = request.state.session.user_id - role_id = request.state.session.role_id - - if not dynamic_query_service.is_valid_query(query_id): - return JSONResponse( - status_code=status.HTTP_400_BAD_REQUEST, - content=response_formatter.buildErrorResponse( - 'Invalid query ID or query params' - ), - ) - - rls_filter_str = None - is_admin = await check_admin(role_id) - if not is_admin: - rls_filters = await user_service.get_user_resources( - user_id=user_id, scope=ResourceScope.DATA - ) - - if len(rls_filters) == 0: - return JSONResponse( - status_code=status.HTTP_403_FORBIDDEN, - content=response_formatter.buildErrorResponse( - 'Data access not set for non-admin user' - ), - ) - - rls_filters = fetch_data_filters(rls_filters) - rls_filter_str = f"{ ' $and '.join(rls_filters)}" - - all_query_params = dict(request.query_params) - query_results = await dynamic_query_service.execute_dynamic_query( - query_id=query_id, - params=all_query_params, - filter=filter, - rls_filter_str=rls_filter_str, - limit=limit, - offset=offset, - force=(force == 'true'), - ) - return JSONResponse( - status_code=status.HTTP_200_OK, - content=response_formatter.buildSuccessResponse(query_results), - ) diff --git a/wavefront/server/modules/insights_module/insights_module/controllers/pdo_controller.py b/wavefront/server/modules/insights_module/insights_module/controllers/pdo_controller.py deleted file mode 100644 index 3d24cfa4..00000000 --- a/wavefront/server/modules/insights_module/insights_module/controllers/pdo_controller.py +++ /dev/null @@ -1,223 +0,0 @@ -from auth_module.auth_container import AuthContainer -from common_module.common_container import CommonContainer -from common_module.response_formatter import ResponseFormatter -from common_module.utils.serializer import serialize_values -from db_repo_module.models.resource import Resource -from db_repo_module.models.resource import ResourceScope -from db_repo_module.models.role import Role -from db_repo_module.models.user import User -from db_repo_module.models.role_resource import RoleResource -from db_repo_module.models.user_role import UserRole -from db_repo_module.repositories.sql_alchemy_repository import SQLAlchemyRepository -from dependency_injector.wiring import inject -from dependency_injector.wiring import Provide -from fastapi import Depends -from fastapi import Query -from fastapi import Request -from fastapi import status -from fastapi.responses import JSONResponse -from fastapi.routing import APIRouter -from insights_module.insights_container import InsightsContainer -from insights_module.service.pdo_service import PdoService -from insights_module.utils.helper import fetch_data_filters -from user_management_module.user_container import UserContainer 
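Reviewer note on the row-level-security path shared by both deleted controllers: for non-admin sessions, `get_user_resources(user_id, scope=ResourceScope.DATA)` returns the user's data filters, `fetch_data_filters` turns them into OData-style clauses, and the clauses are joined with `$and` (passed as `rls_filter_str` above, and appended to the caller's `$filter` in `fetch_pvo_records` below). The body of `fetch_data_filters` is truncated where `helper.py` is deleted at the end of this diff, so the following is a plausible sketch rather than the verbatim helper: it assumes values sharing a key are OR-ed while distinct keys are AND-ed, and the `DataFilter` dataclass and `eq` clause syntax are illustrative.

```python
# Hypothetical reconstruction of the removed RLS-filter composition; the real
# fetch_data_filters lived in insights_module/utils/helper.py (deleted below).
import collections
from dataclasses import dataclass


@dataclass
class DataFilter:  # illustrative stand-in for the repository's filter rows
    key: str
    value: str


def fetch_data_filters(data_filters: list[DataFilter]) -> list[str]:
    # Group values by column; same-key values are assumed to OR together.
    grouped = collections.defaultdict(list)
    for f in data_filters:
        grouped[f.key].append(f.value)
    return [
        '(' + ' $or '.join(f"{key} eq '{v}'" for v in values) + ')'
        for key, values in grouped.items()
    ]


filters = fetch_data_filters(
    [DataFilter('region', 'south'), DataFilter('region', 'west'), DataFilter('tier', 'gold')]
)
rls_filter_str = ' $and '.join(filters)
print(rls_filter_str)  # (region eq 'south' $or region eq 'west') $and (tier eq 'gold')
```

Note that an empty filter list is treated as misconfiguration rather than unrestricted access, which is why both endpoints return 403 for a non-admin user with no data filters.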
-from user_management_module.services.user_service import UserService -from sqlalchemy import Result -from sqlalchemy import select -from dataclasses import dataclass - -pdo_router = APIRouter() - - -@dataclass -class UpdateRequest: - data: dict - - -@inject -async def check_admin( - role_id: str, - role_repositroy: SQLAlchemyRepository[Role] = Depends( - Provide(AuthContainer.role_repository) - ), -) -> bool: - role = await role_repositroy.find_one(id=role_id) - if not role: - return False - return role.name == 'admin' - - -@pdo_router.get('/{resource_name}') -@inject -async def fetch_pvo_records( - request: Request, - resource_name: str, - response_formatter: ResponseFormatter = Depends( - Provide[CommonContainer.response_formatter] - ), - resource_repository: SQLAlchemyRepository[Resource] = Depends( - Provide[AuthContainer.resource_repository] - ), - user_service: UserService = Depends(Provide[UserContainer.user_service]), - cloud_service: PdoService = Depends(Provide[InsightsContainer.cloud_service]), - filter: str | None = Query(None, alias='$filter'), - limit: str | None = None, - offset: str | None = None, -): - user_id = request.state.session.user_id - role_id = request.state.session.role_id - - if resource_name not in [ - 'parsed_data_object', - 'rf_parsed_data_object', - 'rf_gold_data_object', - 'rf_gold_item_details', - ]: - return JSONResponse( - status_code=status.HTTP_400_BAD_REQUEST, - content=response_formatter.buildErrorResponse( - f'Invalid resource name: {resource_name}' - ), - ) - - if resource_name == 'parsed_data_object': - resource_name = 'rf_parsed_data_object' - - data_filters = [] - is_admin = await check_admin(role_id) - if not is_admin: - data_filters = await user_service.get_user_resources( - user_id=user_id, scope=ResourceScope.DATA - ) - - if len(data_filters) == 0: - return JSONResponse( - status_code=status.HTTP_403_FORBIDDEN, - content=response_formatter.buildErrorResponse( - 'Data access not set for non-admin user' - ), - ) - - data_filters = fetch_data_filters(data_filters) - if filter: - filter = f"{filter} $and ({' $and '.join(data_filters)})" - else: - filter = f"{ ' $and '.join(data_filters)}" - - pvo_records = cloud_service.fetch_upto_limit( - filter, limit, offset, table_name=resource_name - ) - return JSONResponse( - status_code=status.HTTP_200_OK, - content=response_formatter.buildSuccessResponse( - {'records': serialize_values(pvo_records)} - ), - ) - - -@pdo_router.patch('/{resource_name}/{id}') -@inject -async def patch_pvo_records( - request: Request, - resource_name: str, - id: str, - response_formatter: ResponseFormatter = Depends( - Provide[CommonContainer.response_formatter] - ), - resource_repository: SQLAlchemyRepository[Resource] = Depends( - Provide[AuthContainer.resource_repository] - ), - cloud_service: PdoService = Depends(Provide[InsightsContainer.cloud_service]), - payload: UpdateRequest = None, -): - user_id = request.state.session.user_id - role_id = request.state.session.role_id - - if resource_name not in [ - 'parsed_data_object', - 'rf_parsed_data_object', - 'rf_gold_data_object', - 'rf_gold_item_details', - ]: - return JSONResponse( - status_code=status.HTTP_400_BAD_REQUEST, - content=response_formatter.buildErrorResponse( - f'Invalid resource name: {resource_name}' - ), - ) - - if resource_name == 'parsed_data_object': - resource_name = 'rf_parsed_data_object' - - data_filters = [] - is_admin = await check_admin(role_id) - if not is_admin: - async with resource_repository.session() as session: - statement = ( - 
select(Resource) - .join(RoleResource, Resource.id == RoleResource.resource_id) - .join(Role, Role.id == RoleResource.role_id) - .join(UserRole, UserRole.role_id == Role.id) - .join(User, UserRole.user_id == User.id) - .where(UserRole.user_id == user_id) - .where(User.deleted.is_(False)) - .where(Resource.scope == ResourceScope.DATA) - ) - result: Result = await session.execute(statement) - data_filters = result.scalars().all() - - if len(data_filters) == 0: - return JSONResponse( - status_code=status.HTTP_403_FORBIDDEN, - content=response_formatter.buildErrorResponse( - 'Data access not set for non-admin user' - ), - ) - - data_filters = fetch_data_filters(data_filters) - - cloud_service.patch_record_by_id( - id=id, - table_name=resource_name, - rls_filter=data_filters, - update_data=payload.data, - ) - - return JSONResponse( - status_code=status.HTTP_200_OK, - content=response_formatter.buildSuccessResponse( - {'message': 'Successfully updated the records'} - ), - ) - - -@pdo_router.get('/parsed_data_object/audio') -@inject -async def fetch_audio( - resource_url: str, - response_formatter: ResponseFormatter = Depends( - Provide[CommonContainer.response_formatter] - ), - cloud_service: PdoService = Depends(Provide[InsightsContainer.cloud_service]), -): - audio_url = cloud_service.fetch_audio(resource_url) - return JSONResponse( - status_code=status.HTTP_200_OK, - content=response_formatter.buildSuccessResponse({'audio_url': audio_url}), - ) - - -@pdo_router.get('/parsed_data_object/transcript') -@inject -async def fetch_transcript( - resource_url: str, - response_formatter: ResponseFormatter = Depends( - Provide[CommonContainer.response_formatter] - ), - cloud_service: PdoService = Depends(Provide[InsightsContainer.cloud_service]), -): - transcripts = cloud_service.fetch_transcript(resource_url) - return JSONResponse( - status_code=status.HTTP_200_OK, - content=response_formatter.buildSuccessResponse(transcripts), - ) diff --git a/wavefront/server/modules/insights_module/insights_module/controllers/router.py b/wavefront/server/modules/insights_module/insights_module/controllers/router.py deleted file mode 100644 index 83608a11..00000000 --- a/wavefront/server/modules/insights_module/insights_module/controllers/router.py +++ /dev/null @@ -1,7 +0,0 @@ -from fastapi.routing import APIRouter -from insights_module.controllers.dynamic_query_controller import dynamic_query_router -from insights_module.controllers.pdo_controller import pdo_router - -insights_router = APIRouter() -insights_router.include_router(dynamic_query_router, prefix='/v1/insights') -insights_router.include_router(pdo_router, prefix='/v1/insights') diff --git a/wavefront/server/modules/insights_module/insights_module/db/bigquery_connector.py b/wavefront/server/modules/insights_module/insights_module/db/bigquery_connector.py deleted file mode 100644 index 23f94403..00000000 --- a/wavefront/server/modules/insights_module/insights_module/db/bigquery_connector.py +++ /dev/null @@ -1,85 +0,0 @@ -from dataclasses import dataclass -import datetime - -from common_module.log.logger import logger -from google.cloud import bigquery - - -@dataclass -class BigQueryConfig: - project_id: str - dataset_id: str - location: str = 'asia-south1' - - -class BigQueryConnector: - def __init__(self, bq_config: BigQueryConfig): - self.config = bq_config - self.client = self.__get_client() - - def __get_client(self): - try: - bq_client = bigquery.Client( - project=self.config.project_id, location='asia-south1' - ) - return bq_client - except 
Exception as e: - logger.error(f'Connection error: {str(e)}') - raise e - - def execute_query(self, query: str, parameters: dict = None): - try: - logger.debug(f'Executing query: {query}') - logger.debug(f'Parameters: {parameters}') - - job_config = bigquery.QueryJobConfig() - if parameters: - query_params = [] - for key, value in parameters.items(): - if isinstance(value, str): - query_params.append( - bigquery.ScalarQueryParameter(key, 'STRING', value) - ) - elif isinstance(value, int): - query_params.append( - bigquery.ScalarQueryParameter(key, 'INT64', value) - ) - elif isinstance(value, float): - query_params.append( - bigquery.ScalarQueryParameter(key, 'FLOAT64', value) - ) - elif isinstance(value, bool): - query_params.append( - bigquery.ScalarQueryParameter(key, 'BOOL', value) - ) - elif isinstance(value, datetime.datetime): - query_params.append( - bigquery.ScalarQueryParameter(key, 'TIMESTAMP', value) - ) - else: - query_params.append( - bigquery.ScalarQueryParameter(key, 'STRING', str(value)) - ) - - job_config.query_parameters = query_params - - query_job = self.client.query(query, job_config=job_config) - result = query_job.result() - - # Convert RowIterator to list immediately to avoid iterator exhaustion - rows = list(result) - column_names = [field.name for field in result.schema] - - if query.strip().upper().startswith('INSERT'): - logger.info( - f'Insert completed. Affected rows: {query_job.num_dml_affected_rows}' - ) - return rows, column_names - - except Exception as e: - logger.error( - f'Query execution failed: {str(e)}\n' - f'Query: {query}\n' - f'Parameters: {parameters}' - ) - raise e diff --git a/wavefront/server/modules/insights_module/insights_module/db/redshift_connector.py b/wavefront/server/modules/insights_module/insights_module/db/redshift_connector.py deleted file mode 100644 index 9d14cd1f..00000000 --- a/wavefront/server/modules/insights_module/insights_module/db/redshift_connector.py +++ /dev/null @@ -1,151 +0,0 @@ -from contextlib import contextmanager -from dataclasses import dataclass -from functools import wraps -import time - -from common_module.log.logger import logger -import redshift_connector -from redshift_connector.core import Connection - - -@dataclass -class RedshiftConfig: - username: str - password: str - host: str - port: str - db_name: str - read_only: bool = False - - -def retry_on_connection_error(max_retries=3, delay=1, timeout=30): - def decorator(func): - @wraps(func) - def wrapper(self: 'RedshiftConnector', *args, **kwargs): - retries = 0 - last_exception = None - - while retries < max_retries: - try: - kwargs.pop('connection', None) - with self.get_connection(timeout) as conn: - return func(self, *args, **kwargs, connection=conn) - except ( - redshift_connector.Error, - redshift_connector.OperationalError, - ) as e: - last_exception = e - retries += 1 - logger.warning( - f'Database connection error: {str(e)}. ' - f'Attempt {retries} of {max_retries}' - ) - - if retries == max_retries: - logger.error( - f'Max retries reached. 
Last error: {str(last_exception)}' - ) - raise last_exception - - time.sleep(delay * retries) # Exponential backoff - return None - - return wrapper - - return decorator - - -class RedshiftConnector: - def __init__(self, redshift_config: RedshiftConfig): - self.config = redshift_config - - @contextmanager - def get_connection(self, timeout=300): - connection = None - try: - connection: Connection = redshift_connector.connect( - host=self.config.host, - port=int(self.config.port), - database=self.config.db_name, - user=self.config.username, - password=self.config.password, - timeout=timeout, - ssl=True, - tcp_keepalive=True, - ) - redshift_connector.paramstyle = 'named' - - if self.config.read_only: - logger.debug('Making read only connection to redshfit') - cursor = connection.cursor() - cursor.execute('SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY') - cursor.close() - - yield connection - except Exception as e: - logger.error(f'Connection error: {str(e)}') - raise e - finally: - self.__close_connection(connection=connection) - - def __close_connection(self, connection: Connection): - try: - if connection: - connection.close() - except Exception as e: - logger.error(f'Connection closing error: {str(e)}') - raise e - - @retry_on_connection_error() - def execute_query( - self, query: str, parameters: dict | None = None, connection: Connection = None - ): - try: - if self.config.read_only: - query_upper = query.strip().upper() - write_operations = ( - 'INSERT', - 'UPDATE', - 'DELETE', - 'CREATE', - 'DROP', - 'ALTER', - 'TRUNCATE', - ) - if any(query_upper.startswith(op) for op in write_operations): - raise ValueError( - 'Write operations are not allowed in read-only mode' - ) - - logger.debug(f'Executing query: {query}') - logger.debug(f'Parameters: {parameters}') - - cursor = connection.cursor() - - redshift_connector.paramstyle = 'named' - if parameters: - cursor.execute(query, parameters) - else: - cursor.execute(query) - - if query.strip().upper().startswith('INSERT'): - logger.info(f'Insert completed. 
Rowcount: {cursor.rowcount}') - return cursor.rowcount - - try: - results = cursor.fetchall() - column_names = [desc[0] for desc in cursor.description] - return results, column_names - except redshift_connector.ProgrammingError: - return cursor.rowcount - finally: - cursor.close() - - except Exception as e: - logger.error( - f'Query execution failed: {str(e)}\n' - f'Query: {query}\n' - f'Parameters: {parameters}' - ) - - raise e diff --git a/wavefront/server/modules/insights_module/insights_module/insights_container.py b/wavefront/server/modules/insights_module/insights_module/insights_container.py deleted file mode 100644 index 211ded47..00000000 --- a/wavefront/server/modules/insights_module/insights_module/insights_container.py +++ /dev/null @@ -1,84 +0,0 @@ -import os - -from dependency_injector import containers -from dependency_injector import providers -from insights_module.db.bigquery_connector import BigQueryConfig -from insights_module.db.bigquery_connector import BigQueryConnector -from insights_module.db.redshift_connector import RedshiftConfig -from insights_module.db.redshift_connector import RedshiftConnector -from insights_module.repository.pvo_repository import PVORepository -from insights_module.service.dynamic_query_service import DynamicQueryService -from insights_module.service.insights_service import InsightsService -from insights_module.service.pdo_service import AWSServices -from insights_module.service.pdo_service import GCPServices -from insights_module.service.usage_metric_service import UsageMetricService -from flo_cloud.cloud_storage import CloudStorageManager - - -class InsightsContainer(containers.DeclarativeContainer): - config = providers.Configuration(ini_files=['./config.ini']) - - notification_repository = providers.Dependency() - - cache_manager = providers.Dependency() - - cloud_provider = os.environ.get('CLOUD_PROVIDER', 'aws') - - if cloud_provider == 'aws': - redshift_config = providers.Factory( - RedshiftConfig, - username=config.redshift.username, - password=config.redshift.password, - host=config.redshift.host, - port=config.redshift.port, - db_name=config.redshift.db_name, - ) - connector = providers.Singleton(RedshiftConnector, redshift_config) - elif cloud_provider == 'gcp': - bq_config = providers.Factory( - BigQueryConfig, - project_id=config.bigquery.project_id, - dataset_id=config.bigquery.dataset_id, - ) - connector = providers.Singleton(BigQueryConnector, bq_config) - - pvo_repository = providers.Singleton( - PVORepository, - connector, - dataset_id=config.bigquery.dataset_id, - ) - - insights_service = providers.Singleton( - InsightsService, - repository=pvo_repository, - today_as_max_from_db=config.insights.today_as_max_from_db, - ) - usage_metric_service = providers.Singleton( - UsageMetricService, - repository=pvo_repository, - cloud_provider=cloud_provider, - ) - - if cloud_provider == 'aws': - cloud_service = providers.Singleton( - AWSServices, - insights_service=insights_service, - transcript_bucket_name=config.aws.transcript_bucket_name, - audio_bucket_name=config.aws.audio_bucket_name, - ) - elif cloud_provider == 'gcp': - cloud_service = providers.Singleton( - GCPServices, - insights_service=insights_service, - transcript_bucket_name=config.gcp.transcript_bucket_name, - audio_bucket_name=config.gcp.audio_bucket_name, - ) - - colud_manager = providers.Singleton( - CloudStorageManager, provider=config.cloud_config.cloud_provider - ) - dynamic_query_service = providers.Singleton( - DynamicQueryService, - 
pvo_repository=pvo_repository, - cache_manager=cache_manager, - ) diff --git a/wavefront/server/modules/insights_module/insights_module/models/dymanic_query.py b/wavefront/server/modules/insights_module/insights_module/models/dymanic_query.py deleted file mode 100644 index c56156d0..00000000 --- a/wavefront/server/modules/insights_module/insights_module/models/dymanic_query.py +++ /dev/null @@ -1,30 +0,0 @@ -from dataclasses import dataclass -from dataclasses import field -from typing import List - - -@dataclass -class QueryParameter: - name: str - type: str - - -@dataclass -class QueryParameterValue: - name: str - value: str - - -@dataclass -class Query: - id: str - description: str - query: str - parameters: List[QueryParameter] = field(default_factory=list) - - -@dataclass -class DynamicQuery: - id: str - name: str - queries: List[Query] diff --git a/wavefront/server/modules/insights_module/insights_module/models/insights_signal.py b/wavefront/server/modules/insights_module/insights_module/models/insights_signal.py deleted file mode 100644 index 9997442e..00000000 --- a/wavefront/server/modules/insights_module/insights_module/models/insights_signal.py +++ /dev/null @@ -1,122 +0,0 @@ -from dataclasses import asdict -from dataclasses import dataclass -from datetime import date -from enum import Enum -import math -from typing import Dict, List - -from insights_module.models.insights_signal_query import Threshold - - -class AlertType(str, Enum): - L7D = 'L7D' - L30D = 'L30D' - goal_line = 'goal_line' - - @staticmethod - def resolve(type: str): - if type == 'L7D': - return AlertType.L7D - if type == 'L30D': - return AlertType.L30D - if type == 'L90D': - return AlertType.L30D - if type == 'goal_line': - return AlertType.goal_line - else: - return ValueError(f'Unknown alert type: {type}') - - -def serialize_values(value): - if isinstance(value, date): - return value.isoformat() # Convert date to string ('YYYY-MM-DD') - elif isinstance(value, Enum): - return value.value # Convert Enum to its string representation - elif isinstance(value, float) and (math.isnan(value) or math.isinf(value)): - return None # Handle NaN, inf, -inf safely - elif isinstance(value, list): - return [serialize_values(v) for v in value] # Recursively handle lists - elif isinstance(value, dict): - return { - k: serialize_values(v) for k, v in value.items() - } # Recursively handle dicts - return value # Return other types as-is - - -@dataclass -class Alert: - metric: str - threshold: float - previous_value: float - current_value: float - diff_value: float - type: AlertType - - def to_dict(self): - data = asdict(self) - data['type'] = self.type.value - return data - - -@dataclass -class Metric: - metric: str - name: str - value: float - - -@dataclass -class ActionableAlerts: - id: str - title: str - type: str - name: str - description: str - alerts: List[Alert] - - def has_alerts(self): - return len(self.alerts) > 0 - - -@dataclass -class DataPoints: - window_type: str - old_window: Dict[str, List] - new_window: Dict[str, List] - - -@dataclass -class DetailedInsights: - metrices: List[Metric] - data_points: DataPoints - goal_lines: List[Threshold] - - def to_dict(self): - return { - 'metrices': [asdict(m) for m in self.metrices], - 'data_points': serialize_values(asdict(self.data_points)), - 'goal_lines': [asdict(g) for g in self.goal_lines], - } - - -@dataclass -class ActionableInsights: - id: str - title: str - type: str - name: str - description: str - alerts: List[Alert] - details: DetailedInsights - - @staticmethod - 
def to_actionable(alerts: ActionableAlerts, insights: DetailedInsights): - return ActionableInsights( - id=alerts.id, - name=alerts.name, - title=alerts.title, - type=alerts.type, - description=alerts.description, - alerts=alerts.alerts, - details=insights, - ) diff --git a/wavefront/server/modules/insights_module/insights_module/models/insights_signal_query.py b/wavefront/server/modules/insights_module/insights_module/models/insights_signal_query.py deleted file mode 100644 index a5e078ae..00000000 --- a/wavefront/server/modules/insights_module/insights_module/models/insights_signal_query.py +++ /dev/null @@ -1,96 +0,0 @@ -from dataclasses import dataclass -from dataclasses import field -from typing import List - -import dacite -import yaml - - -@dataclass -class Threshold: - metric: str - threshold: float - - -@dataclass -class Periodicity: - period: str - alerts: List[Threshold] - - -@dataclass -class Projection: - sql: str - metric: str - name: str - - -@dataclass -class Projections: - parent: List[Projection] - children: List[Projection] = field(default_factory=list) - - -@dataclass -class Variable: - name: str - - -@dataclass -class Query: - sql: str - variables: List[Variable] = field(default_factory=list) - - -@dataclass -class Plot: - name: str - metrices: List[Projection] - - -@dataclass -class SignalQuery: - id: str - name: str - title: str - description: str - projections: Projections - query: Query - version: int - type: str - periodicity: List[Periodicity] - plots: List[Plot] - goal_lines: List[Threshold] = field(default_factory=list) - - -def load_yaml_to_signal(signals: str) -> SignalQuery: - yaml_data = [] - for signal in signals: - yaml_data.append( - dacite.from_dict( - data_class=SignalQuery, - data={ - 'id': signal.id, - 'name': signal.name, - 'title': signal.title, - 'description': signal.description, - 'projections': signal.projections, - 'query': signal.query, - 'version': signal.version, - 'type': signal.type, - 'periodicity': signal.periodicity, - 'plots': signal.plots, - 'goal_lines': signal.goal_lines, - }, - ) - ) - return yaml_data - - -def load_yaml_from_str(yaml_str: str) -> SignalQuery: - yml_dict = yaml.safe_load(yaml_str) - return dacite.from_dict(data_class=SignalQuery, data=yml_dict) - - -def load_from_dict(data: dict) -> SignalQuery: - return dacite.from_dict(data_class=SignalQuery, data=data) diff --git a/wavefront/server/modules/insights_module/insights_module/models/lead_signal_query.py b/wavefront/server/modules/insights_module/insights_module/models/lead_signal_query.py deleted file mode 100644 index a3adadff..00000000 --- a/wavefront/server/modules/insights_module/insights_module/models/lead_signal_query.py +++ /dev/null @@ -1,17 +0,0 @@ -from dataclasses import dataclass -from datetime import datetime -from typing import Dict, List - - -@dataclass -class LeadQuery: - product_category: str - lead_type: str - query: str - periodicity: List[Dict[str, str]] - - -@dataclass -class QueryWindow: - start: datetime - end: datetime diff --git a/wavefront/server/modules/insights_module/insights_module/models/leads_aggreegate.py b/wavefront/server/modules/insights_module/insights_module/models/leads_aggreegate.py deleted file mode 100644 index 3fb1570d..00000000 --- a/wavefront/server/modules/insights_module/insights_module/models/leads_aggreegate.py +++ /dev/null @@ -1,12 +0,0 @@ -class LeadAggregate: - def __init__(self, product_name, type, count): - self.product_name = product_name - self.type = type - self.count = count - - def to_dict(self): - return { 
- 'product_name': self.product_name, - 'type': self.type, - 'count': self.count, - } diff --git a/wavefront/server/modules/insights_module/insights_module/repository/pvo_repository.py b/wavefront/server/modules/insights_module/insights_module/repository/pvo_repository.py deleted file mode 100644 index 1243f23f..00000000 --- a/wavefront/server/modules/insights_module/insights_module/repository/pvo_repository.py +++ /dev/null @@ -1,231 +0,0 @@ -import os -from typing import Dict, List - -from common_module.log.logger import logger -import dacite -from insights_module.db.redshift_connector import RedshiftConnector -from insights_module.models.dymanic_query import DynamicQuery -from insights_module.models.lead_signal_query import LeadQuery -from insights_module.models.insights_signal import serialize_values -import yaml - -# Define default project paths -DEFAULT_PROJECT_PATH = 'apps/floware/floware' - - -class PVORepository: - def __init__( - self, - redshift_connector: RedshiftConnector, - dataset_id: str = None, - ): - self.dataset_id = dataset_id - self.connector = redshift_connector - self.project_path = os.getenv('PROJECT_PATH', DEFAULT_PROJECT_PATH) - - def __load_yaml_files(self, directory: str, data_class: type) -> List: - """Load and parse YAML files from directory into list of data_class objects""" - results = [] - for filename in os.listdir(directory): - if filename.endswith(('.yaml', '.yml')): - file_path = os.path.join(directory, filename) - with open(file_path, 'r') as f: - yaml_data = yaml.safe_load(f) - if isinstance(yaml_data, list): - for item in yaml_data: - obj = dacite.from_dict(data_class=data_class, data=item) - results.append(obj) - else: - obj = dacite.from_dict(data_class=data_class, data=yaml_data) - results.append(obj) - return results - - def __get_asset_directory(self, asset_type: str, env_var: str) -> str: - """Get normalized directory path for asset files""" - root_dir = os.path.dirname( - os.path.dirname( - os.path.dirname( - os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - ) - ) - ) - base_dir = os.path.join(root_dir, DEFAULT_PROJECT_PATH) - directory_path = os.getenv(env_var, '') # Provide empty string as default - directory = os.path.join(base_dir, f'assets/{asset_type}', directory_path) - return os.path.normpath(directory) - - def fetch_signal_from_yaml(self) -> List[LeadQuery]: - directory = self.__get_asset_directory('leads', 'LEADS_DIR') - return self.__load_yaml_files(directory, LeadQuery) - - def fetch_dynamic_queries(self) -> List[DynamicQuery]: - directory = self.__get_asset_directory('dynamic_queries', 'DYNAMIC_QUERIES_DIR') - return self.__load_yaml_files(directory, DynamicQuery) - - def fetch_pvo_record( - self, - odata_condition: str, - params: Dict, - limit: str | None = None, - offset: str | None = None, - table_name: str = 'rf_parsed_data_object', - ) -> Dict[str, List]: - table_name = self.__resolve_table_name(self.dataset_id, table_name=table_name) - template_to_run = f'SELECT * FROM {table_name}' - if odata_condition: - template_to_run += f' WHERE {odata_condition}' - - template_to_run += ' ORDER BY start_time DESC' - - if limit: - template_to_run += f' LIMIT {limit}' - else: - template_to_run += ' LIMIT 1' - if offset: - template_to_run += f' OFFSET {offset}' - - results, column_names = self.connector.execute_query( - template_to_run, parameters=params - ) - return self.__format_results_json(results, column_names) - - def update_pvo_record( - self, - id: str, - update_data: Dict[str, str], - table_name: str, - odata_condition: 
str | None = None, - rls_params: Dict[str, str] = {}, - ) -> None: - odata_condition = odata_condition or 'TRUE' - cloud_provider = os.environ.get('CLOUD_PROVIDER', 'aws') - param_symbol = '@' if cloud_provider == 'gcp' else ':' - table_name = self.__resolve_table_name(self.dataset_id, table_name=table_name) - set_clause = ', '.join([f'{key} = @{key}' for key in update_data.keys()]) - query_to_run = f'UPDATE {table_name} SET {set_clause} WHERE id = {param_symbol}id AND {odata_condition}' - params = {**update_data, 'id': id, **(rls_params if rls_params else {})} - logger.info(f'Running query: {query_to_run} with params: {params}') - self.connector.execute_query(query_to_run, parameters=params) - - def fetch_insights( - self, query: str, projection: str, start_date: str, end_date: str - ) -> Dict[str, List]: - template_to_run = f'SELECT {projection} FROM ({query}) LIMIT 100' - query_to_run = template_to_run.replace('{{start_date}}', start_date).replace( - '{{end_date}}', end_date - ) - logger.debug(f'Running query: {query_to_run}') - results, column_names = self.connector.execute_query(query_to_run) - - return self.__format_results(results, column_names) - - def get_max_record_date(self) -> str | None: - table_name = self.__resolve_table_name(self.dataset_id) - query_to_run = f'SELECT MAX(start_time) as max_start_time FROM {table_name}' - logger.debug(f'Running query: {query_to_run}') - results, column_names = self.connector.execute_query(query_to_run) - formatted_outputs = self.__format_results(results, column_names) - max_date = None - if ( - 'max_start_time' in formatted_outputs - and len(formatted_outputs['max_start_time']) > 0 - ): - max_date = formatted_outputs['max_start_time'][0] - return max_date - - def fetch_raw_values( - self, query: str, projection: str, start_date: str, end_date: str - ) -> Dict[str, List]: - template_to_run = ( - f'SELECT start_date, {projection} FROM ({query}) GROUP BY start_date' - ) - query_to_run = template_to_run.replace('{{start_date}}', start_date).replace( - '{{end_date}}', end_date - ) - logger.debug(f'Running query: {query_to_run}') - results, column_names = self.connector.execute_query(query_to_run) - - return self.__format_results(results, column_names) - - def execute_query( - self, query: str, start_date: str, end_date: str - ) -> Dict[str, List]: - query_to_run = query.replace('{{start_date}}', start_date).replace( - '{{end_date}}', end_date - ) - logger.debug(f'Running query: {query_to_run}') - results, column_names = self.connector.execute_query(query_to_run) - - return self.__format_results(results, column_names) - - def execute_dynamic_query( - self, - query: str, - odata_filters: str, - odata_data_filter: str, - params: dict | None = None, - limit: str | None = None, - offset: str | None = None, - ) -> Dict[str, List]: - logger.debug(f'Running query: {query}') - - query = query.replace( - '{{rls}}', f'{odata_data_filter}' if odata_data_filter else 'TRUE' - ) - query = query.replace( - '{{filters}}', f'{odata_filters}' if odata_filters else 'TRUE' - ) - if limit: - query += f' LIMIT {limit}' - if offset: - query += f' OFFSET {offset}' - - results, column_names = self.connector.execute_query(query, parameters=params) - - return self.__format_results_json(results, column_names) - - def __format_results(self, results, column_names): - if not results: - return {col: [] for col in column_names} - - return {col: [row[i] for row in results] for i, col in enumerate(column_names)} - - def __format_results_json(self, results, column_names): - if not 
results: - return [] - - json_data = [] - for res in results: - result = {} - for i, col in enumerate(column_names): - result[col] = res[i] - json_data.append(result) - serialized_json = serialize_values(json_data) - return serialized_json - - def __resolve_table_name( - self, dataset_id: str = '', table_name='rf_parsed_data_object' - ): - full_table_name = table_name - if dataset_id: - full_table_name = f'{dataset_id}.{table_name}' - - return full_table_name - - def fetch_usage_metrics(self, start_time: str, end_time: str, cloud_provider: str): - table_name = self.__resolve_table_name(self.dataset_id) - dynamic_var_char = '@' if cloud_provider == 'gcp' else ':' - query = f""" - SELECT - COUNT(DISTINCT CASE WHEN rf_transcription_status = 'success' THEN conversation_id END) AS transcription_success, - COUNT(DISTINCT CASE WHEN rf_transcription_status = 'empty' THEN conversation_id END) AS transcription_empty, - COUNT(DISTINCT CASE WHEN rf_insights_status = 'success' THEN conversation_id END) AS insights_success, - COUNT(DISTINCT CASE WHEN rf_transcription_status = 'failure' OR rf_transcription_status IS NULL THEN conversation_id END) AS transcription_failure, - COUNT(DISTINCT CASE WHEN rf_insights_status = 'failure' OR rf_insights_status IS NULL THEN conversation_id END) AS insights_failure, - SUM(CASE WHEN rf_insights_status = 'success' THEN total_duration END) as total_call_duration - FROM {table_name} - WHERE created_at BETWEEN {dynamic_var_char}start_time AND {dynamic_var_char}end_time; - """ - params = {'start_time': start_time, 'end_time': end_time} - results, column_names = self.connector.execute_query(query, params) - return self.__format_results(results, column_names) diff --git a/wavefront/server/modules/insights_module/insights_module/service/dynamic_query_service.py b/wavefront/server/modules/insights_module/insights_module/service/dynamic_query_service.py deleted file mode 100644 index 9f7b227f..00000000 --- a/wavefront/server/modules/insights_module/insights_module/service/dynamic_query_service.py +++ /dev/null @@ -1,168 +0,0 @@ -import asyncio -import json -import hashlib - -from common_module.log.logger import logger -from common_module.utils.odata_parser import prepare_odata_filter -from db_repo_module.cache.cache_manager import CacheManager -from insights_module.models.dymanic_query import DynamicQuery -from insights_module.models.dymanic_query import Query -from insights_module.repository.pvo_repository import PVORepository - - -class DynamicQueryService: - def __init__( - self, - pvo_repository: PVORepository, - cache_manager: CacheManager, - ): - self.pvo_repository = pvo_repository - self.cache_manager = cache_manager - self.dynamic_query_map: dict[str, DynamicQuery] = dict() - self.__load_dynamic_queries() - - def __load_dynamic_queries(self): - all_dynamic_queries = self.pvo_repository.fetch_dynamic_queries() - for dynamic_query in all_dynamic_queries: - self.dynamic_query_map[dynamic_query.id] = dynamic_query - - def is_valid_query(self, query_id: str) -> bool: - return query_id in self.dynamic_query_map - - async def execute_dynamic_query( - self, - query_id: str, - filter: str | None = None, - rls_filter_str: str | None = None, - params: dict[str, str] | None = None, - limit: str = None, - offset: str = None, - force: bool = False, - ): - query = self.dynamic_query_map[query_id] - result_by_query = dict() - - logger.info(f'Executing dynamic query: {query_id}') - - # Create tasks for parallel execution - tasks = [] - for query in query.queries: - task = 
asyncio.create_task( - self.__execute_single_query( - query, - filter, - rls_filter_str, - params, - limit=limit, - offset=offset, - force=force, - ) - ) - tasks.append((query.id, task)) - - # Wait for all tasks to complete - for query_id, task in tasks: - result_by_query[query_id] = await task - - return result_by_query - - def __generate_cache_key( - self, - query: Query, - filter: str, - rls_filter_str: str, - params: dict, - limit: str = None, - offset: str = None, - ) -> str: - """Generate a unique cache key based on query parameters.""" - key_dict = { - 'query_id': query.id, - 'filter': filter, - 'rls_filter': rls_filter_str, - 'params': sorted(params.items()), - 'limit': limit, - 'offset': offset, - } - key_json = json.dumps(key_dict, sort_keys=True, separators=(',', ':')) - hash_digest = hashlib.md5(key_json.encode()).hexdigest() - return f'dynamic_query:{hash_digest}' - - async def __execute_single_query( - self, - query: Query, - filter: str | None = None, - rls_filter_str: str | None = None, - params: dict[str, dict[str, str]] | None = None, - limit: str = None, - offset: str = None, - force: bool = False, - ) -> dict: - try: - params_to_execute = dict() - odata_filter, odata_params = prepare_odata_filter(filter) - odata_data_filter, odata_data_params = prepare_odata_filter( - rls_filter_str, prefix='rls_' - ) - incoming_param_value: dict[str, str] = params - for qp in query.parameters: - if qp.name not in incoming_param_value: - raise ValueError( - f'Missing parameter: {qp.name} for query {query.id}' - ) - params_to_execute[qp.name] = incoming_param_value[qp.name] - - # Generate cache key - cache_key = self.__generate_cache_key( - query, filter, rls_filter_str, params_to_execute, limit, offset - ) - - # Try to get from cache first - cached_result = self.cache_manager.get_str(cache_key) - if cached_result and not force: - logger.info(f'Cache hit for query {query.id}') - return { - 'status': 'success', - 'error': None, - 'result': json.loads(cached_result), - } - - logger.info( - f'Executing query {query.id} with parameters: {params_to_execute}' - ) - - # TODO: If rls and filter have same columns mentioned, the behavior can be unpredictable. 
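The deleted cache layer derives its key from a canonical rendering of the query inputs: `__generate_cache_key` sorts `params.items()`, serializes with `sort_keys=True` and fixed separators, and hashes the result, so two logically identical requests that differ only in argument order share one cache entry. A stripped-down sketch of that idea (the function name and namespace here are illustrative, not the deleted API):

```python
import hashlib
import json


def make_cache_key(namespace: str, **inputs) -> str:
    """Stable cache key: dict ordering must not change the digest."""
    canonical = json.dumps(inputs, sort_keys=True, separators=(',', ':'), default=str)
    return f'{namespace}:{hashlib.md5(canonical.encode()).hexdigest()}'


# Same inputs, different order -> same key.
k1 = make_cache_key('dynamic_query', query_id='q1', limit='10', filter="region eq 'south'")
k2 = make_cache_key('dynamic_query', filter="region eq 'south'", query_id='q1', limit='10')
assert k1 == k2
```

Worth flagging in the removed code just below: the comment claims the result is cached "for 1 hour (3600 seconds)" while the call passes `expiry=60 * 2`, i.e. two minutes. Moot after this deletion, but the discrepancy should not be carried over if the service is ever restored.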
- if odata_params: - params_to_execute.update(odata_params) - if odata_data_params: - params_to_execute.update(odata_data_params) - - # Run the query in a thread pool since it's a blocking operation - result = await asyncio.to_thread( - self.pvo_repository.execute_dynamic_query, - query.query, - odata_filter, - odata_data_filter, - params_to_execute, - limit=limit, - offset=offset, - ) - - # Cache the result for 1 hour (3600 seconds) - self.cache_manager.add(cache_key, json.dumps(result), expiry=60 * 2) - - return { - 'status': 'success', - 'error': None, - 'description': query.description, - 'result': result, - } - except Exception as e: - logger.exception(e) - logger.error(f'Error executing query {query.id}: {str(e)}') - return { - 'status': 'error', - 'description': None, - 'error': 'Unexpected error while executing query', - 'result': [], - } diff --git a/wavefront/server/modules/insights_module/insights_module/service/insights_service.py b/wavefront/server/modules/insights_module/insights_module/service/insights_service.py deleted file mode 100644 index 322a4699..00000000 --- a/wavefront/server/modules/insights_module/insights_module/service/insights_service.py +++ /dev/null @@ -1,353 +0,0 @@ -from dataclasses import dataclass -from datetime import datetime -from datetime import timedelta -from typing import Dict, List, Any - -from common_module.log.logger import logger -from insights_module.models.insights_signal import ActionableAlerts -from insights_module.models.insights_signal import Alert -from insights_module.models.insights_signal import AlertType -from insights_module.models.insights_signal import DataPoints -from insights_module.models.insights_signal import DetailedInsights -from insights_module.models.insights_signal import Metric -from insights_module.models.insights_signal_query import Periodicity -from insights_module.models.insights_signal_query import Projection -from insights_module.models.insights_signal_query import Projections -from insights_module.models.insights_signal_query import SignalQuery -from insights_module.models.insights_signal_query import Threshold -from insights_module.repository.pvo_repository import PVORepository - - -@dataclass -class QueryWindow: - name: str - start: datetime - end: datetime - alerts: List[Threshold] - - -class InsightsService: - def __init__( - self, - repository: PVORepository, - today_as_max_from_db: str = 'false', - ): - self.repository = repository - self.today_as_max_from_db = today_as_max_from_db == 'true' - self.current_date = datetime.today() - timedelta(days=1) - - if self.today_as_max_from_db: - max_date = self.repository.get_max_record_date() - logger.info(f'Configuring current date as: {max_date}') - self.current_date = max_date if max_date is not None else self.current_date - - def __get_windows(self, name: str, number_of_days: int, alerts: List[Threshold]): - today = self.current_date - logger.info(f'Max date used for running worker: {today}') - return ( - QueryWindow( - name=name, - start=(today - timedelta(days=number_of_days * 2)).strftime('%Y-%m-%d'), - end=(today - timedelta(days=number_of_days)).strftime('%Y-%m-%d'), - alerts=alerts, - ), - QueryWindow( - name=name, - start=(today - timedelta(days=number_of_days)).strftime('%Y-%m-%d'), - end=today.strftime('%Y-%m-%d'), - alerts=alerts, - ), - ) - - def __fetch_periods(self, peroids: list[Periodicity]): - diff_window = [] - for period in peroids: - if period.period.startswith('L') and period.period.endswith('D'): - period_day_count = int( - 
period.period.removeprefix('L').removesuffix('D') - ) - diff_window.append( - self.__get_windows(period.period, period_day_count, period.alerts) - ) - else: - logger.warning(f'Unknown periodicity found: {period.period}') - return diff_window - - def __fetch_parent_projections(self, projections: Projections): - projections: list[Projection] = projections.parent - projection_queries = [ - f'{projection.sql} as {projection.metric}' for projection in projections - ] - - return ','.join(projection_queries), projections - - def __fetch_child_projections(self, projections: Projections): - projections: list[Projection] = projections.children - projection_queries = [ - f'{projection.sql} as {projection.metric}' for projection in projections - ] - return ','.join(projection_queries), projections - - def __execute_periodic_insights( - self, query, projection_query, old_window: QueryWindow, new_window: QueryWindow - ): - old_window_value: dict = self.repository.fetch_insights( - query, - projection_query, - start_date=old_window.start, - end_date=old_window.end, - ) - - new_window_value: dict = self.repository.fetch_insights( - query, - projection_query, - start_date=new_window.start, - end_date=new_window.end, - ) - return old_window_value, new_window_value - - def __periodic_alerts( - self, - period_name: str, - old_window_value: float, - new_window_value: float, - possible_alerts: List[Threshold], - ) -> List[Alert]: - alerts_to_notify: List[Alert] = [] - for alert in possible_alerts: - metric = alert.metric - threshold = alert.threshold - - old_values = old_window_value.get(metric, []) - new_values = new_window_value.get(metric, []) - - if ( - old_values is None - or new_values is None - or len(old_values) == 0 - or len(new_values) == 0 - ): - logger.debug( - f'No value found for metric: {metric}, skipping alert creation' - ) - continue - - if new_values[0] is None or old_values[0] is None: - logger.debug( - f'Possibly missing data, old {old_values[0]} and new {new_values[0]}' - ) - continue - - diff_value = new_values[0] - old_values[0] - diff_percentage = diff_value / old_values[0] if old_values[0] != 0 else None - if self.__check_threshold(threshold, diff_percentage): - logger.info( - f'metric: {metric}, threshould: {threshold}, diff_percentage: {diff_percentage}, type: {period_name}' - ) - alerts_to_notify.append( - Alert( - metric=metric, - threshold=threshold, - diff_value=diff_percentage, - previous_value=old_values[0], - current_value=new_values[0], - type=AlertType.resolve(period_name), - ) - ) - return alerts_to_notify - - def __check_threshold(self, threshold: float, value: float): - if value is None: - return False - return (threshold > 0 and value > threshold) or ( - threshold < 0 and value < threshold - ) - - def __goal_line_alerts(self, goal_lines: List[Threshold], new_window_value): - alerts = [] - for line in goal_lines: - new_values = new_window_value.get(line.metric, []) - if len(new_values) == 0 or new_values[0] is None: - logger.debug( - f'No value found for metric: {line.metric}, skipping alert creation' - ) - continue - logger.info( - f'metric: {line.metric}, threshould: {line.threshold}, value: {new_values[0]}, type: {AlertType.goal_line}' - ) - if (line.threshold > 0 and new_values[0] >= line.threshold) or ( - line.threshold < 0 and new_values[0] <= line.threshold - ): - alerts.append( - Alert( - metric=line.metric, - threshold=line.threshold, - current_value=new_values[0], - previous_value=None, - diff_value=None, - type=AlertType.goal_line, - ) - ) - return alerts - - # 
TODO remove periodicity_filter and make it part of yaml - def maybe_extract_alerts( - self, insight_query: SignalQuery, periodicity_filter: str = None - ): - periods = self.__fetch_periods(insight_query.periodicity) - projection_query, _ = self.__fetch_parent_projections(insight_query.projections) - - alerts: List[Alert] = [] - new_value_7d = None - for period in periods: - old_window, new_window = period - if periodicity_filter is not None: - if periodicity_filter != new_window.name: - continue - old_value, new_value = self.__execute_periodic_insights( - insight_query.query.sql, projection_query, old_window, new_window - ) - # TODO goal line will only be checked for L7D - if new_window.name == 'L7D': - new_value_7d = new_value - periodic_alerts = self.__periodic_alerts( - period_name=old_window.name, - old_window_value=old_value, - new_window_value=new_value, - possible_alerts=old_window.alerts, - ) - alerts.extend(periodic_alerts) - - goal_line_alerts = [] - if new_value_7d: - goal_line_alerts = self.__goal_line_alerts( - insight_query.goal_lines, new_value_7d - ) - - alerts.extend(goal_line_alerts) - - return ActionableAlerts( - id=insight_query.id, - name=insight_query.name, - title=insight_query.title, - type=insight_query.type, - description=insight_query.description, - alerts=alerts, - ) - - def __safe_fetch_metric(self, results: dict, metric: str): - value = results.get(metric, []) - return value[0] if len(value) > 0 else None - - def extract_raw_inner_query(self, insight_query: SignalQuery): - detailed_period = 'L7D' - if len(insight_query.periodicity) > 0: - detailed_period = insight_query.periodicity[0].period - period_day_count = int(detailed_period.removeprefix('L').removesuffix('D')) - - # picked the first window - _, new_window = self.__get_windows(detailed_period, period_day_count, []) - return self.repository.execute_query( - insight_query.query.sql, - start_date=new_window.start, - end_date=new_window.end, - ) - - def extract_detailed_insights(self, insight_query: SignalQuery): - projection_query, projections = self.__fetch_child_projections( - insight_query.projections - ) - - detailed_period = 'L7D' - if len(insight_query.periodicity) > 0: - detailed_period = insight_query.periodicity[0].period - period_day_count = int(detailed_period.removeprefix('L').removesuffix('D')) - - # picked the first window - old_window, new_window = self.__get_windows( - detailed_period, period_day_count, [] - ) - - results = self.repository.fetch_insights( - insight_query.query.sql, - projection_query, - start_date=new_window.start, - end_date=new_window.end, - ) - - metrices = [ - Metric( - metric=projection.metric, - name=projection.name, - value=self.__safe_fetch_metric( - results=results, metric=projection.metric - ), - ) - for projection in projections - ] - - if insight_query.plots is None or len(insight_query.plots) == 0: - logger.error('The insights query plots seems to be empty') - return DetailedInsights( - metrices=metrices, data_points=None, goal_lines=insight_query.goal_lines - ) - - projections = [p for p in insight_query.plots[0].metrices] - projection_queries = [ - f'{projection.sql} as {projection.metric}' for projection in projections - ] - pr_query = ','.join(projection_queries) - - raw_data_new: Dict[str, List] = self.repository.fetch_raw_values( - insight_query.query.sql, - projection=pr_query, - start_date=new_window.start, - end_date=new_window.end, - ) - - raw_data_old: Dict[str, List] = self.repository.fetch_raw_values( - insight_query.query.sql, - 
-            projection=pr_query,
-            start_date=old_window.start,
-            end_date=old_window.end,
-        )
-
-        return DetailedInsights(
-            metrices=metrices,
-            data_points=DataPoints(
-                window_type='L7D', old_window=raw_data_old, new_window=raw_data_new
-            ),
-            goal_lines=insight_query.goal_lines,
-        )
-
-    def fetch_pvo_records(
-        self,
-        odata_query: str | None = None,
-        params: Dict | None = None,
-        limit: str | None = None,
-        offset: str | None = None,
-        table_name: str | None = None,
-    ) -> List:
-        return self.repository.fetch_pvo_record(
-            odata_query,
-            params=params,
-            limit=limit,
-            offset=offset,
-            table_name=table_name,
-        )
-
-    def update_pvo_records_by_id(
-        self,
-        id: str,
-        table_name: str,
-        rls_filter: str,
-        rls_params: Dict[str, Any],
-        update_data: Dict[str, Any],
-    ) -> List:
-        return self.repository.update_pvo_record(
-            id=id,
-            table_name=table_name,
-            update_data=update_data,
-            odata_condition=rls_filter,
-            rls_params=rls_params,
-        )
diff --git a/wavefront/server/modules/insights_module/insights_module/service/pdo_service.py b/wavefront/server/modules/insights_module/insights_module/service/pdo_service.py
deleted file mode 100644
index d6228543..00000000
--- a/wavefront/server/modules/insights_module/insights_module/service/pdo_service.py
+++ /dev/null
@@ -1,167 +0,0 @@
-from abc import ABC
-from abc import abstractmethod
-import json
-import re
-
-import boto3
-from common_module.utils.odata_parser import prepare_odata_filter
-from google.cloud import storage
-from insights_module.service.insights_service import InsightsService
-from flo_cloud.gcp.gcs import GCSStorage
-
-
-class PdoService(ABC):
-    @abstractmethod
-    def fetch_upto_limit(
-        self, filter: str | None, limit: int, offset: int, table_name: str | None = None
-    ):
-        pass
-
-    @abstractmethod
-    def patch_record_by_id(self, id: str, table_name: str | None = None):
-        pass
-
-    @abstractmethod
-    def fetch_audio(self, url: str):
-        pass
-
-    @abstractmethod
-    def fetch_transcript(self, url: str):
-        pass
-
-
-class AWSServices(PdoService):
-    def __init__(
-        self,
-        insights_service: InsightsService,
-        transcript_bucket_name,
-        audio_bucket_name,
-    ):
-        self._insight_service = insights_service
-        self._transcript_bucket_name = transcript_bucket_name
-        self._audio_bucket_name = audio_bucket_name
-
-    def get_bucket_key(self, value: str):
-        match = re.match(r's3://([^/]+)/(.+)', value)
-        bucket_name = match.group(1)
-        key = match.group(2)
-        return bucket_name, key
-
-    def fetch_upto_limit(self, filter, limit, offset, table_name=None):
-        odata_filter, params = prepare_odata_filter(filter)
-        return self._insight_service.fetch_pvo_records(
-            odata_query=odata_filter,
-            params=params,
-            limit=limit,
-            offset=offset,
-            table_name=table_name,
-        )
-
-    def fetch_audio(self, url):
-        audio_bucket_name, key = self.get_bucket_key(url)
-
-        s3_client = boto3.client('s3')
-        presigned_url = s3_client.generate_presigned_url(
-            'get_object',
-            Params={'Bucket': audio_bucket_name, 'Key': key},
-            ExpiresIn=1800,
-        )
-
-        return presigned_url
-
-    def fetch_transcript(self, url):
-        transcript_bucket_name, key = self.get_bucket_key(url)
-        s3_client = boto3.client('s3')
-        s3_response = s3_client.get_object(
-            Bucket=transcript_bucket_name,
-            Key=key,
-        )
-        file_content = s3_response['Body'].read()
-        transcript_result: dict = json.loads(file_content)
-
-        transcripts = {
-            'transcript': transcript_result['transcribe'],
-            'translate': transcript_result['translate'],
-            'transcribe_diarized': transcript_result.get('transcribe_diarized', None),
-            'translated_diarization': transcript_result.get(
-                'translated_diarization', None
-            ),
-            'speaker_mapping': transcript_result.get('speaker_mapping', None),
-            'translation_diarization': transcript_result.get(
-                'translation_diarization', False
-            ),
-            'diarization': transcript_result.get('diarization', False),
-        }
-        return transcripts
-
-    def patch_record_by_id(self, id, table_name=None):
-        raise NotImplementedError(
-            'Patch operation is not implemented for AWS services.'
-        )
-
-
-class GCPServices(PdoService):
-    def __init__(
-        self,
-        insights_service: InsightsService,
-        transcript_bucket_name,
-        audio_bucket_name,
-    ):
-        self._insight_service = insights_service
-        self._transcript_bucket_name = transcript_bucket_name
-        self._audio_bucket_name = audio_bucket_name
-        self.client = storage.Client()
-        self.storage = GCSStorage()
-
-    def get_bucket_key(self, value: str):
-        match = re.match(r'gs://([^/]+)/(.+)', value)
-        bucket_name = match.group(1)
-        key = match.group(2)
-        return bucket_name, key
-
-    def fetch_upto_limit(self, filter, limit, offset, table_name=None):
-        odata_filter, params = prepare_odata_filter(filter)
-        return self._insight_service.fetch_pvo_records(
-            odata_query=odata_filter,
-            params=params,
-            limit=limit,
-            offset=offset,
-            table_name=table_name,
-        )
-
-    def fetch_audio(self, url):
-        audio_bucket_name, key = self.get_bucket_key(url)
-
-        presigned_url = self.storage.generate_presigned_url(
-            bucket_name=audio_bucket_name,
-            key=key,
-            type='GET',
-            expiresIn=300,
-        )
-
-        return presigned_url
-
-    def fetch_transcript(self, url):
-        transcript_bucket_name, key = self.get_bucket_key(url)
-
-        bucket = self.client.bucket(transcript_bucket_name)
-        blob = bucket.blob(key)
-        file_content = blob.download_as_bytes()
-        transcript_result = json.loads(file_content)
-
-        transcripts = {
-            'transcript': transcript_result['transcribe'],
-            'translate': transcript_result['translate'],
-        }
-
-        return transcripts
-
-    def patch_record_by_id(self, id, update_data: dict, table_name, rls_filter: str):
-        odata_filter, params = prepare_odata_filter(rls_filter)
-        return self._insight_service.update_pvo_records_by_id(
-            id=id,
-            table_name=table_name,
-            rls_filter=odata_filter,
-            rls_params=params,
-            update_data=update_data,
-        )
diff --git a/wavefront/server/modules/insights_module/insights_module/service/usage_metric_service.py b/wavefront/server/modules/insights_module/insights_module/service/usage_metric_service.py
deleted file mode 100644
index 4421659f..00000000
--- a/wavefront/server/modules/insights_module/insights_module/service/usage_metric_service.py
+++ /dev/null
@@ -1,12 +0,0 @@
-from insights_module.repository.pvo_repository import PVORepository
-
-
-class UsageMetricService:
-    def __init__(self, repository: PVORepository, cloud_provider: str):
-        self.repository = repository
-        self.cloud_provider = cloud_provider
-
-    def fetch_usage_metrics(self, start_time: str, end_time: str):
-        return self.repository.fetch_usage_metrics(
-            start_time=start_time, end_time=end_time, cloud_provider=self.cloud_provider
-        )
diff --git a/wavefront/server/modules/insights_module/insights_module/utils/helper.py b/wavefront/server/modules/insights_module/insights_module/utils/helper.py
deleted file mode 100644
index 85f45a9c..00000000
--- a/wavefront/server/modules/insights_module/insights_module/utils/helper.py
+++ /dev/null
@@ -1,19 +0,0 @@
-import collections
-
-
-def fetch_data_filters(data_filters: list) -> list:
-    group_filter = collections.defaultdict(list)
-    for data_filter in data_filters:
-        group_filter[data_filter.key].append(data_filter.value)
-
-    additional_filters = []
-    for key, values in group_filter.items():
-        if len(values) == 1:
-            additional_filters.append(f"({key} eq '{values[0]}')")
-        else:
-            or_condition = []
-            for value in values:
-                or_condition.append(f"({key} eq '{value}')")
-            additional_filters.append(f"({' or '.join(or_condition)})")
-
-    return additional_filters
diff --git a/wavefront/server/modules/insights_module/pyproject.toml b/wavefront/server/modules/insights_module/pyproject.toml
deleted file mode 100644
index 487a36f6..00000000
--- a/wavefront/server/modules/insights_module/pyproject.toml
+++ /dev/null
@@ -1,49 +0,0 @@
-[project]
-name = "insights-module"
-version = "0.0.1"
-description = "Insights Creation Module"
-authors = [
-    { name = "rootflo engineering", email = "engineering@rootflo.ai" }
-]
-requires-python = ">=3.11"
-
-dependencies = [
-    "common-module",
-    "db-repo-module",
-    "pyyaml>=6.0.3,<7",
-    "dependency-injector>=4.42.0,<5.0.0",
-    "redshift-connector>=2.1.5,<3.0.0",
-    "psycopg2>=2.9.10,<3.0.0",
-    "fastapi>=0.115.2,<1.0.0",
-    "dacite>=1.9.2,<2.0.0",
-    "httpx>=0.28.1,<1.0.0",
-    "boto3<=1.38.40",
-    "google-cloud-bigquery==3.34.0",
-    "google-cloud-storage<3.0.0",
-]
-
-[tool.uv.sources]
-common-module = { workspace = true }
-db-repo-module = { workspace = true }
-
-[dependency-groups]
-dev = [
-    "pytest>=8.3.4,<9.0.0",
-    "pytest-asyncio>=0.24.0,<1.0.0",
-    "asyncpg>=0.30.0,<1.0.0",
-    "testing-postgresql>=1.3.0,<2.0.0"
-]
-
-[tool.pytest.ini_options]
-asyncio_mode = "auto"
-asyncio_default_fixture_loop_scope = "function"
-
-[tool.uv]
-package = true
-
-[build-system]
-requires = ["hatchling"]
-build-backend = "hatchling.build"
-
-[tool.hatch.build.targets.wheel]
-packages = ["insights_module"]
diff --git a/wavefront/server/modules/insights_module/tests/conftest.py b/wavefront/server/modules/insights_module/tests/conftest.py
deleted file mode 100644
index a023079e..00000000
--- a/wavefront/server/modules/insights_module/tests/conftest.py
+++ /dev/null
@@ -1,239 +0,0 @@
-import json
-from unittest.mock import Mock
-from uuid import uuid4
-
-from auth_module.auth_container import AuthContainer
-from common_module.common_container import CommonContainer
-from common_module.middleware.request_id_middleware import RequestIdMiddleware
-from db_repo_module.database.base import Base
-from db_repo_module.db_repo_container import DatabaseModuleContainer
-from dependency_injector import providers
-from fastapi import FastAPI
-from fastapi.testclient import TestClient
-from insights_module.controllers.pdo_controller import pdo_router
-from insights_module.insights_container import InsightsContainer
-import pytest
-from sqlalchemy.ext.asyncio import async_sessionmaker
-from sqlalchemy.ext.asyncio import create_async_engine
-import testing.postgresql
-from user_management_module.authorization.require_auth import RequireAuthMiddleware
-from user_management_module.user_container import UserContainer
-
-
-class MockDbClient:
-    def __init__(self, engine, session_factory):
-        self._engine = engine
-        self.session = session_factory
-
-
-@pytest.fixture
-async def test_engine():
-    with testing.postgresql.Postgresql() as postgresql:
-        database_url = postgresql.url()
-
-        async_database_url = database_url.replace(
-            'postgresql://', 'postgresql+psycopg://'
-        )
-
-        engine = create_async_engine(async_database_url)
-
-        async with engine.begin() as conn:
-            await conn.run_sync(Base.metadata.create_all)
-
-        yield engine
-
-        async with engine.begin() as conn:
-            await conn.run_sync(Base.metadata.drop_all)
-        await engine.dispose()
-
-
-@pytest.fixture
-async def test_session(test_engine):
-    async_session = async_sessionmaker(autocommit=False, bind=test_engine)
-    yield async_session
-
-
-@pytest.fixture
-def test_user_id():
-    """Fixture to provide a consistent test user ID."""
-    return str(uuid4())
-
-
-@pytest.fixture
-def test_session_id():
-    """Fixture to provide a consistent test session ID."""
-    return str(uuid4())
-
-
-@pytest.fixture
-def setup_containers(test_engine, test_session, test_user_id, test_session_id):
-    # setting up the dependencies for the RequireAuth middleware
-    auth_container = AuthContainer()
-    common_container = CommonContainer()
-    user_container = UserContainer()
-
-    db_repo_container = DatabaseModuleContainer()
-    mock_db_client = MockDbClient(test_engine, test_session)
-    db_repo_container.db_client.override(mock_db_client)
-
-    # mocking the cache manager
-    cache_manager_mock = Mock()
-    cache_manager_mock.get_str.return_value = json.dumps(
-        {'user_id': test_user_id, 'session_id': test_session_id}
-    )
-    cache_manager_mock.add = Mock()
-    common_container.cache_manager.override(cache_manager_mock)
-
-    common_container.cache_manager.override(db_repo_container.cache_manager)
-    insights_container = InsightsContainer(
-        notification_repository=db_repo_container.notification_repository,
-    )
-
-    # Mock connector for PVORepository
-    mock_connector = Mock()
-    mock_connector.execute_query.return_value = ([], [])
-    insights_container.connector.override(providers.Singleton(lambda: mock_connector))
-
-    # Mock cloud service
-    mock_cloud_service = Mock()
-    mock_cloud_service.fetch_audio.return_value = (
-        'https://example.com/audio/test_audio.mp3'
-    )
-    mock_cloud_service.fetch_upto_limit.return_value = [
-        {
-            'id': 'test_id_1',
-            'conversation_id': 'conv_1',
-            'created_at': '2024-03-20T10:00:00',
-            'rf_transcription_status': 'success',
-            'rf_insights_status': 'success',
-            'total_duration': 300,
-        },
-        {
-            'id': 'test_id_2',
-            'conversation_id': 'conv_2',
-            'created_at': '2024-03-20T11:00:00',
-            'rf_transcription_status': 'success',
-            'rf_insights_status': 'success',
-            'total_duration': 450,
-        },
-    ]
-    mock_cloud_service.fetch_transcript.return_value = {
-        'transcript': 'This is a test transcript',
-        'metadata': {'duration': 300, 'speaker_count': 2},
-    }
-
-    insights_container.cloud_service.override(
-        providers.Singleton(lambda: mock_cloud_service)
-    )
-
-    # mocking the token service
-    mock_token_service = Mock()
-    mock_token_service.create_token.return_value = 'mock_token'
-    mock_token_service.decode_token.return_value = {
-        'sub': 'test@example.com',
-        'user_id': test_user_id,
-        'role_id': 'test_role_id',
-        'session_id': test_session_id,
-    }
-    mock_token_service.token_expiry = 3600
-    mock_token_service.temporary_token_expiry = 600
-
-    # overriding the auth container dependencies
-    auth_container.token_service.override(mock_token_service)
-    auth_container.db_client.override(db_repo_container.db_client)
-    auth_container.cache_manager.override(cache_manager_mock)
-
-    # overriding the user container dependencies
-    user_container.db_client.override(db_repo_container.db_client)
-    user_container.cache_manager.override(cache_manager_mock)
-
-    auth_container.wire(
-        packages=[
-            'insights_module.controllers',
-            'user_management_module.authorization',
-        ]
-    )
-
-    user_container.wire(
-        packages=[
-            'user_management_module.authorization',
-            'auth_module.controllers',
-            'insights_module.controllers',
-        ]
-    )
-    common_container.wire(
-        packages=[
-            'insights_module.controllers',
-            'user_management_module.authorization',
-        ]
-    )
-    insights_container.wire(
-        packages=[
-            'insights_module.controllers',
-        ]
-    )
-
-    yield auth_container, common_container, user_container, insights_container
-    auth_container.unwire()
-    common_container.unwire()
-    user_container.unwire()
-
-
-@pytest.fixture
-def test_client(setup_containers):
-    app = FastAPI()
-    app.add_middleware(RequestIdMiddleware)
-    app.add_middleware(RequireAuthMiddleware)
-    app.include_router(pdo_router, prefix='/floware/v1/insights')
-    return TestClient(app)
-
-
-@pytest.fixture
-def auth_token(setup_containers, test_user_id, test_session_id):
-    auth_container, _, _, _ = setup_containers
-    token_service = auth_container.token_service()
-    token = token_service.create_token(
-        sub='test@example.com',
-        user_id=test_user_id,
-        role_id='test_role_id',
-        session_id=test_session_id,
-    )
-    return token
-
-
-@pytest.fixture
-def mocking_pdo_controller_is_admin(monkeypatch):
-    async def mock_check_admin(role_id):
-        return True
-
-    monkeypatch.setattr(
-        'insights_module.controllers.pdo_controller.check_admin',
-        mock_check_admin,
-    )
-
-
-@pytest.fixture
-def mocking_pdo_controller_not_admin(monkeypatch):
-    async def mock_check_admin(role_id):
-        return False
-
-    monkeypatch.setattr(
-        'insights_module.controllers.pdo_controller.check_admin',
-        mock_check_admin,
-    )
-
-
-@pytest.fixture
-def mock_pvo_repository_emptydata(setup_containers):
-    _, _, _, insights_container = setup_containers
-    mock_cloud_service = Mock()
-    mock_cloud_service.fetch_audio.return_value = (
-        'https://example.com/audio/test_audio.mp3'
-    )
-    mock_cloud_service.fetch_upto_limit.return_value = []
-
-    # Configure the mock to return the transcript dictionary directly
-    mock_cloud_service.fetch_transcript = Mock(return_value={})
-    insights_container.cloud_service.override(
-        providers.Singleton(lambda: mock_cloud_service)
-    )
diff --git a/wavefront/server/modules/insights_module/tests/test_pvo_controller.py b/wavefront/server/modules/insights_module/tests/test_pvo_controller.py
deleted file mode 100644
index c26d3e96..00000000
--- a/wavefront/server/modules/insights_module/tests/test_pvo_controller.py
+++ /dev/null
@@ -1,275 +0,0 @@
-from db_repo_module.models.resource import Resource
-from db_repo_module.models.resource import ResourceScope
-from db_repo_module.models.role import Role
-from db_repo_module.models.role_resource import RoleResource
-from db_repo_module.models.session import Session
-from db_repo_module.models.user import User
-from db_repo_module.models.user_role import UserRole
-import pytest
-from sqlalchemy.ext.asyncio import AsyncSession
-
-
-@pytest.mark.asyncio
-async def create_test_resources_and_roles(test_session: AsyncSession, test_user_id):
-    async with test_session() as session:
-        # Create and commit role first
-        role = Role(
-            id='test_role_id', name='test_role', description='Test role for PDO access'
-        )
-        session.add(role)
-        await session.commit()
-
-        # Create and commit resource
-        resource = Resource(
-            id='test_resource_id',
-            key='test_resource',
-            value='test_value',
-            scope=ResourceScope.DATA,
-        )
-        session.add(resource)
-        await session.commit()
-
-        # Create role-resource mapping
-        role_resource = RoleResource(
-            role_id='test_role_id', resource_id='test_resource_id'
-        )
-        session.add(role_resource)
-
-        # Create user-role mapping
-        user_role = UserRole(user_id=test_user_id, role_id='test_role_id')
-        session.add(user_role)
-        await session.commit()
-
-
-@pytest.mark.asyncio
-async def create_session(test_session: AsyncSession, test_user_id, test_session_id):
-    user = User(
-        id=test_user_id,
-        email='test@example.com',
-        password='hashed_password',
-        first_name='Test',
-        last_name='User',
-    )
-
-    # Create a session in the database
-    db_session = Session(
-        id=test_session_id, user_id=test_user_id, device_info='test_device'
-    )
-
-    async with test_session() as session:
-        session.add(user)
-        session.add(db_session)
-        await session.commit()
-
-
-@pytest.mark.asyncio
-async def test_get_pvo_records(
-    test_session: AsyncSession,
-    test_user_id,
-    test_session_id,
-    test_client,
-    auth_token,
-    mocking_pdo_controller_is_admin,
-):
-    await create_session(test_session, test_user_id, test_session_id)
-    response = test_client.get(
-        '/floware/v1/insights/parsed_data_object',
-        headers={'Authorization': f'Bearer {auth_token}'},
-    )
-    assert response.status_code == 200
-    assert len(response.json()['data']['records']) == 2
-
-
-@pytest.mark.asyncio
-async def test_get_pvo_record_with_empty_result(
-    test_session: AsyncSession,
-    test_user_id,
-    test_session_id,
-    test_client,
-    auth_token,
-    mocking_pdo_controller_is_admin,
-    mock_pvo_repository_emptydata,
-):
-    await create_session(test_session, test_user_id, test_session_id)
-    response = test_client.get(
-        '/floware/v1/insights/parsed_data_object',
-        headers={'Authorization': f'Bearer {auth_token}'},
-    )
-    assert response.status_code == 200
-    assert len(response.json()['data']['records']) == 0
-
-
-@pytest.mark.asyncio
-async def test_get_pvo_records_without_admin_without_data_filter(
-    test_session: AsyncSession,
-    test_user_id,
-    test_session_id,
-    test_client,
-    auth_token,
-    mocking_pdo_controller_not_admin,
-):
-    await create_session(test_session, test_user_id, test_session_id)
-    response = test_client.get(
-        '/floware/v1/insights/parsed_data_object',
-        headers={'Authorization': f'Bearer {auth_token}'},
-    )
-    assert response.status_code == 403
-
-
-@pytest.mark.asyncio
-async def test_get_pvo_record_without_admin_and_data_filter(
-    test_session: AsyncSession,
-    test_user_id,
-    test_session_id,
-    test_client,
-    auth_token,
-    mocking_pdo_controller_not_admin,
-):
-    await create_session(test_session, test_user_id, test_session_id)
-    await create_test_resources_and_roles(test_session, test_user_id)
-
-    response = test_client.get(
-        '/floware/v1/insights/parsed_data_object',
-        headers={'Authorization': f'Bearer {auth_token}'},
-    )
-    assert response.status_code == 200
-    assert len(response.json()['data']['records']) == 2
-
-
-@pytest.mark.asyncio
-async def test_get_pvo_records_with_pagination(
-    test_session: AsyncSession,
-    test_user_id,
-    test_session_id,
-    test_client,
-    auth_token,
-    mocking_pdo_controller_is_admin,
-):
-    """Test getting PDO records with pagination parameters"""
-    await create_session(test_session, test_user_id, test_session_id)
-    await create_test_resources_and_roles(test_session, test_user_id)
-    response = test_client.get(
-        '/floware/v1/insights/parsed_data_object?limit=1&offset=1',
-        headers={'Authorization': f'Bearer {auth_token}'},
-    )
-    assert response.status_code == 200
-    assert (
-        len(response.json()['data']['records']) == 2
-    )  # because the mocked function always returns 2 records
-
-
-@pytest.mark.asyncio
-async def test_get_pvo_records_with_filter(
-    test_session: AsyncSession,
-    test_user_id,
-    test_session_id,
-    test_client,
-    auth_token,
-    mocking_pdo_controller_is_admin,
-):
-    """Test getting PDO records with filter parameter"""
-    await create_session(test_session, test_user_id, test_session_id)
-    await create_test_resources_and_roles(test_session, test_user_id)
-    response = test_client.get(
-        '/floware/v1/insights/parsed_data_object?$filter=conversation_id eq conv',
-        headers={'Authorization': f'Bearer {auth_token}'},
-    )
-    assert response.status_code == 200
-
-
-@pytest.mark.asyncio
-async def test_get_pvo_audio(
-    test_session: AsyncSession,
-    test_user_id,
-    test_session_id,
-    test_client,
-    auth_token,
-    mocking_pdo_controller_is_admin,
-):
-    """Test getting audio URL for a PDO record"""
-    await create_session(test_session, test_user_id, test_session_id)
-    response = test_client.get(
-        '/floware/v1/insights/parsed_data_object/audio?resource_url=test_audio.mp3',
-        headers={'Authorization': f'Bearer {auth_token}'},
-    )
-    assert response.status_code == 200
-    assert 'audio_url' in response.json()['data']
-    assert response.json()['data']['audio_url'] is not None
-
-
-@pytest.mark.asyncio
-async def test_get_pvo_audio_without_auth(
-    test_session: AsyncSession, test_user_id, test_session_id, test_client
-):
-    """Test getting audio URL without authentication"""
-    await create_session(test_session, test_user_id, test_session_id)
-    response = test_client.get(
-        '/floware/v1/insights/parsed_data_object/audio?resource_url=test_audio.mp3'
-    )
-    assert response.status_code == 401
-
-
-@pytest.mark.asyncio
-async def test_get_pvo_audio_without_resource_url(
-    test_session: AsyncSession,
-    test_user_id,
-    test_session_id,
-    test_client,
-    auth_token,
-    mocking_pdo_controller_is_admin,
-):
-    """Test getting audio URL without providing resource_url parameter"""
-    await create_session(test_session, test_user_id, test_session_id)
-    response = test_client.get(
-        '/floware/v1/insights/parsed_data_object/audio',
-        headers={'Authorization': f'Bearer {auth_token}'},
-    )
-    assert response.status_code == 422  # FastAPI validation error
-
-
-@pytest.mark.asyncio
-async def test_get_pvo_transcript(
-    test_session: AsyncSession,
-    test_user_id,
-    test_session_id,
-    test_client,
-    auth_token,
-    mocking_pdo_controller_is_admin,
-):
-    """Test getting transcript for a PDO record"""
-    await create_session(test_session, test_user_id, test_session_id)
-    response = test_client.get(
-        '/floware/v1/insights/parsed_data_object/transcript?resource_url=test_transcript.json',
-        headers={'Authorization': f'Bearer {auth_token}'},
-    )
-    assert response.status_code == 200
-
-
-@pytest.mark.asyncio
-async def test_get_pvo_transcript_without_auth(
-    test_session: AsyncSession, test_user_id, test_session_id, test_client
-):
-    """Test getting transcript without authentication"""
-    await create_session(test_session, test_user_id, test_session_id)
-    response = test_client.get(
-        '/floware/v1/insights/parsed_data_object/transcript?resource_url=test_transcript.json'
-    )
-    assert response.status_code == 401
-
-
-@pytest.mark.asyncio
-async def test_get_pvo_transcript_without_resource_url(
-    test_session: AsyncSession,
-    test_user_id,
-    test_session_id,
-    test_client,
-    auth_token,
-    mocking_pdo_controller_is_admin,
-):
-    """Test getting transcript without providing resource_url parameter"""
-    await create_session(test_session, test_user_id, test_session_id)
-    response = test_client.get(
-        '/floware/v1/insights/parsed_data_object/transcript',
-        headers={'Authorization': f'Bearer {auth_token}'},
-    )
-    assert response.status_code == 422  # FastAPI validation error
diff --git a/wavefront/server/uv.lock b/wavefront/server/uv.lock
index e90c433e..7852ac63 100644
--- a/wavefront/server/uv.lock
+++ b/wavefront/server/uv.lock
@@ -33,7 +33,6 @@ members = [
     "gold-module",
     "inference-app",
     "inference-module",
"insights-module", "knowledge-base-module", "llm-inference-config-module", "plugins-module", @@ -1013,15 +1012,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0d/c3/e90f4a4feae6410f914f8ebac129b9ae7a8c92eb60a638012dde42030a9d/cryptography-46.0.3-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:6b5063083824e5509fdba180721d55909ffacccc8adbec85268b48439423d78c", size = 3438528, upload-time = "2025-10-15T23:18:26.227Z" }, ] -[[package]] -name = "dacite" -version = "1.9.2" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/55/a0/7ca79796e799a3e782045d29bf052b5cde7439a2bbb17f15ff44f7aacc63/dacite-1.9.2.tar.gz", hash = "sha256:6ccc3b299727c7aa17582f0021f6ae14d5de47c7227932c47fec4cdfefd26f09", size = 22420, upload-time = "2025-02-05T09:27:29.757Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/94/35/386550fd60316d1e37eccdda609b074113298f23cef5bddb2049823fe666/dacite-1.9.2-py3-none-any.whl", hash = "sha256:053f7c3f5128ca2e9aceb66892b1a3c8936d02c686e707bee96e19deef4bc4a0", size = 16600, upload-time = "2025-02-05T09:27:24.345Z" }, -] - [[package]] name = "dataclasses-json" version = "0.6.7" @@ -1537,7 +1527,6 @@ dependencies = [ { name = "fastapi" }, { name = "gold-module" }, { name = "inference-module" }, - { name = "insights-module" }, { name = "knowledge-base-module" }, { name = "llm-inference-config-module" }, { name = "plugins-module" }, @@ -1562,7 +1551,6 @@ requires-dist = [ { name = "fastapi", specifier = ">=0.115.2,<1.0.0" }, { name = "gold-module", editable = "modules/gold_module" }, { name = "inference-module", editable = "modules/inference_module" }, - { name = "insights-module", editable = "modules/insights_module" }, { name = "knowledge-base-module", editable = "modules/knowledge_base_module" }, { name = "llm-inference-config-module", editable = "modules/llm_inference_config_module" }, { name = "plugins-module", editable = "modules/plugins_module" }, @@ -2440,57 +2428,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2c/e1/e6716421ea10d38022b952c159d5161ca1193197fb744506875fbb87ea7b/iniconfig-2.1.0-py3-none-any.whl", hash = "sha256:9deba5723312380e77435581c6bf4935c94cbfab9b1ed33ef8d238ea168eb760", size = 6050, upload-time = "2025-03-19T20:10:01.071Z" }, ] -[[package]] -name = "insights-module" -version = "0.0.1" -source = { editable = "modules/insights_module" } -dependencies = [ - { name = "boto3" }, - { name = "common-module" }, - { name = "dacite" }, - { name = "db-repo-module" }, - { name = "dependency-injector" }, - { name = "fastapi" }, - { name = "google-cloud-bigquery" }, - { name = "google-cloud-storage" }, - { name = "httpx" }, - { name = "psycopg2" }, - { name = "pyyaml" }, - { name = "redshift-connector" }, -] - -[package.dev-dependencies] -dev = [ - { name = "asyncpg" }, - { name = "pytest" }, - { name = "pytest-asyncio" }, - { name = "testing-postgresql" }, -] - -[package.metadata] -requires-dist = [ - { name = "boto3", specifier = "<=1.38.40" }, - { name = "common-module", editable = "modules/common_module" }, - { name = "dacite", specifier = ">=1.9.2,<2.0.0" }, - { name = "db-repo-module", editable = "modules/db_repo_module" }, - { name = "dependency-injector", specifier = ">=4.42.0,<5.0.0" }, - { name = "fastapi", specifier = ">=0.115.2,<1.0.0" }, - { name = "google-cloud-bigquery", specifier = "==3.34.0" }, - { name = "google-cloud-storage", specifier = "<3.0.0" }, - { name = "httpx", specifier = ">=0.28.1,<1.0.0" }, - { name = "psycopg2", specifier = 
">=2.9.10,<3.0.0" }, - { name = "pyyaml", specifier = ">=6.0.3,<7" }, - { name = "redshift-connector", specifier = ">=2.1.5,<3.0.0" }, -] - -[package.metadata.requires-dev] -dev = [ - { name = "asyncpg", specifier = ">=0.30.0,<1.0.0" }, - { name = "pytest", specifier = ">=8.3.4,<9.0.0" }, - { name = "pytest-asyncio", specifier = ">=0.24.0,<1.0.0" }, - { name = "testing-postgresql", specifier = ">=1.3.0,<2.0.0" }, -] - [[package]] name = "isodate" version = "0.7.2"