Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions wavefront/server/plugins/datasource/datasource/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from .odata_parser import ODataQueryParser


class DatasourcePlugin(DataSourceABC):
class DatasourcePlugin:
def __init__(
self,
datasource_type: DataSourceType,
Expand Down Expand Up @@ -65,7 +65,7 @@ def fetch_data(
) -> QueryResult:
where_clause, params = self.odata_parser.prepare_odata_filter(filter)
join_query, table_aliases, join_where_clause, join_params = (
self.odata_parser.prepare_odata_joins(join, table_name)
self.odata_parser.prepare_odata_joins(join or '', table_name)
)

where_clause = where_clause if where_clause else 'true'
Expand Down Expand Up @@ -110,13 +110,13 @@ async def execute_dynamic_query(
rls_filter
)
result_by_query: Dict[str, Any] = await self.datasource.execute_dynamic_query(
query,
offset,
limit,
odata_filter,
odata_params,
odata_data_filter,
odata_data_params,
params,
query=query,
odata_filter=odata_filter,
odata_params=odata_params,
odata_data_filter=odata_data_filter,
odata_data_params=odata_data_params,
offset=offset,
limit=limit,
params=params,
)
return result_by_query
43 changes: 28 additions & 15 deletions wavefront/server/plugins/datasource/datasource/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,17 @@ def __init__(self, config: BigQueryConfig):
async def test_connection(self) -> bool:
return await self.client.test_connection()

def get_schema(self, table_id: str) -> dict:
table_info = self.client.get_table_info(self.config.dataset_id, table_id)
return table_info['schema'] or {}
def get_schema(self, table_id: Optional[str] = None) -> dict:
    """Return BigQuery schema information for the configured dataset.

    Args:
        table_id: When provided, return only that table's schema — keeps
            older callers that pass a single table id working (they would
            otherwise hit ``TypeError`` against a zero-argument signature).
            When omitted, return a mapping of every table id in the dataset
            to its schema.

    Returns:
        A single schema dict when ``table_id`` is given, otherwise a
        ``{table_id: schema}`` mapping. A missing/None schema becomes ``{}``.
    """
    if table_id is not None:
        # Single-table path: mirrors the pre-change behavior for callers
        # that still pass a table id.
        info = self.client.get_table_info(self.config.dataset_id, table_id)
        return info.get('schema') or {}
    tables = self.client.list_tables(self.config.dataset_id)
    return {
        table.table_id: (
            self.client.get_table_info(
                self.config.dataset_id, table.table_id
            ).get('schema')
            or {}
        )
        for table in tables
    }
Comment on lines +23 to +33
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify the breaking call in bigquery_tools.py
rg -n 'get_schema\(' wavefront/server/modules/tools_module/tools_module/datasources/bigquery_tools.py -B2 -A2

Repository: rootflo/wavefront

Length of output: 426


🏁 Script executed:

rg -n 'schema_info' wavefront/server/modules/tools_module/tools_module/datasources/bigquery_tools.py -B2 -A10 | head -50

Repository: rootflo/wavefront

Length of output: 960


Breaking change: get_schema() will crash bigquery_tools.py caller.

The method signature changed from accepting a table_id parameter to taking no arguments. The caller at line 47 of bigquery_tools.py invokes plugin.get_schema(table_id), which will raise TypeError: get_schema() takes 1 positional argument but 2 were given.

Additionally, the return value changed fundamentally: the new implementation returns a dict mapping all table IDs in the dataset to their schemas ({table_id: schema, ...}), while the caller expects a single schema object. Line 55 concatenates str(schema_info), which would stringify the entire dict of all schemas instead of a single schema.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/plugins/datasource/datasource/bigquery/__init__.py` around
lines 23 - 33, The get_schema change breaks callers expecting to pass a table_id
and receive that table's schema: restore get_schema(self, table_id) to accept a
table_id parameter (keep backward-compatible self-only overload only if needed)
and have it return the schema object for that single table (not a mapping of all
tables); specifically modify BigQueryPlugin.__init__.py's get_schema
implementation to call self.client.get_table_info(self.config.dataset_id,
table_id).get('schema') (or {} / None on missing) so plugin.get_schema(table_id)
returns the exact schema the caller in bigquery_tools.py expects and does not
raise a TypeError.


def get_table_names(self, **kwargs) -> list[str]:
dataset_id = kwargs.get('dataset_id', self.config.dataset_id)
Expand All @@ -32,24 +40,29 @@ def get_table_names(self, **kwargs) -> list[str]:
def fetch_data(
self,
table_names: List[str],
projection: str = '*',
where_clause: str = 'true',
projection: Optional[str] = '*',
where_clause: Optional[str] = 'true',
join_query: Optional[str] = None,
params: Optional[Dict[str, Any]] = None,
offset: int = 0,
limit: int = 1000,
offset: Optional[int] = 0,
limit: Optional[int] = 1000,
order_by: Optional[str] = None,
group_by: Optional[str] = None,
) -> List[Dict[str, Any]]:
projection_value = projection or '*'
where_clause_value = where_clause or 'true'
limit_value = limit if limit is not None else 1000
offset_value = offset if offset is not None else 0

result = self.client.execute_query_to_dict(
projection=projection,
projection=projection_value,
table_prefix=self.table_prefix,
table_names=table_names,
where_clause=where_clause,
where_clause=where_clause_value,
join_query=join_query,
params=params,
limit=limit,
offset=offset,
limit=limit_value,
offset=offset_value,
order_by=order_by,
group_by=group_by,
)
Expand All @@ -73,19 +86,19 @@ async def execute_query(

async def execute_dynamic_query(
self,
queries: List[Dict[str, Any]],
offset: Optional[int] = 0,
limit: Optional[int] = 100,
query: List[Dict[str, Any]],
odata_filter: Optional[str] = None,
odata_params: Optional[Dict[str, Any]] = None,
odata_data_filter: Optional[str] = None,
odata_data_params: Optional[Dict[str, Any]] = None,
offset: Optional[int] = 0,
limit: Optional[int] = 100,
params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
results = {}
tasks = []

for query_obj in queries:
for query_obj in query:
query_to_execute = query_obj.get('query', '')
query_params = query_obj.get('parameters', [])
query_id = query_obj.get('id')
Expand Down
15 changes: 10 additions & 5 deletions wavefront/server/plugins/datasource/datasource/odata_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def read_string(self, quote_char: str) -> str:
self.advance()

if self.current_char == quote_char:
result += self.current_char
result += quote_char
self.advance()

return result
Expand Down Expand Up @@ -217,8 +217,8 @@ def get_next_token(self) -> Token:
class ODataParserABC(ABC):
@abstractmethod
def prepare_odata_filter(
self, filter_expr: str, dynamic_var_char: str = '@'
) -> Tuple[str, dict]:
self, filter_expr: Optional[str]
) -> Tuple[Optional[str], Optional[Dict[str, Any]]]:
pass

@abstractmethod
Expand All @@ -240,7 +240,9 @@ def __init__(self, type: str, dynamic_var_char: str = '@'):
else:
raise ValueError(f'Invalid type: {self.type}')

def prepare_odata_filter(self, odata_filter: str) -> Dict[str, Any]:
def prepare_odata_filter(
    self, odata_filter: Optional[str]
) -> Tuple[Optional[str], Optional[Dict[str, Any]]]:
    """Translate an OData filter expression via the backend-specific parser.

    Returns the (where_clause, params) pair produced by the underlying
    parser; the parser is expected to yield (None, None) for empty input.
    """
    prepared = self.parser.prepare_odata_filter(odata_filter)
    return prepared

def prepare_odata_joins(
Expand Down Expand Up @@ -666,7 +668,9 @@ class SQLODataParser(ODataParserABC):
def __init__(self, dynamic_var_char: str = '@'):
self.dynamic_var_char = dynamic_var_char

def prepare_odata_filter(self, filter_expr: str) -> Tuple[str, dict]:
def prepare_odata_filter(
self, filter_expr: Optional[str]
) -> Tuple[Optional[str], Optional[Dict[str, Any]]]:
"""Parses an OData-like filter expression and converts it into a SQL-like query with parameters."""
if not filter_expr:
return None, None
Expand Down Expand Up @@ -800,6 +804,7 @@ def build_joins(
sql_filter, filter_params = filter_parser.prepare_odata_filter(
filter_expr
)
filter_params = filter_params or {}
if sql_filter:
# Prefix the filter with table name to avoid ambiguity
# Replace @param with @table_param_ to match the new parameter names
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,41 @@ async def test_connection(self) -> bool:
return await asyncio.to_thread(self.client.test_connection)

def get_schema(self, table_name: Optional[str] = None) -> dict:
    """Return Redshift table schema information.

    Args:
        table_name: When provided, return table info for that single table —
            keeps older single-table callers working against the new
            all-tables signature. When omitted, return a mapping of every
            table name to its table info.

    Returns:
        A single table-info dict when ``table_name`` is given, otherwise a
        ``{table_name: table_info}`` mapping for all tables.
    """
    if table_name is not None:
        return self.client.get_table_info(table_name)
    return {
        name: self.client.get_table_info(name)
        for name in self.client.list_tables()
    }
Comment on lines 24 to +29
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for get_schema calls that pass arguments
rg -n 'get_schema\s*\(' --type=py -C2 | grep -v 'def get_schema'

Repository: rootflo/wavefront

Length of output: 2781


🏁 Script executed:

# Check the abstract method signature in types.py
cat -n wavefront/server/plugins/datasource/datasource/types.py | sed -n '56,65p'

Repository: rootflo/wavefront

Length of output: 307


🏁 Script executed:

# Check BigQuery plugin's get_schema implementation
cat -n wavefront/server/plugins/datasource/datasource/bigquery/__init__.py | sed -n '20,35p'

Repository: rootflo/wavefront

Length of output: 683


🏁 Script executed:

# Check full context of bigquery_tools.py around line 47
cat -n wavefront/server/modules/tools_module/tools_module/datasources/bigquery_tools.py | sed -n '40,55p'

Repository: rootflo/wavefront

Length of output: 862


🏁 Script executed:

# Search for all get_schema method implementations
rg -n 'def get_schema' --type=py -A3

Repository: rootflo/wavefront

Length of output: 1636


API signature mismatch: bigquery_tools.py incompatible with updated get_schema() signature.

The abstract method get_schema(self) -> dict (types.py) no longer accepts a table_id parameter and now returns a dict mapping all table names to schemas. However, bigquery_tools.py line 47 calls plugin.get_schema(table_id) with a parameter, which will fail with TypeError: get_schema() takes 1 positional argument but 2 were given. This is not specific to Redshift—both the Redshift and BigQuery plugin implementations conform to the updated signature, but bigquery_tools.py must be updated to either extract the specific table from the returned dict or refactor its logic to work with the full schema.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/plugins/datasource/datasource/redshift/__init__.py` around
lines 24 - 29, bigquery_tools.py currently calls plugin.get_schema(table_id) but
get_schema(self) was changed to return a dict of all table schemas; update
bigquery_tools.py to call plugin.get_schema() (no args), assign the returned
dict, then extract the schema for the desired table_id (e.g.,
schemas.get(table_id) or raise if missing) or refactor the logic to operate on
the full schema mapping; ensure you update any variable names and error handling
around plugin.get_schema and references to plugin.get_schema(table_id)
accordingly.


def get_table_names(self, **kwargs) -> list[str]:
return self.client.list_tables()

def fetch_data(
    self,
    table_names: List[str],
    projection: Optional[str] = '*',
    where_clause: Optional[str] = 'true',
    join_query: Optional[str] = None,
    params: Optional[Dict[str, Any]] = None,
    offset: Optional[int] = 0,
    limit: Optional[int] = 10,
    order_by: Optional[str] = None,
    group_by: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Fetch rows from Redshift tables, delegating to the query client.

    Args:
        table_names: Tables to query; each is prefixed with the database name.
        projection: Columns to select; None falls back to '*'.
        where_clause: SQL predicate; None falls back to 'true' (match all).
        join_query: Optional pre-built join clause appended by the client.
        params: Optional bound query parameters.
        offset: Row offset; None falls back to 0.
        limit: Row cap; None falls back to 10.
        order_by: Optional ORDER BY expression.
        group_by: Optional GROUP BY expression.

    Returns:
        The list of row dicts produced by ``execute_query_to_dict``.
    """
    # Callers may pass None explicitly for these, so coalesce rather than
    # relying on the parameter defaults alone.
    projection_value = projection or '*'
    where_clause_value = where_clause or 'true'
    limit_value = limit if limit is not None else 10
    offset_value = offset if offset is not None else 0

    return self.client.execute_query_to_dict(
        projection=projection_value,
        table_prefix=f'{self.db_name}.',
        table_names=table_names,
        where_clause=where_clause_value,
        join_query=join_query,
        params=params,
        limit=limit_value,
        offset=offset_value,
        order_by=order_by,
        group_by=group_by,
    )
Expand Down
1 change: 1 addition & 0 deletions wavefront/server/plugins/datasource/datasource/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ async def execute_dynamic_query(
odata_data_params: Optional[Dict[str, Any]] = None,
offset: Optional[int] = 0,
limit: Optional[int] = 100,
params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
pass

Expand Down
Loading