feat(flo-cloud): add Azure Synapse Analytics datasource provider#261
rootflo-hardik wants to merge 5 commits into develop
Conversation
Implements Azure Synapse Analytics support across two layers:

- flo_cloud/azure/synapse.py: SynapseClient using pyodbc with named param conversion (@name → ?), SQL Server OFFSET/FETCH pagination, and support for both Dedicated and Serverless SQL Pool endpoints (distinguished only by host URL)
- datasource/synapse/: SynapsePlugin + SynapseConfig wiring the client into the DataSourceABC interface, with T-SQL boolean literal normalisation (TRUE → 1=1)
- DatasourcePlugin factory updated to handle DataSourceType.AZURE_SYNAPSE

System-level dependencies required in deployment environments:

- unixodbc
- msodbcsql18 (ODBC Driver 18 for SQL Server)
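As a sketch of the named-param conversion mentioned above (illustrative only; the actual SynapseClient implementation may differ, and T-SQL tokens that legitimately start with `@`, such as `@@ROWCOUNT`, would need extra handling):

```python
import re
from typing import Any, Dict, List, Tuple

def convert_named_params(query: str, params: Dict[str, Any]) -> Tuple[str, List[Any]]:
    """Convert @name placeholders to pyodbc's positional ? markers.

    Returns the rewritten query plus the values in the order they appear,
    since pyodbc binds parameters positionally.
    """
    ordered_values: List[Any] = []

    def _replace(match: re.Match) -> str:
        name = match.group(1)
        if name not in params:
            raise KeyError(f'Missing value for named parameter @{name}')
        ordered_values.append(params[name])
        return '?'

    # NOTE: naive pattern; system variables like @@ROWCOUNT would be mangled
    converted = re.sub(r'@(\w+)', _replace, query)
    return converted, ordered_values
```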
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. This behavior can be configured in the repository's CodeRabbit settings.
📝 Walkthrough

Adds Azure Synapse support: new pyodbc-backed SynapseClient, Synapse datasource plugin and config, wiring into datasource routing/controllers/services, and a pyodbc dependency.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as Datasource Consumer
    participant Router as DatasourcePlugin/Controller
    participant Plugin as SynapsePlugin
    participant ClientLib as SynapseClient
    participant DB as Azure Synapse
    Client->>Router: fetch_data / execute_dynamic_query(...)
    Router->>Plugin: fetch_data / execute_dynamic_query(...)
    Plugin->>ClientLib: build query (placeholders, joins, pagination)
    ClientLib->>ClientLib: _build_connection_string(), _convert_named_params()
    ClientLib->>DB: open ODBC connection (pyodbc) & execute SQL
    DB-->>ClientLib: result rows
    ClientLib->>Plugin: rows as List[Dict]
    Plugin->>Router: aggregated results per query id
    Router->>Client: return data
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@wavefront/server/packages/flo_cloud/flo_cloud/azure/synapse.py`:
- Around line 30-57: The constructor of the Synapse client currently validates
host and database but not credentials, allowing user/password to be None and
producing an invalid connection string; update the __init__ (constructor) to
validate that self.user and self.password (or their env vars
AZURE_SYNAPSE_USER/AZURE_SYNAPSE_PASSWORD) are present and raise a ValueError
with a clear message if missing, so _build_connection_string can assume non-None
values when composing Uid and Pwd entries; keep the validation logic near the
existing host/database checks and include descriptive messages naming the
missing field(s).
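A minimal constructor sketch of the suggested validation; the attribute and env-var names follow the comment, everything else is assumed:

```python
import os

class SynapseClient:
    """Sketch of the suggested credential checks in the constructor."""

    def __init__(self, host=None, database=None, user=None, password=None):
        self.host = host or os.getenv('AZURE_SYNAPSE_HOST')
        self.database = database or os.getenv('AZURE_SYNAPSE_DATABASE')
        self.user = user or os.getenv('AZURE_SYNAPSE_USER')
        self.password = password or os.getenv('AZURE_SYNAPSE_PASSWORD')

        # Validate everything up front so _build_connection_string can
        # assume non-None values for Uid and Pwd.
        missing = [
            name for name, value in (
                ('host', self.host),
                ('database', self.database),
                ('user', self.user),
                ('password', self.password),
            ) if not value
        ]
        if missing:
            raise ValueError(
                f"SynapseClient is missing required settings: {', '.join(missing)}"
            )
```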
In `@wavefront/server/plugins/datasource/datasource/__init__.py`:
- Around line 38-42: The AZURE_SYNAPSE branch is never reachable because the
controller never builds a SynapseConfig; update the datasource_controller's
create and update handlers (the methods that currently build
BigQueryConfig/RedshiftConfig) to include a branch for
DataSourceType.AZURE_SYNAPSE that constructs and validates a SynapseConfig from
the incoming payload and passes it through the same save/update flow; ensure the
same logic is added in both the create path and the update path so
DataSourceType.AZURE_SYNAPSE reaches the __init__.py branch that instantiates
SynapsePlugin.
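A hypothetical sketch of the missing controller branch; DataSourceType and SynapseConfig are stubbed here, and the real create/update handlers are assumed to share a helper like this:

```python
from dataclasses import dataclass
from enum import Enum

class DataSourceType(Enum):
    # stub; the real enum lives in the plugins package
    BIGQUERY = 'bigquery'
    REDSHIFT = 'redshift'
    AZURE_SYNAPSE = 'azure_synapse'

@dataclass
class SynapseConfig:
    # dataclass stand-in for the real config model
    host: str
    database: str
    user: str
    password: str

def build_datasource_config(datasource_type: DataSourceType, payload: dict):
    """Shared helper both the create and update handlers could call, so
    AZURE_SYNAPSE reaches the factory branch that instantiates SynapsePlugin."""
    if datasource_type is DataSourceType.AZURE_SYNAPSE:
        return SynapseConfig(**payload)
    raise NotImplementedError('BigQuery/Redshift branches elided in this sketch')
```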
In `@wavefront/server/plugins/datasource/datasource/synapse/__init__.py`:
- Around line 58-59: The insert_rows_json method currently silently does nothing
(pass), which will make callers think writes succeeded; replace the no-op with a
failing implementation: in insert_rows_json (in this module/class) raise a clear
exception (e.g., NotImplementedError or a custom
ReadOnlyDatasourceError/RuntimeError) with a descriptive message like "Synapse
datasource is read-only: insert_rows_json is not supported" so callers get an
explicit error instead of dropped writes; if the datasource later supports
writes, implement the proper insert logic and remove the explicit error.
- Around line 19-28: The get_table_names method currently ignores the configured
schema (self.db_name) and calls AzureSynapseClient.list_tables(), which defaults
to dbo; update get_table_names to pass the configured schema to the client
(e.g., call self.client.list_tables(schema=self.db_name) or the equivalent param
name used by AzureSynapseClient) so discovered tables use the same schema that
fetch_data prefixes with self.db_name; ensure you reference and use self.db_name
inside get_table_names rather than relying on the client's default.
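A small sketch of the suggested schema pass-through, with a stubbed client standing in for AzureSynapseClient (the `schema=` parameter name is an assumption taken from the comment):

```python
class FakeSynapseClient:
    # stand-in mimicking AzureSynapseClient.list_tables and its dbo default
    def list_tables(self, schema: str = 'dbo'):
        return [f'{schema}.customers', f'{schema}.orders']

class SynapsePluginSketch:
    def __init__(self, client, db_name: str):
        self.client = client
        self.db_name = db_name  # configured schema

    def get_table_names(self):
        # forward the configured schema so discovery matches the schema
        # that fetch_data prefixes with self.db_name
        return self.client.list_tables(schema=self.db_name)
```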
- Around line 67-77: The call to execute_dynamic_query is passing positional
args in the wrong order causing limit (int) to be passed into odata_params;
change the call site that invokes execute_dynamic_query(...) to use keyword
arguments matching the signature (query=..., odata_filter=..., odata_params=...,
odata_data_filter=..., odata_data_params=..., offset=..., limit=..., params=...)
so each parameter (especially odata_params and limit) is bound correctly; locate
the function definition execute_dynamic_query and the caller currently passing
positional args (the caller around the previous PR comment) and replace the
positional invocation with the keyword-based invocation.
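To illustrate the mis-binding, here is a stub with the parameter order the comment describes; the real signature may differ:

```python
# Stub mirroring the parameter order the review describes, to show why a
# positional call mis-binds limit into odata_params.
def execute_dynamic_query(query, odata_filter=None, odata_params=None,
                          odata_data_filter=None, odata_data_params=None,
                          offset=0, limit=100, params=None):
    # return the bindings so the mis-binding is visible
    return {'odata_params': odata_params, 'offset': offset, 'limit': limit}

# Positional: the intended limit (50) lands in odata_params.
mis_bound = execute_dynamic_query('SELECT 1', None, 50)

# Keyword-based: every argument binds to the right parameter.
well_bound = execute_dynamic_query(query='SELECT 1', offset=20, limit=50)
```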
In `@wavefront/server/plugins/datasource/datasource/synapse/config.py`:
- Around line 5-11: SynapseConfig currently requires port but SynapseClient
assumes 1433; make port optional by giving it a default of 1433 on the
SynapseConfig class (change the port field to use a default value, e.g., port:
int = 1433) so create/update payloads that omit port will inherit the standard
SQL port; update any places that instantiate SynapseConfig without an explicit
port to rely on this default (class name: SynapseConfig, field: port, related
consumer: SynapseClient).
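A sketch of the suggested default, using a dataclass stand-in for whatever model base SynapseConfig actually uses:

```python
from dataclasses import dataclass

@dataclass
class SynapseConfig:
    # dataclass stand-in; the real model may be pydantic-based
    host: str
    database: str
    user: str
    password: str
    port: int = 1433  # standard SQL Server port, so payloads may omit it
```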
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 98ac185e-32e2-488b-8af8-cc5f90b2681a
⛔ Files ignored due to path filters (1)
wavefront/server/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (6)
- wavefront/server/packages/flo_cloud/flo_cloud/azure/__init__.py
- wavefront/server/packages/flo_cloud/flo_cloud/azure/synapse.py
- wavefront/server/packages/flo_cloud/pyproject.toml
- wavefront/server/plugins/datasource/datasource/__init__.py
- wavefront/server/plugins/datasource/datasource/synapse/__init__.py
- wavefront/server/plugins/datasource/datasource/synapse/config.py
wavefront/server/plugins/datasource/datasource/synapse/__init__.py (outdated review comment, resolved)
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py (2)
173-207: ⚠️ Potential issue | 🟠 Major: Require a config when the datasource type changes.

This block lets a PATCH update only `type`. Changing an existing datasource to `AZURE_SYNAPSE` without a new config will persist the new type but keep the old provider's config JSON. `get_datasource_config()` later reconstructs the config class from `datasource.type` in wavefront/server/modules/plugins_module/plugins_module/services/datasource_services.py:28-33, so a successful PATCH can turn a working datasource into one that no longer loads.

Suggested guard:

```diff
     # Use provided type or keep existing type
     datasource_type = update_datasource_payload.type or existing_datasource.type
+
+    if (
+        update_datasource_payload.type is not None
+        and update_datasource_payload.type != existing_datasource.type
+        and update_datasource_payload.config is None
+    ):
+        return JSONResponse(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            content=response_formatter.buildErrorResponse(
+                'Config must be provided when changing datasource type'
+            ),
+        )

     if update_datasource_payload.config is not None:
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py` around lines 173 - 207, The controller allows changing update_datasource_payload.type without a corresponding config, which can leave the DB with a mismatched type+config; add a guard in the update handling that if update_datasource_payload.type is set and differs from existing_datasource.type then require update_datasource_payload.config (return a 400/validation error) before proceeding to construct payload_config, instantiate DatasourcePlugin and call test_connection; ensure you check the type-change condition before setting update_data['type'] and before calling get_datasource_config()/the code path that reconstructs config from datasource.type.
88-101: ⚠️ Potential issue | 🟠 Major: Return 400 for invalid Synapse configs instead of 500.

The new Synapse branch constructs `SynapseConfig` straight from request JSON and then immediately instantiates `DatasourcePlugin`. A malformed JSON blob or a missing required Synapse field currently escapes the route as an unhandled exception, so bad client input returns a 500 instead of a validation error.

Also applies to: 180-194
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py` around lines 88 - 101, When parsing client JSON into config objects (BigQueryConfig, RedshiftConfig, SynapseConfig) in datasource_controller.py before creating DatasourcePlugin, wrap the config construction in a try/except that catches Pydantic ValidationError (and TypeError/ValueError for malformed JSON) and convert it into an HTTP 400 response (e.g. raise fastapi.HTTPException(status_code=400, detail=...)) with a clear message about invalid client input; apply the same pattern to the other branch (the update flow referenced around the other block) so invalid SynapseConfig or other config validation errors return 400 instead of bubbling up as 500.
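A generic sketch of the suggested guard; the FastAPI/HTTPException wiring is elided, and `BadRequest` stands in for an HTTP 400 response:

```python
class BadRequest(Exception):
    """Stand-in for raising fastapi.HTTPException(status_code=400, ...)."""

def parse_config(config_cls, raw_payload: dict):
    """Turn malformed payloads into a 400-style error instead of letting the
    exception escape the route as a 500. pydantic's ValidationError subclasses
    ValueError, so it is caught here too."""
    try:
        return config_cls(**raw_payload)
    except (TypeError, ValueError) as exc:
        raise BadRequest(f'Invalid {config_cls.__name__} payload: {exc}') from exc
```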
♻️ Duplicate comments (1)
wavefront/server/plugins/datasource/datasource/synapse/__init__.py (1)
58-59: ⚠️ Potential issue | 🟠 Major: Fail loudly for unsupported writes.

`insert_rows_json()` still no-ops, but wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py:524-530 treats this method as successful and returns "Inserted N rows successfully". That silently drops every Synapse write request.

Suggested fix:

```diff
 def insert_rows_json(self, table_name: str, data: List[Dict[str, Any]]) -> None:
-    pass
+    raise NotImplementedError(
+        'Synapse datasource is read-only: insert_rows_json is not supported'
+    )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@wavefront/server/plugins/datasource/datasource/synapse/__init__.py` around lines 58 - 59, The insert_rows_json method currently no-ops causing silent drops; change insert_rows_json in the Synapse datasource implementation to fail loudly by raising a clear exception (e.g., NotImplementedError or RuntimeError) with a descriptive message like "Synapse datasource does not support writes" so callers (e.g., the datasource controller) receive an error instead of a false success; locate the method insert_rows_json and replace the pass with raising that exception and ensure the message includes the datasource name and that writes are unsupported.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@wavefront/server/modules/plugins_module/plugins_module/services/datasource_services.py`:
- Around line 32-33: The hardcoded whitelist in check_is_valid_resource() blocks
Synapse table resource_ids even after adding the SynapseConfig branch; update
check_is_valid_resource (and the validation path used by query_datasource()) to
allow Azure Synapse resource IDs by either (a) short-circuiting validation when
datasource.type == DataSourceType.AZURE_SYNAPSE, (b) delegating validation to
the datasource plugin (call a new/existing plugin method that validates
resource_id for Synapse), or (c) extending the whitelist logic to accept Synapse
table patterns described in SynapseConfig; ensure the change is applied to the
same validation used by query_datasource() so Synapse tables can be listed and
queried.
In `@wavefront/server/packages/flo_cloud/flo_cloud/azure/synapse.py`:
- Around line 117-119: The current code builds a columns list from
cursor.description and zips rows into dicts which silently overwrites duplicate
column names; modify the block around cursor.execute(converted_query, values) /
columns to detect duplicate names (e.g., build columns = [desc[0] for desc in
cursor.description], then check for duplicates via a set comparison or count)
and raise a clear exception (ValueError or a datasource-specific error) listing
the duplicated column names (and include converted_query or a short context)
instead of returning a dict; this ensures cursor.execute/columns/row handling in
the function fails fast when duplicate column names are present rather than
dropping data.
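A sketch of the suggested duplicate-column guard around the cursor handling (function and error names are illustrative):

```python
from collections import Counter
from typing import Any, Dict, List, Sequence

def rows_to_dicts(columns: Sequence[str],
                  rows: Sequence[Sequence[Any]]) -> List[Dict[str, Any]]:
    """Zip rows into dicts, failing fast on duplicate column names instead of
    silently overwriting values (e.g. two unaliased `id` columns after a join)."""
    duplicates = [name for name, count in Counter(columns).items() if count > 1]
    if duplicates:
        raise ValueError(
            f"Query returned duplicate column names: {', '.join(duplicates)}; "
            'alias them in the SELECT list'
        )
    return [dict(zip(columns, row)) for row in rows]
```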
- Around line 55-65: The ODBC connection string built in
_build_connection_string currently interpolates self.database, self.user, and
self.password directly which breaks on semicolons or braces; fix by escaping
those values before interpolation: replace any '}' in each value with '}}' and
wrap the value in braces when inserting into the connection string (e.g., use
{escaped_value}) so Database, Uid, and Pwd are safely encoded; apply the same
escaping approach to any other interpolated attributes that may contain special
chars (driver/host if configurable).
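A sketch of the brace-escaping rule for ODBC connection-string values (the surrounding keys mirror the comment; exact driver options are assumptions):

```python
def odbc_escape(value: str) -> str:
    """Wrap a connection-string value in braces, doubling any closing brace,
    so semicolons and braces in passwords cannot break the string."""
    return '{' + value.replace('}', '}}') + '}'

def build_connection_string(host: str, database: str, user: str, password: str,
                            port: int = 1433,
                            driver: str = 'ODBC Driver 18 for SQL Server') -> str:
    # host is left unescaped (hostnames cannot contain ; or {});
    # Database/Uid/Pwd are caller-supplied and get brace-quoted
    return (
        f'Driver={{{driver}}};'
        f'Server={host},{port};'
        f'Database={odbc_escape(database)};'
        f'Uid={odbc_escape(user)};'
        f'Pwd={odbc_escape(password)};'
        'Encrypt=yes;TrustServerCertificate=no;'
    )
```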
In `@wavefront/server/plugins/datasource/datasource/synapse/__init__.py`:
- Around line 104-111: The code always appends " ORDER BY (SELECT NULL) OFFSET
{offset} ROWS FETCH NEXT {limit} ROWS ONLY" to query_to_execute which breaks
valid T-SQL when the YAML query already contains ORDER BY/OFFSET/FETCH or a
trailing semicolon; update the logic around query_to_execute (the variable used
with odata_data_filter/odata_filter and the offset/limit values) to first
normalize the query (strip trailing whitespace and any trailing semicolon), then
detect existing pagination/order clauses with a case-insensitive check (e.g.,
search for "\bORDER\s+BY\b" or "\bOFFSET\b" / "\bFETCH\b"); only append the
ORDER BY ... OFFSET ... FETCH ... clause when none of those tokens are present.
Ensure you reference the same variables (query_to_execute, offset, limit) and
avoid leaving a stray semicolon before appending.
---
Outside diff comments:
In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py`:
- Around line 173-207: The controller allows changing
update_datasource_payload.type without a corresponding config, which can leave
the DB with a mismatched type+config; add a guard in the update handling that if
update_datasource_payload.type is set and differs from existing_datasource.type
then require update_datasource_payload.config (return a 400/validation error)
before proceeding to construct payload_config, instantiate DatasourcePlugin and
call test_connection; ensure you check the type-change condition before setting
update_data['type'] and before calling get_datasource_config()/the code path
that reconstructs config from datasource.type.
- Around line 88-101: When parsing client JSON into config objects
(BigQueryConfig, RedshiftConfig, SynapseConfig) in datasource_controller.py
before creating DatasourcePlugin, wrap the config construction in a try/except
that catches Pydantic ValidationError (and TypeError/ValueError for malformed
JSON) and convert it into an HTTP 400 response (e.g. raise
fastapi.HTTPException(status_code=400, detail=...)) with a clear message about
invalid client input; apply the same pattern to the other branch (the update
flow referenced around the other block) so invalid SynapseConfig or other config
validation errors return 400 instead of bubbling up as 500.
---
Duplicate comments:
In `@wavefront/server/plugins/datasource/datasource/synapse/__init__.py`:
- Around line 58-59: The insert_rows_json method currently no-ops causing silent
drops; change insert_rows_json in the Synapse datasource implementation to fail
loudly by raising a clear exception (e.g., NotImplementedError or RuntimeError)
with a descriptive message like "Synapse datasource does not support writes" so
callers (e.g., the datasource controller) receive an error instead of a false
success; locate the method insert_rows_json and replace the pass with raising
that exception and ensure the message includes the datasource name and that
writes are unsupported.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3e8d5d43-22e7-45a5-8cac-0fc0fe2c4a79
📒 Files selected for processing (6)
- wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py
- wavefront/server/modules/plugins_module/plugins_module/services/datasource_services.py
- wavefront/server/packages/flo_cloud/flo_cloud/azure/synapse.py
- wavefront/server/plugins/datasource/datasource/__init__.py
- wavefront/server/plugins/datasource/datasource/synapse/__init__.py
- wavefront/server/plugins/datasource/datasource/synapse/config.py
✅ Files skipped from review due to trivial changes (1)
- wavefront/server/plugins/datasource/datasource/synapse/config.py
🚧 Files skipped from review as they are similar to previous changes (1)
- wavefront/server/plugins/datasource/datasource/__init__.py
wavefront/server/modules/plugins_module/plugins_module/services/datasource_services.py (review comment, resolved)
wavefront/server/plugins/datasource/datasource/synapse/__init__.py (outdated review comment, resolved)
Actionable comments posted: 3
♻️ Duplicate comments (2)
wavefront/server/plugins/datasource/datasource/synapse/__init__.py (2)
58-59: ⚠️ Potential issue | 🟠 Major: Fail loudly for unsupported writes.

`pass` makes `insert_rows_json()` look successful while dropping every row. If this datasource is intentionally read-only, raise a clear `NotImplementedError` so callers can handle it explicitly instead of assuming the write worked.

Possible fix:

```diff
 def insert_rows_json(self, table_name: str, data: List[Dict[str, Any]]) -> None:
-    pass
+    raise NotImplementedError(
+        'Synapse datasource is read-only: insert_rows_json is not supported'
+    )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@wavefront/server/plugins/datasource/datasource/synapse/__init__.py` around lines 58 - 59, The current insert_rows_json method silently drops writes because it is a no-op; replace the pass with a clear failure by raising NotImplementedError in insert_rows_json(self, table_name: str, data: List[Dict[str, Any]]) so callers know writes are unsupported (and optionally include a message like "Synapse datasource is read-only: insert_rows_json not implemented"); update any docstring/comments on the method to state it is intentionally read-only so behavior is explicit to maintainers and callers.
110-116: ⚠️ Potential issue | 🟠 Major: Don't append pagination blindly.

The plain `' order by '` substring check misses multiline/tabbed `ORDER BY` clauses, and it still appends a second `OFFSET/FETCH` when the template already paginates. That makes valid Synapse SQL fail at runtime. Detect `ORDER BY`/`OFFSET`/`FETCH` with a case-insensitive regex and only append pagination when those clauses are absent.

```bash
#!/bin/bash
python <<'PY'
samples = [
    "SELECT * FROM t ORDER BY id",
    "SELECT * FROM t\nORDER BY id",
    "SELECT * FROM t ORDER BY id OFFSET 0 ROWS FETCH NEXT 10 ROWS ONLY;",
]
for sample in samples:
    query_to_execute = sample.rstrip().rstrip(';')
    if ' order by ' in query_to_execute.lower():
        output = (
            query_to_execute
            + ' OFFSET 20 ROWS FETCH NEXT 10 ROWS ONLY'
        )
    else:
        output = (
            query_to_execute
            + ' ORDER BY (SELECT NULL) OFFSET 20 ROWS FETCH NEXT 10 ROWS ONLY'
        )
    print(f"INPUT : {sample!r}")
    print(f"OUTPUT: {output!r}")
    print()
PY
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@wavefront/server/plugins/datasource/datasource/synapse/__init__.py` around lines 110 - 116, The current logic appends pagination based on a plain substring (' order by ') which misses multiline/tabbed "ORDER BY" and blindly adds OFFSET/FETCH even if the template already includes pagination; update the code that builds query_to_execute to use case-insensitive regex checks (e.g. search for r'\border\s+by\b' with DOTALL and IGNORECASE) and also check for existing pagination tokens like r'\boffset\b' or r'\bfetch\s+next\b' before appending; only append "ORDER BY (SELECT NULL) OFFSET {offset} ROWS FETCH NEXT {limit} ROWS ONLY" when neither an ORDER BY nor OFFSET/FETCH are present, otherwise leave query_to_execute unchanged (use the same variable names: query_to_execute, offset, limit to locate the change).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@wavefront/server/packages/flo_cloud/flo_cloud/azure/synapse.py`:
- Around line 152-177: The SQL is vulnerable because projection, table_names,
where_clause, group_by, order_by, and join_query are interpolated as raw
fragments; replace these with structured, validated inputs: in
SynapsePlugin.fetch_data and the class methods that call __get_join_query (and
the non-join branch), accept lists/objects (e.g., table_names: list, projection:
list of columns, joins: structured join descriptors, filters: paramizable
expressions) and validate each identifier against the discovered schema
(whitelist table and column names), escape or quote identifiers safely, build
WHERE/GROUP/ORDER clauses from those structured inputs while binding values via
params, and reject or sanitize any caller-supplied raw SQL fragments (remove
accept of join_query/raw clauses); apply the same changes to the other
occurrence noted so both code paths construct SQL from validated components
rather than splicing raw fragments.
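One possible shape for the structured, whitelisted builder the comment asks for (a minimal sketch, not the plugin's real API):

```python
import re
from typing import Dict, List, Sequence

_IDENTIFIER = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*$')

def quote_identifier(name: str) -> str:
    """Validate and bracket-quote a T-SQL identifier; reject raw SQL fragments."""
    if not _IDENTIFIER.match(name):
        raise ValueError(f'Invalid identifier: {name!r}')
    return f'[{name}]'

def build_select(table: str, columns: Sequence[str],
                 allowed_tables: Sequence[str],
                 allowed_columns: Dict[str, List[str]]) -> str:
    """Compose a SELECT from identifiers whitelisted against discovered schema,
    instead of splicing caller-supplied SQL text."""
    if table not in allowed_tables:
        raise ValueError(f'Unknown table: {table!r}')
    for col in columns:
        if col not in allowed_columns.get(table, []):
            raise ValueError(f'Unknown column {col!r} on {table!r}')
    projection = ', '.join(quote_identifier(c) for c in columns)
    return f'SELECT {projection} FROM {quote_identifier(table)}'
```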
In `@wavefront/server/plugins/datasource/datasource/synapse/__init__.py`:
- Around line 81-125: The current loop validates and immediately starts
background tasks, causing earlier queries to run if a later query fails; change
the logic in the method that iterates over query/query_obj so you first perform
a validation/preparation pass that: (1) verifies every query_obj has an 'id' and
all required parameter names from query_obj['parameters'] exist in the provided
params, (2) builds a prepared list of tuples containing (query_id,
query_to_execute, params_to_execute) including applying rls/filters,
trimming/trailing semicolons, and adding the OFFSET/FETCH clause; only after the
whole batch is validated and prepared, iterate over the prepared list to create
asyncio.create_task calls that call self.client.execute_query_as_dict and append
(query_id, task) to tasks so no background tasks are spawned until the entire
batch is known-good.
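The validate-first-then-spawn pattern can be sketched like this (names follow the comment; the real method also applies RLS filters and pagination, which are elided here):

```python
import asyncio
from typing import Any, Dict, List, Tuple

async def run_query_batch(client, queries: List[Dict[str, Any]],
                          params: Dict[str, Any]) -> Dict[str, Any]:
    """Validate and prepare every query first; only spawn tasks once the
    whole batch is known-good, so a late validation error cannot leave
    earlier queries already running."""
    prepared: List[Tuple[str, str, Dict[str, Any]]] = []
    for query_obj in queries:
        query_id = query_obj.get('id')
        if not query_id:
            raise ValueError('Every query object needs an id')
        missing = [p for p in query_obj.get('parameters', []) if p not in params]
        if missing:
            raise ValueError(f"Query {query_id} missing params: {', '.join(missing)}")
        sql = query_obj['query'].rstrip().rstrip(';')
        bound = {p: params[p] for p in query_obj.get('parameters', [])}
        prepared.append((query_id, sql, bound))

    # Only now start the background work.
    tasks = [
        (query_id, asyncio.create_task(client.execute_query_as_dict(sql, bound)))
        for query_id, sql, bound in prepared
    ]
    return {query_id: await task for query_id, task in tasks}
```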
---
Duplicate comments:
In `@wavefront/server/plugins/datasource/datasource/synapse/__init__.py`:
- Around line 58-59: The current insert_rows_json method silently drops writes
because it is a no-op; replace the pass with a clear failure by raising
NotImplementedError in insert_rows_json(self, table_name: str, data:
List[Dict[str, Any]]) so callers know writes are unsupported (and optionally
include a message like "Synapse datasource is read-only: insert_rows_json not
implemented"); update any docstring/comments on the method to state it is
intentionally read-only so behavior is explicit to maintainers and callers.
- Around line 110-116: The current logic appends pagination based on a plain
substring (' order by ') which misses multiline/tabbed "ORDER BY" and blindly
adds OFFSET/FETCH even if the template already includes pagination; update the
code that builds query_to_execute to use case-insensitive regex checks (e.g.
search for r'\border\s+by\b' with DOTALL and IGNORECASE) and also check for
existing pagination tokens like r'\boffset\b' or r'\bfetch\s+next\b' before
appending; only append "ORDER BY (SELECT NULL) OFFSET {offset} ROWS FETCH NEXT
{limit} ROWS ONLY" when neither an ORDER BY nor OFFSET/FETCH are present,
otherwise leave query_to_execute unchanged (use the same variable names:
query_to_execute, offset, limit to locate the change).
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d3ad960e-2015-43fe-a19c-e0cdc2c989db
📒 Files selected for processing (2)
- wavefront/server/packages/flo_cloud/flo_cloud/azure/synapse.py
- wavefront/server/plugins/datasource/datasource/synapse/__init__.py
wavefront/server/plugins/datasource/datasource/synapse/__init__.py (outdated review comment, resolved)
Actionable comments posted: 3
♻️ Duplicate comments (3)
wavefront/server/plugins/datasource/datasource/synapse/__init__.py (2)
58-59: ⚠️ Potential issue | 🟠 Major: Fail loudly for unsupported writes.

`pass` makes `insert_rows_json()` look successful while dropping every row. If Synapse is intentionally read-only, raise an explicit error so callers do not treat a no-op as a successful write.

💡 Proposed fix:

```diff
 def insert_rows_json(self, table_name: str, data: List[Dict[str, Any]]) -> None:
-    pass
+    raise NotImplementedError(
+        'Synapse datasource is read-only: insert_rows_json is not supported'
+    )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@wavefront/server/plugins/datasource/datasource/synapse/__init__.py` around lines 58 - 59, Replace the no-op in insert_rows_json with an explicit exception so callers don't assume success; in the Synapse datasource class implement insert_rows_json(self, table_name: str, data: List[Dict[str, Any]]) to raise a clear exception (e.g., NotImplementedError or a custom ReadOnlyDatasourceError) with a message like "Synapse datasource is read-only; insert_rows_json is not supported" so any attempted writes fail loudly and include the function name insert_rows_json in the error context.
110-116: ⚠️ Potential issue | 🟠 Major: Don't rely on a raw `' order by '` substring before appending pagination.

This misses valid queries with `ORDER BY` on a new line or tab-separated formatting, and it still appends a second clause when the SQL already contains `OFFSET/FETCH`. Those cases will fail at runtime with invalid T-SQL.

💡 Proposed fix:

```diff
     query_to_execute = query_to_execute.rstrip().rstrip(';')
-    if ' order by ' in query_to_execute.lower():
-        query_to_execute += (
-            f' OFFSET {offset} ROWS FETCH NEXT {limit} ROWS ONLY'
-        )
-    else:
-        query_to_execute += f' ORDER BY (SELECT NULL) OFFSET {offset} ROWS FETCH NEXT {limit} ROWS ONLY'
+    normalized_sql = f" {' '.join(query_to_execute.lower().split())} "
+    has_order_by = ' order by ' in normalized_sql
+    has_pagination = (
+        ' offset ' in normalized_sql or ' fetch ' in normalized_sql
+    )
+
+    if not has_pagination:
+        if has_order_by:
+            query_to_execute += (
+                f' OFFSET {offset} ROWS FETCH NEXT {limit} ROWS ONLY'
+            )
+        else:
+            query_to_execute += (
+                f' ORDER BY (SELECT NULL) OFFSET {offset} ROWS FETCH NEXT {limit} ROWS ONLY'
+            )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@wavefront/server/plugins/datasource/datasource/synapse/__init__.py` around lines 110 - 116, The code currently checks for the literal substring ' order by ' which misses ORDER BY with newlines/tabs and doesn't detect pre-existing OFFSET/FETCH clauses; update the logic around query_to_execute (the block that strips semicolons and appends pagination using offset and limit) to use regex checks instead: use a case-insensitive search for r'\border\s+by\b' to detect ORDER BY regardless of whitespace/newlines/tabs, and also search for r'\boffset\b|\bfetch\b' (case-insensitive) to detect already-present pagination; only append "ORDER BY (SELECT NULL) OFFSET {offset} ROWS FETCH NEXT {limit} ROWS ONLY" if ORDER BY is absent and OFFSET/FETCH are absent, otherwise append just " OFFSET {offset} ROWS FETCH NEXT {limit} ROWS ONLY" when ORDER BY exists but OFFSET/FETCH do not.

wavefront/server/packages/flo_cloud/flo_cloud/azure/synapse.py (1)
136-177: ⚠️ Potential issue | 🟠 Major: Stop interpolating raw SQL fragments into the final statement.

Only `params` are bound here. `projection`, `table_names`, `where_clause`, `join_query`, `group_by`, and `order_by` are still spliced verbatim in both query builders, so any caller-controlled fragment can change query shape or inject arbitrary SQL. Please validate identifiers/clauses against discovered schema and build these pieces from structured inputs instead of accepting raw SQL text.

Also applies to: 189-223
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@wavefront/server/packages/flo_cloud/flo_cloud/azure/synapse.py` around lines 136 - 177, The execute_query_to_dict method (and its helper __get_join_query) currently splices caller-controlled SQL fragments (projection, table_names, where_clause, join_query, group_by, order_by) directly into the final query; fix this by accepting structured inputs and validating/whitelisting identifiers before composing SQL: require table_names as validated identifiers (compare to discovered schema) and build FROM/JOIN clauses from those safe names, require projection as a list of validated column names and join into a safe SELECT list, construct ORDER BY and GROUP BY only from validated column names and directions (deny raw expressions), and convert filtering into a parameterized WHERE built from key->param mappings rather than raw where_clause; keep params for values only, and raise on any unverified fragment or switch to an explicit AST-style builder used by execute_query_to_dict/__get_join_query. Ensure all identifier formatting uses safe quoting/escaping functions and that join_query is removed or replaced by structured join descriptors.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@wavefront/server/packages/flo_cloud/flo_cloud/azure/synapse.py`:
- Around line 207-212: The alias rewrite breaks schema-qualified names because
replacing "{table_name}." after inserting "qualified" turns "dbo.orders" into
"dbo.b"; fix by using a regex that only replaces standalone table name
occurrences not preceded by a dot: replace occurrences of "{table_name}." using
a negative lookbehind like r'(?<!\.)\b{table_name}\.' -> f'{alias}.' (apply to
processed_join and processed_where), and keep the JOIN replacement that inserts
f'{qualified} AS {alias}' unchanged; this ensures "dbo.orders" stays intact
while "orders.col" becomes "b.col".
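The negative-lookbehind rewrite can be sketched as follows (illustrative helper, not the actual method):

```python
import re

def qualify_with_alias(sql_fragment: str, table_name: str, alias: str) -> str:
    """Replace `table.col` references with `alias.col`, but leave
    schema-qualified names like `dbo.orders` untouched thanks to the
    negative lookbehind on the preceding dot."""
    pattern = rf'(?<!\.)\b{re.escape(table_name)}\.'
    return re.sub(pattern, f'{alias}.', sql_fragment)
```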
- Around line 226-241: get_table_info currently selects columns from all schemas
causing columns from identically named tables in different schemas to be merged;
update get_table_info to filter INFORMATION_SCHEMA.COLUMNS by the configured
schema (use config.schema or a schema parameter) in the WHERE clause (e.g.,
WHERE TABLE_SCHEMA = :schema) and pass that schema value into
execute_query_as_dict, and also thread self.db_name (or schema) through
SynapsePlugin's get_schema
(wavefront/server/plugins/datasource/datasource/synapse/__init__.py:get_schema)
so the same schema/db name used by get_table_names is used here; ensure you
reference get_table_info and get_schema and use the existing
config.schema/self.db_name fields when calling the query to keep table metadata
scoped to the correct schema.
In `@wavefront/server/plugins/datasource/datasource/synapse/__init__.py`:
- Around line 30-45: The override fetch_data currently types where_clause as str
and calls where_clause.strip(), which breaks the DataSourceABC contract that
allows where_clause=None; change the signature to accept Optional[str] and
defensively handle None before calling strip (e.g., treat None as the
always-true predicate '1=1'), then normalize the string 'true' to '1=1' as
existing logic does; update any related uses in fetch_data and ensure the call
to self.client.execute_query_to_dict receives a valid where_clause string.
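The negative-lookbehind rewrite suggested for the "line 207-212" alias finding above can be sketched in isolation (the function name is illustrative only):

```python
import re

def qualify_table_refs(sql: str, table_name: str, alias: str) -> str:
    """Rewrite bare `table.` prefixes to `alias.`, leaving
    schema-qualified names such as `dbo.orders.col` untouched."""
    # (?<!\.) skips occurrences preceded by a dot, so the table
    # name inside "dbo.orders." is not rewritten.
    pattern = r'(?<!\.)\b' + re.escape(table_name) + r'\.'
    return re.sub(pattern, alias + '.', sql)
```

With `table_name='orders'` and `alias='b'`, the fragment `'orders.col = dbo.orders.col'` becomes `'b.col = dbo.orders.col'`: the bare reference is aliased while the schema-qualified one survives.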
---
Duplicate comments:
In `@wavefront/server/packages/flo_cloud/flo_cloud/azure/synapse.py`:
- Around line 136-177: The execute_query_to_dict method (and its helper
__get_join_query) currently splices caller-controlled SQL fragments (projection,
table_names, where_clause, join_query, group_by, order_by) directly into the
final query; fix this by accepting structured inputs and validating/whitelisting
identifiers before composing SQL: require table_names as validated identifiers
(compare to discovered schema) and build FROM/JOIN clauses from those safe
names, require projection as a list of validated column names and join into a
safe SELECT list, construct ORDER BY and GROUP BY only from validated column
names and directions (deny raw expressions), and convert filtering into a
parameterized WHERE built from key->param mappings rather than raw where_clause;
keep params for values only, and raise on any unverified fragment or switch to
an explicit AST-style builder used by execute_query_to_dict/__get_join_query.
Ensure all identifier formatting uses safe quoting/escaping functions and that
join_query is removed or replaced by structured join descriptors.
In `@wavefront/server/plugins/datasource/datasource/synapse/__init__.py`:
- Around line 58-59: Replace the no-op in insert_rows_json with an explicit
exception so callers don't assume success; in the Synapse datasource class
implement insert_rows_json(self, table_name: str, data: List[Dict[str, Any]]) to
raise a clear exception (e.g., NotImplementedError or a custom
ReadOnlyDatasourceError) with a message like "Synapse datasource is read-only;
insert_rows_json is not supported" so any attempted writes fail loudly and
include the function name insert_rows_json in the error context.
- Around line 110-116: The code currently checks for the literal substring '
order by ' which misses ORDER BY with newlines/tabs and doesn't detect
pre-existing OFFSET/FETCH clauses; update the logic around query_to_execute (the
block that strips semicolons and appends pagination using offset and limit) to
use regex checks instead: use a case-insensitive search for r'\border\s+by\b' to
detect ORDER BY regardless of whitespace/newlines/tabs, and also search for
r'\boffset\b|\bfetch\b' (case-insensitive) to detect already-present pagination;
only append "ORDER BY (SELECT NULL) OFFSET {offset} ROWS FETCH NEXT {limit} ROWS
ONLY" if ORDER BY is absent and OFFSET/FETCH are absent, otherwise append just "
OFFSET {offset} ROWS FETCH NEXT {limit} ROWS ONLY" when ORDER BY exists but
OFFSET/FETCH do not.
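A standalone sketch of the regex-based pagination check described above, assuming SQL Server OFFSET/FETCH syntax (the helper name is illustrative):

```python
import re

def append_pagination(query: str, offset: int, limit: int) -> str:
    """Append OFFSET/FETCH pagination, adding a dummy ORDER BY only
    when the query has none, and leaving queries that already
    paginate untouched. Word-level regexes, so a column literally
    named 'offset' would be a false positive; a production version
    would need smarter parsing."""
    q = query.rstrip().rstrip(';')
    if re.search(r'\boffset\b|\bfetch\b', q, re.IGNORECASE):
        return q  # pagination already present
    suffix = f' OFFSET {offset} ROWS FETCH NEXT {limit} ROWS ONLY'
    if re.search(r'\border\s+by\b', q, re.IGNORECASE):
        return q + suffix  # ORDER BY exists, even across newlines/tabs
    return q + ' ORDER BY (SELECT NULL)' + suffix
```

Unlike a substring check for `' order by '`, the `\s+` pattern also matches `ORDER\nBY` and `ORDER\tBY`.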
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 81eb291b-b622-475c-b490-beb2f76076cc
📒 Files selected for processing (2)
- wavefront/server/packages/flo_cloud/flo_cloud/azure/synapse.py
- wavefront/server/plugins/datasource/datasource/synapse/__init__.py
Actionable comments posted: 2
♻️ Duplicate comments (1)
wavefront/server/plugins/datasource/datasource/synapse/__init__.py (1)
59-60: ⚠️ Potential issue | 🟠 Major

Do not silently drop writes.

This no-op implementation makes `insert_rows_json()` appear successful while inserting nothing. If Synapse is intentionally read-only in this integration, fail explicitly so callers are aware.

Suggested fix

```diff
 def insert_rows_json(self, table_name: str, data: List[Dict[str, Any]]) -> None:
-    pass
+    raise NotImplementedError(
+        'Synapse datasource does not support insert_rows_json'
+    )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@wavefront/server/plugins/datasource/datasource/synapse/__init__.py` around lines 59 - 60, The current no-op insert_rows_json method silently drops writes; change it to fail explicitly instead: in the insert_rows_json(...) implementation raise a clear exception (e.g., NotImplementedError or a custom ReadOnlyError/PermissionError) with a message stating that Synapse writes are unsupported/read-only in this integration so callers know the operation failed; update the method body for insert_rows_json to raise that exception and ensure any callers/tests expect/handle the exception.
🧹 Nitpick comments (1)
wavefront/server/plugins/datasource/datasource/synapse/__init__.py (1)
92-93: Avoid mutating the caller's `params` dict.

Reassigning `params = {}` when `None` is fine, but be aware that if a caller passes a mutable dict, any modifications to `params` in later iterations would persist. Currently safe since you copy values to `params_to_execute`, but consider using a local variable for clarity.

Suggested improvement

```diff
-    if params is None:
-        params = {}
+    provided_params = params or {}
     for key in params_key:
-        if key not in params:
+        if key not in provided_params:
             raise ValueError(f'Missing parameter: {key} for query {query_id}')
-        params_to_execute[key] = params[key]
+        params_to_execute[key] = provided_params[key]
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@wavefront/server/plugins/datasource/datasource/synapse/__init__.py` around lines 92 - 93, Don't mutate the caller's params dict: instead of reassigning params = {} or modifying params in place, create a local copy (e.g., local_params = {} if params is None else params.copy()) and use that for subsequent work and for building params_to_execute; reference the existing params and params_to_execute variables and ensure all modifications are applied to the local copy so the original caller-provided dict is never mutated.
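The copy-don't-mutate pattern from the nitpick above, sketched as a standalone helper (the name and error text are illustrative):

```python
from typing import Any, Dict, List, Optional

def build_exec_params(
    params: Optional[Dict[str, Any]], params_key: List[str]
) -> Dict[str, Any]:
    """Resolve required query parameters without ever touching the
    caller's dict: work on a shallow copy instead."""
    local_params = {} if params is None else dict(params)
    params_to_execute: Dict[str, Any] = {}
    for key in params_key:
        if key not in local_params:
            raise ValueError(f'Missing parameter: {key}')
        params_to_execute[key] = local_params[key]
    return params_to_execute
```

Because only `local_params` is read and `params_to_execute` is built fresh, the caller's mapping is guaranteed to come back unchanged.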
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@wavefront/server/packages/flo_cloud/flo_cloud/azure/synapse.py`:
- Line 140: The parameter table_names: List[str] currently uses a mutable
default ([]) — change its signature to accept None (e.g., table_names:
Optional[List[str]] = None), add Optional to imports if missing, and inside the
function (the function that defines the table_names parameter) set table_names =
table_names or [] before any mutation/usage; this avoids sharing a single list
across calls while preserving type hints and behavior.
In `@wavefront/server/plugins/datasource/datasource/synapse/__init__.py`:
- Around line 68-78: Calls to execute_dynamic_query are passing six positional
args in the wrong order so values like filter/offset/limit/params get
mis-mapped; update the call sites that currently pass (yaml_query,
rls_filter_str, filter, offset, limit, params) to use explicit keyword arguments
matching the signature (query=..., odata_filter=..., odata_params=...,
odata_data_filter=..., odata_data_params=..., offset=..., limit=..., params=...)
or reorder the positional arguments to align with execute_dynamic_query(query,
odata_filter, odata_params, odata_data_filter, odata_data_params, offset, limit,
params), ensuring the string filters go to odata_filter/odata_data_filter and
dicts go to odata_params/odata_data_params and offset/limit/params are passed to
the last three parameters.
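Whether or not these particular call sites are affected (the author later notes the controller actually targets a facade with a different signature), the mis-mapping hazard of long positional argument lists is easy to reproduce with a simplified, synchronous stand-in for the signature:

```python
from typing import Any, Dict, Optional

def execute_dynamic_query(  # simplified stand-in, not the real async method
    query: Any,
    odata_filter: Optional[str] = None,
    odata_params: Optional[Dict[str, Any]] = None,
    odata_data_filter: Optional[str] = None,
    odata_data_params: Optional[Dict[str, Any]] = None,
    offset: int = 0,
    limit: int = 100,
    params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    return {'odata_params': odata_params, 'offset': offset, 'limit': limit}

# Six positional args in a (query, rls_filter, filter, offset, limit,
# params) order slide into the wrong slots:
bad = execute_dynamic_query('q', 'rls', "name eq 'x'", 10, 50, {'p': 1})
# odata_params received the filter string; offset received the params dict.

# Keyword arguments make the mapping explicit and order-proof:
good = execute_dynamic_query(
    query='q', odata_filter='rls', odata_data_filter="name eq 'x'",
    offset=10, limit=50, params={'p': 1},
)
```

This is why the suggested fix prefers keyword arguments over merely reordering positional ones: the call stays correct even if the signature gains or loses parameters later.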
---
Duplicate comments:
In `@wavefront/server/plugins/datasource/datasource/synapse/__init__.py`:
- Around line 59-60: The current no-op insert_rows_json method silently drops
writes; change it to fail explicitly instead: in the insert_rows_json(...)
implementation raise a clear exception (e.g., NotImplementedError or a custom
ReadOnlyError/PermissionError) with a message stating that Synapse writes are
unsupported/read-only in this integration so callers know the operation failed;
update the method body for insert_rows_json to raise that exception and ensure
any callers/tests expect/handle the exception.
---
Nitpick comments:
In `@wavefront/server/plugins/datasource/datasource/synapse/__init__.py`:
- Around line 92-93: Don't mutate the caller's params dict: instead of
reassigning params = {} or modifying params in place, create a local copy (e.g.,
local_params = {} if params is None else params.copy()) and use that for
subsequent work and for building params_to_execute; reference the existing
params and params_to_execute variables and ensure all modifications are applied
to the local copy so the original caller-provided dict is never mutated.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: b42aff0f-f9ac-4dd6-9491-c9eb1577672b
📒 Files selected for processing (2)
- wavefront/server/packages/flo_cloud/flo_cloud/azure/synapse.py
- wavefront/server/plugins/datasource/datasource/synapse/__init__.py
```python
    self,
    projection: str = '*',
    table_prefix: str = '',
    table_names: List[str] = [],
```
Fix mutable default argument.
Using [] as a default argument is a Python anti-pattern — the same list instance is shared across all calls, which can lead to subtle bugs if it's ever mutated.
Suggested fix

```diff
 def execute_query_to_dict(
     self,
     projection: str = '*',
     table_prefix: str = '',
-    table_names: List[str] = [],
+    table_names: Optional[List[str]] = None,
     where_clause: str = '1=1',
     join_query: Optional[str] = None,
     params: Optional[Dict[str, Any]] = None,
     limit: int = 10,
     offset: int = 0,
     order_by: Optional[str] = None,
     group_by: Optional[str] = None,
 ) -> List[Dict[str, Any]]:
-    if not table_names:
+    if table_names is None:
+        table_names = []
+    if not table_names:
         raise ValueError('At least one table name must be provided')
```

🧰 Tools
🪛 Ruff (0.15.7)
[warning] 140-140: Do not use mutable data structures for argument defaults
Replace with None; initialize within function
(B006)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@wavefront/server/packages/flo_cloud/flo_cloud/azure/synapse.py` at line 140,
The parameter table_names: List[str] currently uses a mutable default ([]) —
change its signature to accept None (e.g., table_names: Optional[List[str]] =
None), add Optional to imports if missing, and inside the function (the function
that defines the table_names parameter) set table_names = table_names or []
before any mutation/usage; this avoids sharing a single list across calls while
preserving type hints and behavior.
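The hazard behind Ruff's B006 warning is easy to reproduce in isolation:

```python
def bad(items=[]):       # one shared list, created at definition time
    items.append(1)
    return items

def good(items=None):    # fresh list per call
    if items is None:
        items = []
    items.append(1)
    return items
```

Calling `bad()` twice returns `[1]` and then `[1, 1]`: state from the first call leaks into the second, while `good()` returns `[1]` every time.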
ignore for now
@rootflo-hardik, understood — I'll skip this for now. Feel free to revisit it whenever convenient.
🧠 Learnings used
Learnt from: rootflo-hardik
Repo: rootflo/wavefront PR: 255
File: wavefront/server/packages/flo_cloud/flo_cloud/azure/key_vault.py:94-102
Timestamp: 2026-03-25T05:38:36.270Z
Learning: In the flo_cloud codebase, the FloKMS interface methods sign(message_digest: bytes) and verify(message_digest: bytes, signature: bytes) expect the caller to provide a pre-computed SHA-256 digest (not raw message data). For AWS KMS (MessageType='DIGEST'), GCP KMS (kms_v1.Digest(sha256=...)), and Azure Key Vault CryptographyClient.sign/verify (PS256), pass the digest directly and do not hash again inside provider adapters. Ensure adapters do not re-hash inputs that are already digests (e.g., callers like token_service.py should compute the SHA-256 and pass message_digest).
```python
async def execute_dynamic_query(
    self,
    query: List[Dict[str, Any]],
    odata_filter: Optional[str] = None,
    odata_params: Optional[Dict[str, Any]] = None,
    odata_data_filter: Optional[str] = None,
    odata_data_params: Optional[Dict[str, Any]] = None,
    offset: Optional[int] = 0,
    limit: Optional[int] = 100,
    params: Optional[Dict[str, Any]] = None,
):
```
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
# Description: Search for all call sites of execute_dynamic_query to verify argument order alignment
# Expected: All calls should use keyword arguments or match the signature order
rg -n "execute_dynamic_query\s*\(" wavefront/server/ -A 8 | head -100
```

Repository: rootflo/wavefront
Length of output: 11089
🏁 Script executed:
```shell
# Read the synapse/__init__.py file to see the actual implementation
cat -n wavefront/server/plugins/datasource/datasource/synapse/__init__.py | head -100
```

Repository: rootflo/wavefront
Length of output: 4250
🏁 Script executed:
```shell
# Get full context around the call sites in datasource_controller.py
sed -n '710,730p' wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py
sed -n '835,850p' wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py
```

Repository: rootflo/wavefront
Length of output: 1405
🏁 Script executed:
```shell
# Check the base class definition in types.py to see the complete signature
sed -n '85,100p' wavefront/server/plugins/datasource/datasource/types.py
```

Repository: rootflo/wavefront
Length of output: 617
Fix argument order in execute_dynamic_query calls or switch to keyword arguments.
The calls at datasource_controller.py:715-722 and 837-844 pass 6 positional arguments in the order (yaml_query, rls_filter_str, filter, offset, limit, params), but the method signature expects (query, odata_filter, odata_params, odata_data_filter, odata_data_params, offset, limit, params). This causes arguments to map incorrectly:
- `filter` (str) → `odata_params` (Dict)
- `offset` (int) → `odata_data_filter` (str)
- `limit` (int) → `odata_data_params` (Dict)
- `params` (dict) → `offset` (int)
Either reorder the positional arguments or refactor the calls to use keyword arguments.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@wavefront/server/plugins/datasource/datasource/synapse/__init__.py` around
lines 68 - 78, Calls to execute_dynamic_query are passing six positional args in
the wrong order so values like filter/offset/limit/params get mis-mapped; update
the call sites that currently pass (yaml_query, rls_filter_str, filter, offset,
limit, params) to use explicit keyword arguments matching the signature
(query=..., odata_filter=..., odata_params=..., odata_data_filter=...,
odata_data_params=..., offset=..., limit=..., params=...) or reorder the
positional arguments to align with execute_dynamic_query(query, odata_filter,
odata_params, odata_data_filter, odata_data_params, offset, limit, params),
ensuring the string filters go to odata_filter/odata_data_filter and dicts go to
odata_params/odata_data_params and offset/limit/params are passed to the last
three parameters.
Not a valid finding.
The call sites in datasource_controller.py call DatasourcePlugin.execute_dynamic_query (the facade/wrapper in `datasource/__init__.py`), not SynapsePlugin.execute_dynamic_query directly. The wrapper has a different, simpler 6-parameter signature:
(query, rls_filter, filter, offset, limit, params)
The 6 positional args passed by the controller map correctly to this signature.
The wrapper itself is responsible for parsing the OData filters and then forwarding to the plugin using keyword arguments (already fixed in `datasource/__init__.py:118-127`). There is no mis-mapping at the call sites.
@rootflo-hardik, you're right — I incorrectly conflated the two layers. The controller calls DatasourcePlugin.execute_dynamic_query (the facade), which has its own simpler 6-parameter signature, and the facade is responsible for OData parsing and forwarding to SynapsePlugin via keyword arguments. The call sites are correct. My apologies for the noise.
✏️ Learnings added
Learnt from: rootflo-hardik
Repo: rootflo/wavefront PR: 261
File: wavefront/server/plugins/datasource/datasource/synapse/__init__.py:68-78
Timestamp: 2026-03-26T12:33:35.362Z
Learning: In `wavefront/server/plugins/datasource/datasource/__init__.py`, `DatasourcePlugin.execute_dynamic_query` is a facade/wrapper with a simpler 6-parameter signature `(query, rls_filter, filter, offset, limit, params)`. Controllers call this wrapper, not the individual plugin's `execute_dynamic_query` directly. The wrapper handles OData filter parsing internally and forwards to the underlying plugin (e.g., `SynapsePlugin`) using keyword arguments (lines 118-127). Do not flag controller call sites as mismatched against the plugin-level signature.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
🧠 Learnings used
Learnt from: rootflo-hardik
Repo: rootflo/wavefront PR: 261
File: wavefront/server/modules/plugins_module/plugins_module/services/datasource_services.py:32-33
Timestamp: 2026-03-26T12:18:20.601Z
Learning: In `wavefront/server/modules/plugins_module/plugins_module/services/datasource_services.py`, the `check_is_valid_resource()` function has a hardcoded whitelist of five resource IDs: `parsed_data_object`, `rf_parsed_data_object`, `rf_gold_data_object`, `rf_gold_item_details`, `rf_gold_auditor_data`. This is intentional — these are the only table resource IDs that will be queried across all datasource types, including the newly added Azure Synapse datasource. Do not flag this whitelist as incomplete or blocking.