Skip to content

Commit eebeba5

Browse files
author
Peng Ren
committed
Fixed something bugs
1 parent 9232e61 commit eebeba5

File tree

4 files changed

+171
-88
lines changed

4 files changed

+171
-88
lines changed

pymongosql/cursor.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# -*- coding: utf-8 -*-
22
import logging
3-
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeVar
3+
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Tuple, TypeVar
44

55
from pymongo.cursor import Cursor as MongoCursor
66
from pymongo.errors import PyMongoError
@@ -206,7 +206,7 @@ def flush(self) -> None:
206206
# For now, this is a no-op
207207
pass
208208

209-
def fetchone(self) -> Optional[Dict[str, Any]]:
209+
def fetchone(self) -> Optional[Sequence[Any]]:
210210
"""Fetch the next row from the result set"""
211211
self._check_closed()
212212

@@ -215,7 +215,7 @@ def fetchone(self) -> Optional[Dict[str, Any]]:
215215

216216
return self._result_set.fetchone()
217217

218-
def fetchmany(self, size: Optional[int] = None) -> List[Dict[str, Any]]:
218+
def fetchmany(self, size: Optional[int] = None) -> List[Sequence[Any]]:
219219
"""Fetch multiple rows from the result set"""
220220
self._check_closed()
221221

@@ -224,7 +224,7 @@ def fetchmany(self, size: Optional[int] = None) -> List[Dict[str, Any]]:
224224

225225
return self._result_set.fetchmany(size)
226226

227-
def fetchall(self) -> List[Dict[str, Any]]:
227+
def fetchall(self) -> List[Sequence[Any]]:
228228
"""Fetch all remaining rows from the result set"""
229229
self._check_closed()
230230

pymongosql/result_set.py

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# -*- coding: utf-8 -*-
22
import logging
3-
from typing import Any, Dict, List, Optional, Tuple
3+
from typing import Any, Dict, List, Optional, Sequence, Tuple
44

55
from pymongo.cursor import Cursor as MongoCursor
66
from pymongo.errors import PyMongoError
@@ -32,12 +32,12 @@ def __init__(
3232
# Extract cursor info from command result
3333
self._result_cursor = command_result.get("cursor", {})
3434
self._raw_results = self._result_cursor.get("firstBatch", [])
35-
self._cached_results: List[Dict[str, Any]] = []
35+
self._cached_results: List[Sequence[Any]] = []
3636
elif mongo_cursor is not None:
3737
self._mongo_cursor = mongo_cursor
3838
self._command_result = None
3939
self._raw_results = []
40-
self._cached_results: List[Dict[str, Any]] = []
40+
self._cached_results: List[Sequence[Any]] = []
4141
else:
4242
raise ProgrammingError("Either command_result or mongo_cursor must be provided")
4343

@@ -46,11 +46,15 @@ def __init__(
4646
self._cache_exhausted = False
4747
self._total_fetched = 0
4848
self._description: Optional[List[Tuple[str, str, None, None, None, None, None]]] = None
49+
self._column_names: Optional[List[str]] = None # Track column order for sequences
4950
self._errors: List[Dict[str, str]] = []
5051

51-
# Apply projection mapping for command results now that execution_plan is set
52+
# Process firstBatch immediately if available (after all attributes are set)
5253
if command_result is not None and self._raw_results:
53-
self._cached_results = [self._process_document(doc) for doc in self._raw_results]
54+
processed_batch = [self._process_document(doc) for doc in self._raw_results]
55+
# Convert dictionaries to sequences for DB API 2.0 compliance
56+
sequence_batch = [self._dict_to_sequence(doc) for doc in processed_batch]
57+
self._cached_results.extend(sequence_batch)
5458

5559
# Build description from projection
5660
self._build_description()
@@ -102,7 +106,9 @@ def _ensure_results_available(self, count: int = 1) -> None:
102106

103107
# Process results through projection mapping
104108
processed_batch = [self._process_document(doc) for doc in batch]
105-
self._cached_results.extend(processed_batch)
109+
# Convert dictionaries to sequences for DB API 2.0 compliance
110+
sequence_batch = [self._dict_to_sequence(doc) for doc in processed_batch]
111+
self._cached_results.extend(sequence_batch)
106112
self._total_fetched += len(batch)
107113

108114
except PyMongoError as e:
@@ -127,6 +133,15 @@ def _process_document(self, doc: Dict[str, Any]) -> Dict[str, Any]:
127133

128134
return processed
129135

136+
def _dict_to_sequence(self, doc: Dict[str, Any]) -> Tuple[Any, ...]:
137+
"""Convert document dictionary to sequence according to column order"""
138+
if self._column_names is None:
139+
# First time - establish column order
140+
self._column_names = list(doc.keys())
141+
142+
# Return values in consistent column order
143+
return tuple(doc.get(col_name) for col_name in self._column_names)
144+
130145
@property
131146
def errors(self) -> List[Dict[str, str]]:
132147
return self._errors.copy()
@@ -145,18 +160,17 @@ def description(
145160
# Try to fetch one result to build description dynamically
146161
try:
147162
self._ensure_results_available(1)
148-
if self._cached_results:
149-
# Build description from first result
150-
first_result = self._cached_results[0]
163+
if self._column_names:
164+
# Build description from established column names
151165
self._description = [
152-
(col_name, "VARCHAR", None, None, None, None, None) for col_name in first_result.keys()
166+
(col_name, "VARCHAR", None, None, None, None, None) for col_name in self._column_names
153167
]
154168
except Exception as e:
155169
_logger.warning(f"Could not build dynamic description: {e}")
156170

157171
return self._description
158172

159-
def fetchone(self) -> Optional[Dict[str, Any]]:
173+
def fetchone(self) -> Optional[Sequence[Any]]:
160174
"""Fetch the next row from the result set"""
161175
if self._is_closed:
162176
raise ProgrammingError("ResultSet is closed")
@@ -172,7 +186,7 @@ def fetchone(self) -> Optional[Dict[str, Any]]:
172186
self._rownumber = (self._rownumber or 0) + 1
173187
return result
174188

175-
def fetchmany(self, size: Optional[int] = None) -> List[Dict[str, Any]]:
189+
def fetchmany(self, size: Optional[int] = None) -> List[Sequence[Any]]:
176190
"""Fetch up to 'size' rows from the result set"""
177191
if self._is_closed:
178192
raise ProgrammingError("ResultSet is closed")
@@ -191,7 +205,7 @@ def fetchmany(self, size: Optional[int] = None) -> List[Dict[str, Any]]:
191205

192206
return results
193207

194-
def fetchall(self) -> List[Dict[str, Any]]:
208+
def fetchall(self) -> List[Sequence[Any]]:
195209
"""Fetch all remaining rows from the result set"""
196210
if self._is_closed:
197211
raise ProgrammingError("ResultSet is closed")
@@ -221,7 +235,9 @@ def fetchall(self) -> List[Dict[str, Any]]:
221235
if remaining_docs:
222236
# Process results through projection mapping
223237
processed_docs = [self._process_document(doc) for doc in remaining_docs]
224-
all_results.extend(processed_docs)
238+
# Convert dictionaries to sequences for DB API 2.0 compliance
239+
sequence_docs = [self._dict_to_sequence(doc) for doc in processed_docs]
240+
all_results.extend(sequence_docs)
225241
self._total_fetched += len(remaining_docs)
226242

227243
self._cache_exhausted = True

0 commit comments

Comments
 (0)