From 6bc90ad3c72dc75ea7b94558e3e5459264c4fea4 Mon Sep 17 00:00:00 2001 From: Krishan Patel Date: Wed, 30 Jul 2025 16:54:18 +0100 Subject: [PATCH 1/3] . --- .../database_connection_middleware.py | 1 + core/store/database.py | 37 +++++++++++++++---- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/core/api/middleware/database_connection_middleware.py b/core/api/middleware/database_connection_middleware.py index 8d179ce..88a00b0 100644 --- a/core/api/middleware/database_connection_middleware.py +++ b/core/api/middleware/database_connection_middleware.py @@ -12,6 +12,7 @@ def __init__(self, app: ASGIApp, database: Database) -> None: super().__init__(app=app) self.database = database + # NOTE(krishan711): see note in database.py about why this can cause problems with concurrent operations async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response: # NOTE(krishan711): hack to prevent running this for streaming endpoints because streaming # endpoints return a response with a generator inside it so this middleware wouldn't work diff --git a/core/store/database.py b/core/store/database.py index 42242b4..523635d 100644 --- a/core/store/database.py +++ b/core/store/database.py @@ -4,6 +4,8 @@ from collections.abc import AsyncIterator from typing import TypeVar +import sqlalchemy +from core import logging from sqlalchemy.engine import Result from sqlalchemy.ext.asyncio import AsyncConnection from sqlalchemy.ext.asyncio import AsyncEngine @@ -36,7 +38,14 @@ def __init__(self, connectionString: str) -> None: async def connect(self) -> None: if not self._engine: - self._engine = create_async_engine(self.connectionString, future=True) + self._engine = create_async_engine( + self.connectionString, + # echo_pool=True, + # hide_parameters=False, + pool_size=20, + pool_recycle=3600, + pool_pre_ping=True, + ) async def disconnect(self) -> None: if self._engine: @@ -59,18 +68,32 @@ def _get_context_connection(self) -> DatabaseConnection | None: pass return None + # NOTE(krishan711): this is a little confusing. We creaete a connection for each erquest + # but if anything inside that request wants to do parallel queries, they should create + # their own transaction using `self.database.create_transaction()`, because asyncpg (and psql) + # do not support parallel queries on the same connection. This shows up badly if there is an + # uncaught exception raised whilst parallel queries are running. + # We have the forced reconnect at the bottom just to catch for this wierd case. @contextlib.asynccontextmanager async def create_context_connection(self) -> AsyncIterator[DatabaseConnection]: if not self._engine: raise InternalServerErrorException(message='Engine has not been established. Please called collect() first.') if self._get_context_connection() is not None: raise InternalServerErrorException(message='Connection has already been established in this context.') - async with self._engine.begin() as connection: - self._connectionContext.set(connection) - try: - yield connection - finally: - self._connectionContext.set(None) + connection = None + try: + async with self._engine.begin() as connection: + self._connectionContext.set(connection) + try: + yield connection + finally: + self._connectionContext.set(None) + except sqlalchemy.exc.InterfaceError as exception: + if not "cannot perform operation: another operation is in progress" in str(exception): + raise + logging.error(f'Database connection error (likely concurrent operations): {exception}. Forcing reconnect. You MUST ensure that you are not running parallel queries on the same connection.') + await self.disconnect() + await self.connect() async def execute(self, query: TypedReturnsRows[ResultType], connection: DatabaseConnection | None = None) -> Result[ResultType]: if not self._engine: From 415391b230c4a6efa14e09c6cd835445791ed58b Mon Sep 17 00:00:00 2001 From: Krishan Patel Date: Wed, 30 Jul 2025 16:55:26 +0100 Subject: [PATCH 2/3] . --- core/store/database.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/store/database.py b/core/store/database.py index 523635d..f91f859 100644 --- a/core/store/database.py +++ b/core/store/database.py @@ -36,13 +36,13 @@ def __init__(self, connectionString: str) -> None: self._engine: AsyncEngine | None = None self._connectionContext = contextvars.ContextVar[DatabaseConnection | None]('_connectionContext') - async def connect(self) -> None: + async def connect(self, poolSize: int = 100) -> None: if not self._engine: self._engine = create_async_engine( self.connectionString, # echo_pool=True, # hide_parameters=False, - pool_size=20, + pool_size=poolSize, pool_recycle=3600, pool_pre_ping=True, ) From 3f2ed459c457d9b84672eced8309f25ba585a0ed Mon Sep 17 00:00:00 2001 From: Krishan Patel Date: Wed, 30 Jul 2025 16:58:10 +0100 Subject: [PATCH 3/3] . --- core/store/database.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/store/database.py b/core/store/database.py index f91f859..29f2365 100644 --- a/core/store/database.py +++ b/core/store/database.py @@ -5,13 +5,13 @@ from typing import TypeVar import sqlalchemy -from core import logging from sqlalchemy.engine import Result from sqlalchemy.ext.asyncio import AsyncConnection from sqlalchemy.ext.asyncio import AsyncEngine from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.sql.selectable import TypedReturnsRows +from core import logging from core.exceptions import InternalServerErrorException DatabaseConnection = AsyncConnection @@ -89,7 +89,7 @@ async def create_context_connection(self) -> AsyncIterator[DatabaseConnection]: finally: self._connectionContext.set(None) except sqlalchemy.exc.InterfaceError as exception: - if not "cannot perform operation: another operation is in progress" in str(exception): + if 'cannot perform operation: another operation is in progress' not in str(exception): raise logging.error(f'Database connection error (likely concurrent operations): {exception}. Forcing reconnect. You MUST ensure that you are not running parallel queries on the same connection.') await self.disconnect()