Skip to content

Commit f77e897

Browse files
author
Peng Ren
committed
Add retry mechanism
1 parent fe7559d commit f77e897

9 files changed

Lines changed: 355 additions & 34 deletions

File tree

README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ PyMongoSQL implements the DB API 2.0 interfaces to provide SQL-like access to Mo
4949
- **JMESPath** (JSON/Dict Path Query)
5050
- jmespath >= 1.0.0
5151

52+
- **Tenacity** (Transient Failure Retry)
53+
- tenacity >= 9.0.0
54+
5255
### Optional Dependencies
5356

5457
- **SQLAlchemy** (for ORM/Core support)
@@ -206,6 +209,22 @@ cursor.execute(
206209

207210
Parameters are substituted into the MongoDB filter during execution, providing protection against injection attacks.
208211

212+
### Retry on Transient System Errors
213+
214+
PyMongoSQL supports retrying transient, system-level MongoDB failures (for example connection timeout and reconnect errors) using Tenacity.
215+
216+
```python
217+
connection = connect(
218+
host="mongodb://localhost:27017/database",
219+
retry_enabled=True, # default: True
220+
retry_attempts=3, # default: 3
221+
retry_wait_min=0.1, # default: 0.1 seconds
222+
retry_wait_max=1.0, # default: 1.0 seconds
223+
)
224+
```
225+
226+
These options apply to connection ping checks, query/DML command execution, and paginated `getMore` fetches.
227+
209228
## Supported SQL Features
210229

211230
### SELECT Statements

pymongosql/connection.py

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from .cursor import Cursor
1414
from .error import DatabaseError, OperationalError
1515
from .helper import ConnectionHelper
16+
from .retry import RetryConfig, execute_with_retry
1617

1718
_logger = logging.getLogger(__name__)
1819

@@ -55,6 +56,10 @@ def __init__(
5556
if not self._mode and mode:
5657
self._mode = mode
5758

59+
# Retry behavior for transient system-level PyMongo failures.
60+
# These kwargs are consumed by PyMongoSQL and are not passed to MongoClient.
61+
self._retry_config = RetryConfig.from_kwargs(kwargs)
62+
5863
# Extract commonly used parameters for backward compatibility
5964
self._host = host or "localhost"
6065
self._port = port or 27017
@@ -109,7 +114,11 @@ def _connect(self) -> None:
109114
self._client = MongoClient(**self._pymongo_params)
110115

111116
# Test connection
112-
self._client.admin.command("ping")
117+
execute_with_retry(
118+
lambda: self._client.admin.command("ping"),
119+
self._retry_config,
120+
"initial MongoDB ping",
121+
)
113122

114123
# Initialize the database according to explicit parameter or client's default
115124
# This may raise OperationalError if no database could be determined; allow it to bubble up
@@ -179,6 +188,11 @@ def mode(self) -> str:
179188
"""Get the specified mode"""
180189
return self._mode
181190

191+
@property
192+
def retry_config(self) -> RetryConfig:
193+
"""Get retry configuration used for transient system-level errors."""
194+
return self._retry_config
195+
182196
def use_database(self, database_name: str) -> None:
183197
"""Switch to a different database"""
184198
if self._client is None:
@@ -332,15 +346,23 @@ def _start_session(self, **kwargs) -> ClientSession:
332346
if self._client is None:
333347
raise OperationalError("No active connection")
334348

335-
session = self._client.start_session(**kwargs)
349+
session = execute_with_retry(
350+
lambda: self._client.start_session(**kwargs),
351+
self._retry_config,
352+
"start MongoDB session",
353+
)
336354
self._session = session
337355
_logger.info("Started new MongoDB session")
338356
return session
339357

340358
def _end_session(self) -> None:
341359
"""End the current session (internal method)"""
342360
if self._session is not None:
343-
self._session.end_session()
361+
execute_with_retry(
362+
lambda: self._session.end_session(),
363+
self._retry_config,
364+
"end MongoDB session",
365+
)
344366
self._session = None
345367
_logger.info("Ended MongoDB session")
346368

@@ -357,7 +379,11 @@ def _start_transaction(self, **kwargs) -> None:
357379
if self._session is None:
358380
raise OperationalError("No active session")
359381

360-
self._session.start_transaction(**kwargs)
382+
execute_with_retry(
383+
lambda: self._session.start_transaction(**kwargs),
384+
self._retry_config,
385+
"start MongoDB transaction",
386+
)
361387
self._in_transaction = True
362388
self._autocommit = False
363389
_logger.info("Started MongoDB transaction")
@@ -370,7 +396,11 @@ def _commit_transaction(self) -> None:
370396
if not self._session.in_transaction:
371397
raise OperationalError("No active transaction to commit")
372398

373-
self._session.commit_transaction()
399+
execute_with_retry(
400+
lambda: self._session.commit_transaction(),
401+
self._retry_config,
402+
"commit MongoDB transaction",
403+
)
374404
self._in_transaction = False
375405
self._autocommit = True
376406
_logger.info("Committed MongoDB transaction")
@@ -383,7 +413,11 @@ def _abort_transaction(self) -> None:
383413
if not self._session.in_transaction:
384414
raise OperationalError("No active transaction to abort")
385415

386-
self._session.abort_transaction()
416+
execute_with_retry(
417+
lambda: self._session.abort_transaction(),
418+
self._retry_config,
419+
"abort MongoDB transaction",
420+
)
387421
self._in_transaction = False
388422
self._autocommit = True
389423
_logger.info("Aborted MongoDB transaction")
@@ -460,7 +494,11 @@ def test_connection(self) -> bool:
460494
"""Test if the connection is alive"""
461495
try:
462496
if self._client:
463-
self._client.admin.command("ping")
497+
execute_with_retry(
498+
lambda: self._client.admin.command("ping"),
499+
self._retry_config,
500+
"connection health check ping",
501+
)
464502
return True
465503
return False
466504
except Exception as e:

pymongosql/cursor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ def execute(self: _T, operation: str, parameters: Optional[Any] = None) -> _T:
110110
command_result=result,
111111
execution_plan=execution_plan_for_rs,
112112
database=self.connection.database,
113+
retry_config=self.connection.retry_config,
113114
**self._kwargs,
114115
)
115116
else:
@@ -125,6 +126,7 @@ def execute(self: _T, operation: str, parameters: Optional[Any] = None) -> _T:
125126
},
126127
execution_plan=stub_plan,
127128
database=self.connection.database,
129+
retry_config=self.connection.retry_config,
128130
**self._kwargs,
129131
)
130132
# Store the actual insert result for reference

