-
Notifications
You must be signed in to change notification settings - Fork 35
FEAT: mssql-python driver changes to support bulk copy logging. #430
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
6838a3a
5312733
3198670
a3a05cb
b805294
da9ec9e
2819b6b
617a243
00d4f00
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2574,16 +2574,21 @@ def _bulkcopy( | |
| ValueError: If table_name is empty or parameters are invalid | ||
| RuntimeError: If connection string is not available | ||
| """ | ||
| # Fast check if logging is enabled to avoid overhead | ||
| is_logging = logger.is_debug_enabled | ||
|
|
||
| try: | ||
| import mssql_py_core | ||
| except ImportError as exc: | ||
| logger.error("_bulkcopy: Failed to import mssql_py_core module") | ||
| raise ImportError( | ||
| "Bulk copy requires the mssql_py_core library which is not installed. " | ||
| "To install, run: pip install mssql_py_core " | ||
| ) from exc | ||
|
|
||
| # Validate inputs | ||
| if not table_name or not isinstance(table_name, str): | ||
| logger.error("_bulkcopy: Invalid table_name parameter") | ||
| raise ValueError("table_name must be a non-empty string") | ||
|
|
||
| # Validate that data is iterable (but not a string or bytes, which are technically iterable) | ||
|
|
@@ -2615,6 +2620,7 @@ def _bulkcopy( | |
|
|
||
| # Get and parse connection string | ||
| if not hasattr(self.connection, "connection_str"): | ||
| logger.error("_bulkcopy: Connection string not available") | ||
| raise RuntimeError("Connection string not available for bulk copy") | ||
|
|
||
| # Use the proper connection string parser that handles braced values | ||
|
|
@@ -2624,9 +2630,11 @@ def _bulkcopy( | |
| params = parser._parse(self.connection.connection_str) | ||
|
|
||
| if not params.get("server"): | ||
| logger.error("_bulkcopy: SERVER parameter missing in connection string") | ||
| raise ValueError("SERVER parameter is required in connection string") | ||
|
|
||
| if not params.get("database"): | ||
| logger.error("_bulkcopy: DATABASE parameter missing in connection string") | ||
| raise ValueError( | ||
| "DATABASE parameter is required in connection string for bulk copy. " | ||
| "Specify the target database explicitly to avoid accidentally writing to system databases." | ||
|
|
@@ -2688,9 +2696,14 @@ def _bulkcopy( | |
| pycore_connection = None | ||
| pycore_cursor = None | ||
| try: | ||
| pycore_connection = mssql_py_core.PyCoreConnection(pycore_context) | ||
| # Only pass logger to Rust if logging is enabled (performance optimization) | ||
| pycore_connection = mssql_py_core.PyCoreConnection( | ||
| pycore_context, python_logger=logger if is_logging else None | ||
| ) | ||
| pycore_cursor = pycore_connection.cursor() | ||
|
|
||
| # Call bulkcopy with explicit keyword arguments | ||
| # The API signature: bulkcopy(table_name, data_source, batch_size=0, timeout=30, ...) | ||
| result = pycore_cursor.bulkcopy( | ||
| table_name, | ||
| iter(data), | ||
|
|
@@ -2703,8 +2716,17 @@ def _bulkcopy( | |
| keep_nulls=keep_nulls, | ||
| fire_triggers=fire_triggers, | ||
| use_internal_transaction=use_internal_transaction, | ||
| python_logger=logger if is_logging else None, # Only pass logger if enabled | ||
| ) | ||
|
|
||
| if is_logging: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What would happen if we don't have this check? Could we simply not do this check before logger.info and just call logger.info directly? I am trying to understand the applicability of this check for bulk copy before calling logger.info. If it's applicable here, then is there a plan to have this path everywhere else as well? |
||
| logger.info( | ||
| "_bulkcopy: Bulk copy completed successfully - rows_copied=%s, batch_count=%s, elapsed_time=%s", | ||
| result.get("rows_copied", "N/A"), | ||
| result.get("batch_count", "N/A"), | ||
| result.get("elapsed_time", "N/A"), | ||
| ) | ||
|
|
||
| return result | ||
|
|
||
| except Exception as e: | ||
|
|
@@ -2731,8 +2753,7 @@ def _bulkcopy( | |
| try: | ||
| resource.close() | ||
| except Exception as cleanup_error: | ||
| # Log cleanup errors at debug level to aid troubleshooting | ||
| # without masking the original exception | ||
| # Log cleanup errors only - aids troubleshooting without masking original exception | ||
| logger.debug( | ||
| "Failed to close bulk copy resource %s: %s", | ||
| type(resource).__name__, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -88,7 +88,7 @@ def __init__(self): | |
|
|
||
| # Create the underlying Python logger | ||
| self._logger = logging.getLogger("mssql_python") | ||
| self._logger.setLevel(logging.CRITICAL) # Disabled by default | ||
| self._logger.setLevel(logging.WARNING) # Allow WARNING and ERROR by default | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why is this change in behavior needed for a bulk copy logger integration? Should this change not be a part of a separate improvement to change the default level? |
||
| self._logger.propagate = False # Don't propagate to root logger | ||
|
|
||
| # Add trace ID filter (injects thread_id into every log record) | ||
|
|
@@ -104,7 +104,22 @@ def __init__(self): | |
| self._handler_lock = threading.RLock() # Reentrant lock for handler operations | ||
| self._cleanup_registered = False # Track if atexit cleanup is registered | ||
|
|
||
| # Don't setup handlers yet - do it lazily when setLevel is called | ||
| # Cached level for fast checks (avoid repeated isEnabledFor calls) | ||
| self._cached_level = logging.WARNING | ||
| self._is_debug_enabled = False | ||
|
|
||
| # Set up default stderr handler for WARNING and ERROR messages | ||
| # This ensures warnings are always visible even when logging is not enabled | ||
| import sys | ||
|
|
||
| default_handler = logging.StreamHandler(sys.stderr) | ||
| default_handler.setLevel(logging.WARNING) | ||
| # Simple format for warnings - no CSV formatting needed | ||
| default_handler.setFormatter(logging.Formatter("[%(name)s] %(levelname)s: %(message)s")) | ||
| self._logger.addHandler(default_handler) | ||
| self._default_handler = default_handler # Keep reference for later removal | ||
|
|
||
| # Don't setup full handlers yet - do it lazily when setLevel is called | ||
| # This prevents creating log files when user changes output mode before enabling logging | ||
|
|
||
| def _setup_handlers(self): | ||
|
|
@@ -145,15 +160,20 @@ def _setup_handlers(self): | |
| # Custom formatter to extract source from message and format as CSV | ||
| class CSVFormatter(logging.Formatter): | ||
| def format(self, record): | ||
| # Extract source from message (e.g., [Python] or [DDBC]) | ||
| msg = record.getMessage() | ||
| if msg.startswith("[") and "]" in msg: | ||
| end_bracket = msg.index("]") | ||
| source = msg[1:end_bracket] | ||
| message = msg[end_bracket + 2 :].strip() # Skip '] ' | ||
| # Check if this is from py-core (via py_core_log method) | ||
| if hasattr(record, "funcName") and record.funcName == "py-core": | ||
| source = "py-core" | ||
| message = record.getMessage() | ||
| else: | ||
| source = "Unknown" | ||
| message = msg | ||
| # Extract source from message (e.g., [Python] or [DDBC]) | ||
| msg = record.getMessage() | ||
| if msg.startswith("[") and "]" in msg: | ||
| end_bracket = msg.index("]") | ||
| source = msg[1:end_bracket] | ||
| message = msg[end_bracket + 2 :].strip() # Skip '] ' | ||
| else: | ||
| source = "Unknown" | ||
| message = msg | ||
|
|
||
| # Format timestamp with milliseconds using period separator | ||
| timestamp = self.formatTime(record, "%Y-%m-%d %H:%M:%S") | ||
|
|
@@ -326,6 +346,44 @@ def _write_log_header(self): | |
| pass # Even stderr notification failed | ||
| # Don't crash - logging continues without header | ||
|
|
||
| def py_core_log(self, level: int, msg: str, filename: str = "cursor.rs", lineno: int = 0): | ||
| """ | ||
| Logging method for py-core (Rust/TDS) code with custom source location. | ||
|
|
||
| Args: | ||
| level: Log level (DEBUG, INFO, WARNING, ERROR) | ||
| msg: Message string (already formatted) | ||
| filename: Source filename (e.g., 'cursor.rs') | ||
| lineno: Line number in source file | ||
| """ | ||
| try: | ||
| # Fast level check using cached level (same optimization as _log method) | ||
| # Exception: Always allow WARNING and ERROR messages through | ||
| if level < self._cached_level and level < logging.WARNING: | ||
| return | ||
|
|
||
| # Create a custom LogRecord with Rust source location | ||
| import logging as log_module | ||
|
|
||
| record = log_module.LogRecord( | ||
| name=self._logger.name, | ||
| level=level, | ||
| pathname=filename, | ||
| lineno=lineno, | ||
| msg=msg, | ||
| args=(), | ||
| exc_info=None, | ||
| func="py-core", | ||
| sinfo=None, | ||
| ) | ||
| self._logger.handle(record) | ||
| except Exception: | ||
| # Fallback - use regular logging | ||
| try: | ||
| self._logger.log(level, msg) | ||
| except: | ||
| pass | ||
|
|
||
| def _log(self, level: int, msg: str, add_prefix: bool = True, *args, **kwargs): | ||
| """ | ||
| Internal logging method with exception safety. | ||
|
|
@@ -352,8 +410,10 @@ def _log(self, level: int, msg: str, add_prefix: bool = True, *args, **kwargs): | |
| All other failures are silently ignored to prevent app crashes. | ||
| """ | ||
| try: | ||
| # Fast level check (zero overhead if disabled) | ||
| if not self._logger.isEnabledFor(level): | ||
| # Fast level check using cached level (zero overhead if disabled) | ||
| # This avoids the overhead of isEnabledFor() method call | ||
| # Exception: Always allow WARNING and ERROR messages through | ||
| if level < self._cached_level and level < logging.WARNING: | ||
| return | ||
|
|
||
| # Add prefix if requested (only after level check) | ||
|
|
@@ -364,8 +424,9 @@ def _log(self, level: int, msg: str, add_prefix: bool = True, *args, **kwargs): | |
| if args: | ||
| msg = msg % args | ||
|
|
||
| # Log the message (no args since already formatted) | ||
| self._logger.log(level, msg, **kwargs) | ||
| # Log the message with proper stack level to capture caller's location | ||
| # stacklevel=3 skips: _log -> debug/info/warning/error -> actual caller | ||
| self._logger.log(level, msg, stacklevel=3, **kwargs) | ||
| except Exception: | ||
| # Last resort: Try stderr fallback for any logging failure | ||
| # This helps diagnose critical issues (disk full, permission denied, etc.) | ||
|
|
@@ -441,6 +502,17 @@ def _setLevel( | |
| # Set level (atomic operation, no lock needed) | ||
| self._logger.setLevel(level) | ||
|
|
||
| # Cache level for fast checks (avoid repeated isEnabledFor calls) | ||
subrata-ms marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # Note: These updates are not atomic across both variables, creating a brief | ||
| # window where reads might see inconsistent state (e.g., updated _cached_level | ||
| # but old _is_debug_enabled). This is an acceptable benign race condition: | ||
| # - Worst case: one log statement might be incorrectly allowed/blocked | ||
| # - Duration: nanoseconds (single Python bytecode instruction gap) | ||
| # - Impact: negligible - next check will see consistent state | ||
| # - Alternative (locking) would add overhead to every log call | ||
| self._cached_level = level | ||
| self._is_debug_enabled = level <= logging.DEBUG | ||
|
|
||
| # Notify C++ bridge of level change | ||
| self._notify_cpp_level_change(level) | ||
|
|
||
|
|
@@ -546,6 +618,11 @@ def level(self) -> int: | |
| """Get the current logging level""" | ||
| return self._logger.level | ||
|
|
||
| @property | ||
| def is_debug_enabled(self) -> bool: | ||
| """Fast check if debug logging is enabled (cached for performance)""" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is the performance hit noticeable? I am wondering about the gains that you saw. In logging, the overhead due to I/O seems large enough that this optimization may or may not make sense. I will let this be a team call; however, I am interested in knowing about any impactful perf improvements. Otherwise I would err on the side of simpler code. |
||
| return self._is_debug_enabled | ||
|
|
||
|
|
||
subrata-ms marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # ============================================================================ | ||
| # Module-level exports (Primary API) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit:
`is_logging_enabled` would be a better variable name than `is_logging` here