Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Dict, Any
from datasource.bigquery.config import BigQueryConfig
from datasource.redshift.config import RedshiftConfig
from datasource.synapse.config import SynapseConfig
from dependency_injector.wiring import inject
import json
from dependency_injector.wiring import Provide
Expand Down Expand Up @@ -90,6 +91,8 @@ async def add_datasource(
config = BigQueryConfig(**config_json)
elif add_datasource_payload.type == DataSourceType.AWS_REDSHIFT:
config = RedshiftConfig(**config_json)
elif add_datasource_payload.type == DataSourceType.AZURE_SYNAPSE:
config = SynapseConfig(**config_json)
else:
raise ValueError(f'Invalid datasource type: {add_datasource_payload.type}')

Expand Down Expand Up @@ -181,6 +184,8 @@ async def update_datasource(
config = BigQueryConfig(**payload_config)
elif datasource_type == DataSourceType.AWS_REDSHIFT:
config = RedshiftConfig(**payload_config)
elif datasource_type == DataSourceType.AZURE_SYNAPSE:
config = SynapseConfig(**payload_config)
else:
raise ValueError(f'Invalid datasource type: {datasource_type}')

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import collections
from datasource import DataSourceType, BigQueryConfig, RedshiftConfig
from datasource.synapse.config import SynapseConfig
from db_repo_module.models.datasource import Datasource
from db_repo_module.models.role import Role
from db_repo_module.repositories.sql_alchemy_repository import SQLAlchemyRepository
Expand All @@ -17,7 +18,7 @@ async def get_datasource_config(
datasource_repository: SQLAlchemyRepository[Datasource] = Depends(
Provide(PluginsContainer.datasource_repository)
),
) -> tuple[DataSourceType, BigQueryConfig | RedshiftConfig]:
) -> tuple[DataSourceType, BigQueryConfig | RedshiftConfig | SynapseConfig]:
datasource: Datasource | None = await datasource_repository.find_one(
id=datasource_id
)
Expand All @@ -28,6 +29,8 @@ async def get_datasource_config(
return DataSourceType.GCP_BIGQUERY, BigQueryConfig(**datasource.config)
elif datasource.type == DataSourceType.AWS_REDSHIFT:
return DataSourceType.AWS_REDSHIFT, RedshiftConfig(**datasource.config)
elif datasource.type == DataSourceType.AZURE_SYNAPSE:
return DataSourceType.AZURE_SYNAPSE, SynapseConfig(**datasource.config)
else:
raise ValueError(f'Invalid datasource type: {datasource.type}')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from .blob_storage import AzureBlobStorage
from .storage_queue import StorageQueue
from .key_vault import AzureKMS
from .synapse import SynapseClient

logging.getLogger('azure').setLevel(logging.WARNING)

__all__ = ['AzureBlobStorage', 'AzureKMS', 'StorageQueue']
__all__ = ['AzureBlobStorage', 'AzureKMS', 'StorageQueue', 'SynapseClient']
265 changes: 265 additions & 0 deletions wavefront/server/packages/flo_cloud/flo_cloud/azure/synapse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
import os
import re
import string
import logging
from typing import List, Dict, Any, Optional, Tuple
from contextlib import contextmanager

import pyodbc

logger = logging.getLogger(__name__)


class SynapseClient:
    """
    Azure Synapse Analytics client built on ``pyodbc``.

    Supports both Dedicated SQL Pools and Serverless SQL Pools — the
    pool type is determined solely by the host URL provided; there is
    no separate mode flag. Connection settings not passed explicitly
    fall back to the ``AZURE_SYNAPSE_*`` environment variables.
    """

def __init__(
self,
host: Optional[str] = None,
port: int = 1433,
database: Optional[str] = None,
user: Optional[str] = None,
password: Optional[str] = None,
driver: str = 'ODBC Driver 18 for SQL Server',
timeout: int = 30,
):
self.host = host or os.getenv('AZURE_SYNAPSE_HOST')
self.port = port
self.database = database or os.getenv('AZURE_SYNAPSE_DATABASE')
self.user = user or os.getenv('AZURE_SYNAPSE_USER')
self.password = password or os.getenv('AZURE_SYNAPSE_PASSWORD')
self.driver = driver
self.timeout = timeout

if not self.host:
raise ValueError(
'Synapse host must be provided via parameter or AZURE_SYNAPSE_HOST environment variable'
)
if not self.database:
raise ValueError(
'Database must be provided via parameter or AZURE_SYNAPSE_DATABASE environment variable'
)
if not self.user:
raise ValueError(
'User must be provided via parameter or AZURE_SYNAPSE_USER environment variable'
)
if not self.password:
raise ValueError(
'Password must be provided via parameter or AZURE_SYNAPSE_PASSWORD environment variable'
)

@staticmethod
def _escape_conn_value(value: str) -> str:
"""Wrap an ODBC connection-string value in braces, escaping any literal }."""
return '{' + value.replace('}', '}}') + '}'

def _build_connection_string(self) -> str:
return (
f'DRIVER={self._escape_conn_value(self.driver)};'
f'Server={self._escape_conn_value(f"{self.host},{self.port}")};'
f'Database={self._escape_conn_value(self.database)};'
f'Uid={self._escape_conn_value(self.user)};'
f'Pwd={self._escape_conn_value(self.password)};'
f'Encrypt=yes;'
f'TrustServerCertificate=no;'
f'Connection Timeout={self.timeout};'
)

@contextmanager
def get_connection(self):
    """Yield a live pyodbc connection; always close it on exit.

    Driver-level failures are logged and re-raised. Because the caller's
    body runs inside the ``try``, exceptions raised there are also
    logged here before propagating.
    """
    connection = None
    try:
        connection = pyodbc.connect(self._build_connection_string())
        yield connection
    except pyodbc.Error as err:
        logger.error(f'Synapse connection error: {err}')
        raise
    except Exception as err:
        logger.error(f'Unexpected error connecting to Synapse: {err}')
        raise
    finally:
        if connection:
            connection.close()

@contextmanager
def get_cursor(self):
    """Yield a cursor on a fresh connection; close both on exit."""
    with self.get_connection() as connection:
        cur = connection.cursor()
        try:
            yield cur
        finally:
            cur.close()

def _convert_named_params(
self, query: str, params: Optional[Dict[str, Any]]
) -> Tuple[str, list]:
"""Convert @name style named params to ? positional params for pyodbc."""
if not params:
return query, []

ordered_values = []

def replace_param(match: re.Match) -> str:
param_name = match.group(1)
if param_name in params:
ordered_values.append(params[param_name])
return '?'
return match.group(0)

converted_query = re.sub(r'@(\w+)', replace_param, query)
return converted_query, ordered_values

def execute_query_as_dict(
    self, query: str, params: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
    """Run a query (with optional ``@name`` params) and return rows as dicts.

    Raises:
        ValueError: If the result set contains duplicate column names,
            since those would silently overwrite each other in the dicts.
    """
    sql, values = self._convert_named_params(query, params)
    with self.get_cursor() as cursor:
        try:
            cursor.execute(sql, values)
            columns = [col[0] for col in cursor.description]
            # Detect duplicate column names before zipping into dicts.
            seen: set = set()
            duplicates: set = set()
            for name in columns:
                if name in seen:
                    duplicates.add(name)
                seen.add(name)
            if duplicates:
                raise ValueError(
                    f'Duplicate column names {sorted(duplicates)!r} in query result. '
                    f'Query: {sql!r}'
                )
            return [dict(zip(columns, row)) for row in cursor.fetchall()]
        except pyodbc.Error as e:
            logger.error(f'Synapse query execution error: {e}')
            raise

def execute_query_to_dict(
self,
projection: str = '*',
table_prefix: str = '',
table_names: List[str] = [],
Copy link
Copy Markdown

@coderabbitai coderabbitai bot Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Fix mutable default argument.

Using [] as a default argument is a Python anti-pattern — the same list instance is shared across all calls, which can lead to subtle bugs if it's ever mutated.

Suggested fix
     def execute_query_to_dict(
         self,
         projection: str = '*',
         table_prefix: str = '',
-        table_names: List[str] = [],
+        table_names: Optional[List[str]] = None,
         where_clause: str = '1=1',
         join_query: Optional[str] = None,
         params: Optional[Dict[str, Any]] = None,
         limit: int = 10,
         offset: int = 0,
         order_by: Optional[str] = None,
         group_by: Optional[str] = None,
     ) -> List[Dict[str, Any]]:
-        if not table_names:
+        if table_names is None:
+            table_names = []
+        if not table_names:
             raise ValueError('At least one table name must be provided')
🧰 Tools
🪛 Ruff (0.15.7)

[warning] 140-140: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/packages/flo_cloud/flo_cloud/azure/synapse.py` at line 140,
The parameter table_names: List[str] currently uses a mutable default ([]) —
change its signature to accept None (e.g., table_names: Optional[List[str]] =
None), add Optional to imports if missing, and inside the function (the function
that defines the table_names parameter) set table_names = table_names or []
before any mutation/usage; this avoids sharing a single list across calls while
preserving type hints and behavior.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ignore for now

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rootflo-hardik, understood — I'll skip this for now. Feel free to revisit it whenever convenient.


🧠 Learnings used
Learnt from: rootflo-hardik
Repo: rootflo/wavefront PR: 255
File: wavefront/server/packages/flo_cloud/flo_cloud/azure/key_vault.py:94-102
Timestamp: 2026-03-25T05:38:36.270Z
Learning: In the flo_cloud codebase, the FloKMS interface methods sign(message_digest: bytes) and verify(message_digest: bytes, signature: bytes) expect the caller to provide a pre-computed SHA-256 digest (not raw message data). For AWS KMS (MessageType='DIGEST'), GCP KMS (kms_v1.Digest(sha256=...)), and Azure Key Vault CryptographyClient.sign/verify (PS256), pass the digest directly and do not hash again inside provider adapters. Ensure adapters do not re-hash inputs that are already digests (e.g., callers like token_service.py should compute the SHA-256 and pass message_digest).

where_clause: str = '1=1',
join_query: Optional[str] = None,
params: Optional[Dict[str, Any]] = None,
limit: int = 10,
offset: int = 0,
order_by: Optional[str] = None,
group_by: Optional[str] = None,
) -> List[Dict[str, Any]]:
if not table_names:
raise ValueError('At least one table name must be provided')

base_table = f'{table_prefix}{table_names[0]}'
group_by_clause = f'GROUP BY {group_by}' if group_by else ''
# SQL Server requires ORDER BY when using OFFSET/FETCH
order_by_clause = (
f'ORDER BY {order_by}' if order_by else 'ORDER BY (SELECT NULL)'
)

if join_query:
query = self.__get_join_query(
join_query,
table_names,
table_prefix,
projection,
where_clause,
limit,
offset,
order_by,
group_by=group_by,
)
else:
query = (
f'SELECT {projection} FROM {base_table} AS a '
f'WHERE {where_clause} {group_by_clause} '
f'{order_by_clause} '
f'OFFSET {offset} ROWS FETCH NEXT {limit} ROWS ONLY'
)

try:
logger.debug(f'Executing query: {query}')
return self.execute_query_as_dict(query, params)
except pyodbc.Error as e:
logger.error(f'Synapse query execution error: {e}')
raise
except Exception as e:
logger.error(f'Unexpected error executing Synapse query: {e}')
raise

def __get_join_query(
self,
join_query: str,
table_names: List[str],
table_prefix: str,
projection: str,
where_clause: str,
limit: int,
offset: int,
order_by: Optional[str] = None,
group_by: Optional[str] = None,
) -> str:
aliases = list(string.ascii_lowercase)
processed_join = join_query
processed_where = where_clause
for i, table_name in enumerate(table_names):
alias = aliases[i]
qualified = f'{table_prefix}{table_name}'
processed_join = processed_join.replace(f'{table_name}.', f'{alias}.')
processed_where = processed_where.replace(f'{table_name}.', f'{alias}.')
processed_join = processed_join.replace(
f'JOIN {table_name}',
f'JOIN {qualified} AS {alias}',
)

group_by_clause = f'GROUP BY {group_by}' if group_by else ''
order_by_clause = (
f'ORDER BY {order_by}' if order_by else 'ORDER BY (SELECT NULL)'
)
base_table = f'{table_prefix}{table_names[0]}'
return (
f'SELECT {projection} FROM {base_table} AS {aliases[0]} '
f'{processed_join} WHERE {processed_where} '
f'{group_by_clause} {order_by_clause} '
f'OFFSET {offset} ROWS FETCH NEXT {limit} ROWS ONLY'
)

def get_table_info(self, schema: str = 'dbo') -> List[Dict[str, Any]]:
    """Return column-level metadata for every table in *schema*.

    Queries INFORMATION_SCHEMA.COLUMNS — one dict per column — ordered
    by table name, then ordinal position. The schema value is bound via
    the @schema named parameter, not interpolated into the SQL text.
    """
    query = """
    SELECT
        c.TABLE_NAME,
        c.COLUMN_NAME,
        c.DATA_TYPE,
        c.CHARACTER_MAXIMUM_LENGTH,
        c.NUMERIC_PRECISION,
        c.NUMERIC_SCALE,
        c.IS_NULLABLE,
        c.COLUMN_DEFAULT,
        c.ORDINAL_POSITION
    FROM INFORMATION_SCHEMA.COLUMNS c
    WHERE c.TABLE_SCHEMA = @schema
    ORDER BY c.TABLE_NAME, c.ORDINAL_POSITION
    """
    return self.execute_query_as_dict(query, {'schema': schema})

def list_tables(self, schema: str = 'dbo') -> List[str]:
    """Return the names of all base tables in *schema*, sorted by name.

    Views are excluded (TABLE_TYPE = 'BASE TABLE'); the schema is bound
    as a positional parameter.
    """
    query = (
        'SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES '
        "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' "
        'ORDER BY TABLE_NAME'
    )
    with self.get_cursor() as cursor:
        cursor.execute(query, [schema])
        rows = cursor.fetchall()
    return [row[0] for row in rows]

def test_connection(self) -> bool:
    """Probe connectivity by executing ``SELECT 1``; never raises.

    Returns True only when the round trip succeeds and yields the
    expected value; any exception is logged and reported as False.
    """
    try:
        with self.get_cursor() as cursor:
            cursor.execute('SELECT 1')
            row = cursor.fetchone()
            ok = row is not None and row[0] == 1
            if ok:
                logger.info('Synapse connection test successful')
            return ok
    except Exception as err:
        logger.error(f'Synapse connection test failed: {err}')
        return False
1 change: 1 addition & 0 deletions wavefront/server/packages/flo_cloud/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies = [
"google-cloud-storage<3.0.0",
"google-cloud-pubsub>=2.28.0",
"redshift-connector>=2.1.7",
"pyodbc>=5.0.0",
]

[tool.pytest.ini_options]
Expand Down
24 changes: 15 additions & 9 deletions wavefront/server/plugins/datasource/datasource/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from .bigquery import BigQueryPlugin, BigQueryConfig
from .redshift import RedshiftPlugin, RedshiftConfig
from .synapse import SynapsePlugin, SynapseConfig
from .helper import construct_meta
from .odata_parser import ODataQueryParser

Expand All @@ -17,7 +18,7 @@ class DatasourcePlugin(DataSourceABC):
def __init__(
self,
datasource_type: DataSourceType,
config: BigQueryConfig | RedshiftConfig,
config: BigQueryConfig | RedshiftConfig | SynapseConfig,
):
self.datasource_type = datasource_type
self.config = config
Expand All @@ -34,6 +35,11 @@ def __get_datasource(self) -> DataSourceABC:
if not isinstance(self.config, BigQueryConfig):
raise ValueError(f'Invalid config type: {type(self.config)}')
return BigQueryPlugin(self.config)
elif self.datasource_type == DataSourceType.AZURE_SYNAPSE:
self.odata_parser = ODataQueryParser(type='sql', dynamic_var_char='@')
if not isinstance(self.config, SynapseConfig):
raise ValueError(f'Invalid config type: {type(self.config)}')
return SynapsePlugin(self.config)
else:
raise ValueError(f'Invalid datasource type: {self.datasource_type}')

Expand Down Expand Up @@ -110,13 +116,13 @@ async def execute_dynamic_query(
rls_filter
)
result_by_query: Dict[str, Any] = await self.datasource.execute_dynamic_query(
query,
offset,
limit,
odata_filter,
odata_params,
odata_data_filter,
odata_data_params,
params,
query=query,
odata_filter=odata_filter,
odata_params=odata_params,
odata_data_filter=odata_data_filter,
odata_data_params=odata_data_params,
offset=offset,
limit=limit,
params=params,
)
return result_by_query
Loading
Loading