pymongosql/executor.py

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from .error import DatabaseError, OperationalError, ProgrammingError, SqlSyntaxError
1010
from .helper import SQLHelper
11+
from .retry import execute_with_retry
1112
from .sql.delete_builder import DeleteExecutionPlan
1213
from .sql.insert_builder import InsertExecutionPlan
1314
from .sql.parser import SQLParser
@@ -17,6 +18,24 @@
1718
_logger = logging.getLogger(__name__)
1819

1920

21+
def _run_db_command(db: Any, command: Dict[str, Any], connection: Any, operation_name: str) -> Dict[str, Any]:
22+
"""Run a MongoDB command with optional transaction session and retry policy."""
23+
retry_config = getattr(connection, "retry_config", None)
24+
25+
if connection and connection.session and connection.session.in_transaction:
26+
return execute_with_retry(
27+
lambda: db.command(command, session=connection.session),
28+
retry_config,
29+
operation_name,
30+
)
31+
32+
return execute_with_retry(
33+
lambda: db.command(command),
34+
retry_config,
35+
operation_name,
36+
)
37+
38+
2039
@dataclass
2140
class ExecutionContext:
2241
"""Manages execution context for a single query"""
@@ -156,11 +175,8 @@ def _execute_find_plan(
156175

157176
_logger.debug(f"Executing MongoDB command: {find_command}")
158177

159-
# Execute find command with session if in transaction
160-
if connection and connection.session and connection.session.in_transaction:
161-
result = db.command(find_command, session=connection.session)
162-
else:
163-
result = db.command(find_command)
178+
# Execute find command with retry for transient system-level errors
179+
result = _run_db_command(db, find_command, connection, "find command")
164180

165181
# Create command result
166182
return result
@@ -214,11 +230,13 @@ def _execute_aggregate_plan(
214230
# Get collection and call aggregate()
215231
collection = db[execution_plan.collection]
216232

217-
# Execute aggregate with options
218-
cursor = collection.aggregate(pipeline, **options)
219-
220-
# Convert cursor to list
221-
results = list(cursor)
233+
# Execute aggregate with retry for transient system-level errors
234+
retry_config = getattr(connection, "retry_config", None)
235+
results = execute_with_retry(
236+
lambda: list(collection.aggregate(pipeline, **options)),
237+
retry_config,
238+
"aggregate command",
239+
)
222240

223241
# Apply additional filters if specified (from WHERE clause)
224242
if execution_plan.filter_stage:
@@ -420,11 +438,7 @@ def _execute_execution_plan(
420438

421439
_logger.debug(f"Executing MongoDB insert command: {command}")
422440

423-
# Execute with session if in transaction
424-
if connection and connection.session and connection.session.in_transaction:
425-
return db.command(command, session=connection.session)
426-
else:
427-
return db.command(command)
441+
return _run_db_command(db, command, connection, "insert command")
428442
except PyMongoError as e:
429443
_logger.error(f"MongoDB insert failed: {e}")
430444
raise DatabaseError(f"Insert execution failed: {e}")
@@ -504,11 +518,7 @@ def _execute_execution_plan(
504518

505519
_logger.debug(f"Executing MongoDB delete command: {command}")
506520

507-
# Execute with session if in transaction
508-
if connection and connection.session and connection.session.in_transaction:
509-
return db.command(command, session=connection.session)
510-
else:
511-
return db.command(command)
521+
return _run_db_command(db, command, connection, "delete command")
512522
except PyMongoError as e:
513523
_logger.error(f"MongoDB delete failed: {e}")
514524
raise DatabaseError(f"Delete execution failed: {e}")
@@ -608,11 +618,7 @@ def _execute_execution_plan(
608618

609619
_logger.debug(f"Executing MongoDB update command: {command}")
610620

611-
# Execute with session if in transaction
612-
if connection and connection.session and connection.session.in_transaction:
613-
return db.command(command, session=connection.session)
614-
else:
615-
return db.command(command)
621+
return _run_db_command(db, command, connection, "update command")
616622
except PyMongoError as e:
617623
_logger.error(f"MongoDB update failed: {e}")
618624
raise DatabaseError(f"Update execution failed: {e}")

pymongosql/result_set.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from . import STRING
1010
from .common import CursorIterator
1111
from .error import DatabaseError, ProgrammingError
12+
from .retry import RetryConfig, execute_with_retry
1213
from .sql.query_builder import QueryExecutionPlan
1314

1415
_logger = logging.getLogger(__name__)
@@ -23,6 +24,7 @@ def __init__(
2324
execution_plan: QueryExecutionPlan = None,
2425
arraysize: int = None,
2526
database: Optional[Any] = None,
27+
retry_config: Optional[RetryConfig] = None,
2628
**kwargs,
2729
) -> None:
2830
super().__init__(arraysize=arraysize or self.DEFAULT_FETCH_SIZE, **kwargs)
@@ -39,6 +41,8 @@ def __init__(
3941
else:
4042
raise ProgrammingError("command_result must be provided")
4143

44+
self._retry_config = retry_config
45+
4246
self._execution_plan = execution_plan
4347
self._is_closed = False
4448
self._cache_exhausted = False
@@ -108,7 +112,11 @@ def _ensure_results_available(self, count: int = 1) -> None:
108112
"getMore": self._cursor_id,
109113
"collection": self._execution_plan.collection,
110114
}
111-
result = self._database.command(getmore_cmd)
115+
result = execute_with_retry(
116+
lambda: self._database.command(getmore_cmd),
117+
self._retry_config,
118+
"getMore command",
119+
)
112120

113121
# Extract and process next batch
114122
cursor_info = result.get("cursor", {})

pymongosql/retry.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
# -*- coding: utf-8 -*-
2+
import logging
3+
from dataclasses import dataclass
4+
from typing import Any, Callable, Optional, Tuple, TypeVar
5+
6+
from pymongo.errors import AutoReconnect, ConnectionFailure, NetworkTimeout, PyMongoError, ServerSelectionTimeoutError
7+
from tenacity import Retrying, retry_if_exception_type, stop_after_attempt, wait_exponential
8+
9+
_logger = logging.getLogger(__name__)
10+
_T = TypeVar("_T")
11+
12+
RETRYABLE_SYSTEM_EXCEPTIONS: Tuple[type, ...] = (
13+
AutoReconnect,
14+
NetworkTimeout,
15+
ConnectionFailure,
16+
ServerSelectionTimeoutError,
17+
)
18+
19+
20+
@dataclass(frozen=True)
21+
class RetryConfig:
22+
enabled: bool = True
23+
attempts: int = 3
24+
wait_min: float = 0.1
25+
wait_max: float = 1.0
26+
27+
@classmethod
28+
def from_kwargs(cls, kwargs: dict) -> "RetryConfig":
29+
enabled = bool(kwargs.pop("retry_enabled", True))
30+
attempts = int(kwargs.pop("retry_attempts", 3))
31+
wait_min = float(kwargs.pop("retry_wait_min", 0.1))
32+
wait_max = float(kwargs.pop("retry_wait_max", 1.0))
33+
34+
if attempts < 1:
35+
attempts = 1
36+
if wait_min < 0:
37+
wait_min = 0.0
38+
if wait_max < wait_min:
39+
wait_max = wait_min
40+
41+
return cls(
42+
enabled=enabled,
43+
attempts=attempts,
44+
wait_min=wait_min,
45+
wait_max=wait_max,
46+
)
47+
48+
49+
def execute_with_retry(
50+
operation: Callable[[], _T],
51+
retry_config: Optional[RetryConfig],
52+
operation_name: str,
53+
) -> _T:
54+
"""Execute an operation with retry on transient system-level PyMongo failures."""
55+
config = retry_config or RetryConfig(enabled=False, attempts=1, wait_min=0.0, wait_max=0.0)
56+
57+
if not config.enabled or config.attempts <= 1:
58+
return operation()
59+
60+
def _before_sleep(retry_state: Any) -> None:
61+
error = retry_state.outcome.exception() if retry_state.outcome else None
62+
_logger.warning(
63+
"Retrying %s after transient error (attempt %s/%s): %s",
64+
operation_name,
65+
retry_state.attempt_number,
66+
config.attempts,
67+
error,
68+
)
69+
70+
retrying = Retrying(
71+
reraise=True,
72+
stop=stop_after_attempt(config.attempts),
73+
wait=wait_exponential(min=config.wait_min, max=config.wait_max),
74+
retry=retry_if_exception_type(RETRYABLE_SYSTEM_EXCEPTIONS),
75+
before_sleep=_before_sleep,
76+
)
77+
78+
return retrying(operation)
79+
80+
81+
def is_retryable_system_error(error: Exception) -> bool:
82+
return isinstance(error, RETRYABLE_SYSTEM_EXCEPTIONS) and isinstance(error, PyMongoError)

0 commit comments

Comments
 (0)