Skip to content

Commit a41c39c

Browse files
author
Peng Ren
committed
Superset implementation with QueryDB
1 parent 26e307e commit a41c39c

File tree

13 files changed

+1225
-76
lines changed

13 files changed

+1225
-76
lines changed

pymongosql/__init__.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,26 @@ def connect(*args, **kwargs) -> "Connection":
4242
return Connection(*args, **kwargs)
4343

4444

45+
# Register superset execution strategy for mongodb+superset:// connections
46+
def _register_superset_executor() -> None:
    """Register the SupersetExecution strategy with ExecutionPlanFactory.

    This allows the executor and cursor to be unaware of superset -
    the execution strategy is automatically selected based on the connection mode.

    Registration is best-effort: if either import fails, the strategy is
    simply not registered. The failure is logged at debug level so that a
    genuinely broken superset module does not disappear without a trace.
    """
    # Function-scope import keeps this block self-contained; it does not rely
    # on the package module having imported logging already.
    import logging

    try:
        from .executor import ExecutionPlanFactory
        from .superset_mongodb.executor import SupersetExecution

        ExecutionPlanFactory.register_strategy(SupersetExecution())
    except ImportError as exc:
        # Superset module not available - skip registration, but record why.
        logging.getLogger(__name__).debug(
            "Superset execution strategy not registered: %s", exc
        )
60+
61+
62+
# Auto-register superset executor on module import
63+
_register_superset_executor()
64+
4565
# SQLAlchemy integration (optional)
4666
# For SQLAlchemy functionality, import from pymongosql.sqlalchemy_mongodb:
4767
# from pymongosql.sqlalchemy_mongodb import create_engine_url, create_engine_from_mongodb_uri

pymongosql/common.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@ class BaseCursor(metaclass=ABCMeta):
1717
def __init__(self, connection: "Connection", mode: str = "standard", **kwargs) -> None:
    """Store the owning connection and the cursor's mode.

    Args:
        connection: Connection this cursor belongs to; kept as ``_connection``.
        mode: Mode label for this cursor; defaults to ``"standard"``.
        **kwargs: Accepted for subclass compatibility; not used here.
    """
    super().__init__()
    self._connection, self.mode = connection, mode
2426

2527
@property
2628
def connection(self) -> "Connection":

pymongosql/connection.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from .common import BaseCursor
1313
from .cursor import Cursor
1414
from .error import DatabaseError, NotSupportedError, OperationalError
15+
from .helper import ConnectionHelper
1516

1617
_logger = logging.getLogger(__name__)
1718

