diff --git a/wavefront/server/plugins/datasource/datasource/__init__.py b/wavefront/server/plugins/datasource/datasource/__init__.py index 90d4d718..c6694d7c 100644 --- a/wavefront/server/plugins/datasource/datasource/__init__.py +++ b/wavefront/server/plugins/datasource/datasource/__init__.py @@ -13,7 +13,7 @@ from .odata_parser import ODataQueryParser -class DatasourcePlugin(DataSourceABC): +class DatasourcePlugin: def __init__( self, datasource_type: DataSourceType, @@ -65,7 +65,7 @@ def fetch_data( ) -> QueryResult: where_clause, params = self.odata_parser.prepare_odata_filter(filter) join_query, table_aliases, join_where_clause, join_params = ( - self.odata_parser.prepare_odata_joins(join, table_name) + self.odata_parser.prepare_odata_joins(join or '', table_name) ) where_clause = where_clause if where_clause else 'true' @@ -110,13 +110,13 @@ async def execute_dynamic_query( rls_filter ) result_by_query: Dict[str, Any] = await self.datasource.execute_dynamic_query( - query, - offset, - limit, - odata_filter, - odata_params, - odata_data_filter, - odata_data_params, - params, + query=query, + odata_filter=odata_filter, + odata_params=odata_params, + odata_data_filter=odata_data_filter, + odata_data_params=odata_data_params, + offset=offset, + limit=limit, + params=params, ) return result_by_query diff --git a/wavefront/server/plugins/datasource/datasource/bigquery/__init__.py b/wavefront/server/plugins/datasource/datasource/bigquery/__init__.py index 380751f0..76d97a4d 100644 --- a/wavefront/server/plugins/datasource/datasource/bigquery/__init__.py +++ b/wavefront/server/plugins/datasource/datasource/bigquery/__init__.py @@ -20,9 +20,17 @@ def __init__(self, config: BigQueryConfig): async def test_connection(self) -> bool: return await self.client.test_connection() - def get_schema(self, table_id: str) -> dict: - table_info = self.client.get_table_info(self.config.dataset_id, table_id) - return table_info['schema'] or {} + def get_schema(self) -> dict: + tables = self.client.list_tables(self.config.dataset_id) + return { + table.table_id: ( + self.client.get_table_info(self.config.dataset_id, table.table_id).get( + 'schema' + ) + or {} + ) + for table in tables + } def get_table_names(self, **kwargs) -> list[str]: dataset_id = kwargs.get('dataset_id', self.config.dataset_id) @@ -32,24 +40,29 @@ def get_table_names(self, **kwargs) -> list[str]: def fetch_data( self, table_names: List[str], - projection: str = '*', - where_clause: str = 'true', + projection: Optional[str] = '*', + where_clause: Optional[str] = 'true', join_query: Optional[str] = None, params: Optional[Dict[str, Any]] = None, - offset: int = 0, - limit: int = 1000, + offset: Optional[int] = 0, + limit: Optional[int] = 1000, order_by: Optional[str] = None, group_by: Optional[str] = None, ) -> List[Dict[str, Any]]: + projection_value = projection or '*' + where_clause_value = where_clause or 'true' + limit_value = limit if limit is not None else 1000 + offset_value = offset if offset is not None else 0 + result = self.client.execute_query_to_dict( - projection=projection, + projection=projection_value, table_prefix=self.table_prefix, table_names=table_names, - where_clause=where_clause, + where_clause=where_clause_value, join_query=join_query, params=params, - limit=limit, - offset=offset, + limit=limit_value, + offset=offset_value, order_by=order_by, group_by=group_by, ) @@ -73,19 +86,19 @@ async def execute_query( async def execute_dynamic_query( self, - queries: List[Dict[str, Any]], - offset: Optional[int] = 0, - limit: Optional[int] = 100, + query: List[Dict[str, Any]], odata_filter: Optional[str] = None, odata_params: Optional[Dict[str, Any]] = None, odata_data_filter: Optional[str] = None, odata_data_params: Optional[Dict[str, Any]] = None, + offset: Optional[int] = 0, + limit: Optional[int] = 100, params: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: results = {} tasks = [] - for query_obj in queries: + for query_obj in query: query_to_execute = query_obj.get('query', '') query_params = query_obj.get('parameters', []) query_id = query_obj.get('id') diff --git a/wavefront/server/plugins/datasource/datasource/odata_parser.py b/wavefront/server/plugins/datasource/datasource/odata_parser.py index fedaeb72..f222d0c4 100644 --- a/wavefront/server/plugins/datasource/datasource/odata_parser.py +++ b/wavefront/server/plugins/datasource/datasource/odata_parser.py @@ -79,7 +79,7 @@ def read_string(self, quote_char: str) -> str: self.advance() if self.current_char == quote_char: - result += self.current_char + result += quote_char self.advance() return result @@ -217,8 +217,8 @@ def get_next_token(self) -> Token: class ODataParserABC(ABC): @abstractmethod def prepare_odata_filter( - self, filter_expr: str, dynamic_var_char: str = '@' - ) -> Tuple[str, dict]: + self, filter_expr: Optional[str] + ) -> Tuple[Optional[str], Optional[Dict[str, Any]]]: pass @abstractmethod @@ -240,7 +240,9 @@ def __init__(self, type: str, dynamic_var_char: str = '@'): else: raise ValueError(f'Invalid type: {self.type}') - def prepare_odata_filter(self, odata_filter: str) -> Dict[str, Any]: + def prepare_odata_filter( + self, odata_filter: Optional[str] + ) -> Tuple[Optional[str], Optional[Dict[str, Any]]]: return self.parser.prepare_odata_filter(odata_filter) def prepare_odata_joins( @@ -666,7 +668,9 @@ class SQLODataParser(ODataParserABC): def __init__(self, dynamic_var_char: str = '@'): self.dynamic_var_char = dynamic_var_char - def prepare_odata_filter(self, filter_expr: str) -> Tuple[str, dict]: + def prepare_odata_filter( + self, filter_expr: Optional[str] + ) -> Tuple[Optional[str], Optional[Dict[str, Any]]]: """Parses an OData-like filter expression and converts it into a SQL-like query with parameters.""" if not filter_expr: return None, None @@ -800,6 +804,7 @@ def build_joins( sql_filter, filter_params = filter_parser.prepare_odata_filter( filter_expr ) + filter_params = filter_params or {} if sql_filter: # Prefix the filter with table name to avoid ambiguity # Replace @param with @table_param_ to match the new parameter names diff --git a/wavefront/server/plugins/datasource/datasource/redshift/__init__.py b/wavefront/server/plugins/datasource/datasource/redshift/__init__.py index 7de460ee..aed5a871 100644 --- a/wavefront/server/plugins/datasource/datasource/redshift/__init__.py +++ b/wavefront/server/plugins/datasource/datasource/redshift/__init__.py @@ -22,7 +22,11 @@ async def test_connection(self) -> bool: return await asyncio.to_thread(self.client.test_connection) def get_schema(self) -> dict: - return self.client.get_table_info() + table_names = self.client.list_tables() + return { + table_name: self.client.get_table_info(table_name) + for table_name in table_names + } def get_table_names(self, **kwargs) -> list[str]: return self.client.list_tables() @@ -30,24 +34,29 @@ def get_table_names(self, **kwargs) -> list[str]: def fetch_data( self, table_names: List[str], - projection: str = '*', - where_clause: str = 'true', + projection: Optional[str] = '*', + where_clause: Optional[str] = 'true', join_query: Optional[str] = None, params: Optional[Dict[str, Any]] = None, - offset: int = 0, - limit: int = 10, + offset: Optional[int] = 0, + limit: Optional[int] = 10, order_by: Optional[str] = None, group_by: Optional[str] = None, ) -> List[Dict[str, Any]]: + projection_value = projection or '*' + where_clause_value = where_clause or 'true' + limit_value = limit if limit is not None else 10 + offset_value = offset if offset is not None else 0 + return self.client.execute_query_to_dict( - projection=projection, + projection=projection_value, table_prefix=f'{self.db_name}.', table_names=table_names, - where_clause=where_clause, + where_clause=where_clause_value, join_query=join_query, params=params, - limit=limit, - offset=offset, + limit=limit_value, + offset=offset_value, order_by=order_by, group_by=group_by, ) diff --git a/wavefront/server/plugins/datasource/datasource/types.py b/wavefront/server/plugins/datasource/datasource/types.py index 5aeb161e..4314ef4b 100644 --- a/wavefront/server/plugins/datasource/datasource/types.py +++ b/wavefront/server/plugins/datasource/datasource/types.py @@ -91,6 +91,7 @@ async def execute_dynamic_query( odata_data_params: Optional[Dict[str, Any]] = None, offset: Optional[int] = 0, limit: Optional[int] = 100, + params: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: pass