@@ -35,9 +36,17 @@ def __init__(
3536
to ensure full compatibility. All parameters are passed through directly
3637
to the underlying MongoClient.
3738
39+
Supports connection string patterns:
40+
- mongodb://host:port/database - Core driver (no subquery support)
41+
- mongodb+superset://host:port/database - Superset driver with subquery support
42+
3843
See PyMongo MongoClient documentation for full parameter details.
3944
https://www.mongodb.com/docs/languages/python/pymongo-driver/current/connect/mongoclient/
4045
"""
46+
# Check if connection string specifies mode
47+
connection_string = host if isinstance(host, str) else None
48+
self._mode, host = ConnectionHelper.parse_connection_string(connection_string)
49+
4150
# Extract commonly used parameters for backward compatibility
4251
self._host = host or "localhost"
4352
self._port = port or 27017
@@ -154,6 +163,11 @@ def database(self) -> Database:
154163
raise OperationalError("No database selected")
155164
return self._database
156165

166+
@property
def mode(self) -> str:
    """Return the mode held in ``self._mode``."""
    current_mode = self._mode
    return current_mode
170+
157171
def use_database(self, database_name: str) -> None:
158172
"""Switch to a different database"""
159173
if self._client is None:
@@ -267,6 +281,7 @@ def cursor(self, cursor: Optional[Type[BaseCursor]] = None, **kwargs) -> BaseCur
267281

268282
new_cursor = cursor(
269283
connection=self,
284+
mode=self._mode,
270285
**kwargs,
271286
)
272287
self.cursor_pool.append(new_cursor)

pymongosql/cursor.py

Lines changed: 21 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,11 @@
22
import logging
33
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Tuple, TypeVar
44

5-
from pymongo.errors import PyMongoError
6-
75
from .common import BaseCursor, CursorIterator
86
from .error import DatabaseError, OperationalError, ProgrammingError, SqlSyntaxError
7+
from .executor import ExecutionContext, ExecutionPlanFactory
98
from .result_set import DictResultSet, ResultSet
109
from .sql.builder import ExecutionPlan
11-
from .sql.parser import SQLParser
1210

1311
if TYPE_CHECKING:
1412
from .connection import Connection
@@ -22,9 +20,10 @@ class Cursor(BaseCursor, CursorIterator):
2220

2321
NO_RESULT_SET = "No result set."
2422

25-
def __init__(self, connection: "Connection", **kwargs) -> None:
23+
def __init__(self, connection: "Connection", mode: str = "standard", **kwargs) -> None:
2624
super().__init__(
2725
connection=connection,
26+
mode=mode,
2827
**kwargs,
2928
)
3029
self._kwargs = kwargs
@@ -76,74 +75,6 @@ def _check_closed(self) -> None:
7675
if self._is_closed:
7776
raise ProgrammingError("Cursor is closed")
7877

79-
def _parse_sql(self, sql: str) -> ExecutionPlan:
    """Parse a SQL statement and return its validated ExecutionPlan.

    Args:
        sql: SQL text handed to ``SQLParser``.

    Returns:
        The plan produced by ``SQLParser.get_execution_plan()``.

    Raises:
        SqlSyntaxError: If the plan's ``validate()`` returns a falsy value, or
            if parsing raises any other exception (chained as the cause).
    """
    try:
        parser = SQLParser(sql)
        execution_plan = parser.get_execution_plan()

        if not execution_plan.validate():
            raise SqlSyntaxError("Generated query plan is invalid")

        return execution_plan

    except SqlSyntaxError:
        # Already the right type - let it through untouched.
        raise
    except Exception as e:
        _logger.error(f"SQL parsing failed: {e}")
        # Chain explicitly so the underlying parser failure is kept as __cause__.
        raise SqlSyntaxError(f"Failed to parse SQL: {e}") from e
95-
96-
def _execute_execution_plan(self, execution_plan: ExecutionPlan) -> None:
    """Execute an ExecutionPlan against MongoDB using db.command.

    Builds a ``find`` command from the plan's collection, filter, projection,
    sort, skip and limit stages, runs it through ``db.command`` and stores the
    wrapped result in ``self._result_set``.

    Args:
        execution_plan: Plan whose stages are translated into the command.

    Raises:
        ProgrammingError: If the plan has no collection.
        DatabaseError: If a ``PyMongoError`` is raised while executing.
        OperationalError: For any other failure while executing.
    """
    # Checked before the try block so this ProgrammingError reaches the caller
    # instead of being re-wrapped as OperationalError by the generic handler.
    if not execution_plan.collection:
        raise ProgrammingError("No collection specified in query")

    try:
        db = self.connection.database

        # Build MongoDB find command
        find_command = {"find": execution_plan.collection, "filter": execution_plan.filter_stage or {}}

        # Apply projection if specified
        if execution_plan.projection_stage:
            find_command["projection"] = execution_plan.projection_stage

        # Apply sort if specified: flatten the list of {field: direction}
        # dicts into one dict, later entries overriding earlier ones.
        if execution_plan.sort_stage:
            find_command["sort"] = {
                field: direction
                for sort_dict in execution_plan.sort_stage
                for field, direction in sort_dict.items()
            }

        # Apply skip if specified
        if execution_plan.skip_stage:
            find_command["skip"] = execution_plan.skip_stage

        # Apply limit if specified
        if execution_plan.limit_stage:
            find_command["limit"] = execution_plan.limit_stage

        _logger.debug(f"Executing MongoDB command: {find_command}")

        # Execute find command directly
        result = db.command(find_command)

        # Create result set from command result
        self._result_set = self._result_set_class(
            command_result=result, execution_plan=execution_plan, database=db, **self._kwargs
        )

        _logger.info(f"Query executed successfully on collection '{execution_plan.collection}'")

    except PyMongoError as e:
        _logger.error(f"MongoDB command execution failed: {e}")
        raise DatabaseError(f"Command execution failed: {e}") from e
    except Exception as e:
        _logger.error(f"Unexpected error during command execution: {e}")
        raise OperationalError(f"Command execution error: {e}") from e
146-
14778
def execute(self: _T, operation: str, parameters: Optional[Dict[str, Any]] = None) -> _T:
14879
"""Execute a SQL statement
14980
@@ -160,11 +91,25 @@ def execute(self: _T, operation: str, parameters: Optional[Dict[str, Any]] = Non
16091
_logger.warning("Parameter substitution not yet implemented, ignoring parameters")
16192

16293
try:
163-
# Parse SQL to ExecutionPlan
164-
self._current_execution_plan = self._parse_sql(operation)
94+
# Create execution context
95+
context = ExecutionContext(operation, self.mode)
96+
97+
# Get appropriate execution strategy
98+
strategy = ExecutionPlanFactory.get_strategy(context)
99+
100+
# Execute using selected strategy (Standard or Subquery)
101+
result = strategy.execute(context, self.connection)
165102

166-
# Execute the execution plan
167-
self._execute_execution_plan(self._current_execution_plan)
103+
# Store execution plan for reference
104+
self._current_execution_plan = strategy.execution_plan
105+
106+
# Create result set from command result
107+
self._result_set = self._result_set_class(
108+
command_result=result,
109+
execution_plan=self._current_execution_plan,
110+
database=self.connection.database,
111+
**self._kwargs,
112+
)
168113

169114
return self
170115

0 commit comments

Comments
 (0)