-
Notifications
You must be signed in to change notification settings - Fork 1
Fix session scoping in response middlewares #681
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ | |
|
|
||
| import logging | ||
| from typing import Any | ||
| from uuid import uuid4 | ||
|
|
||
| from src.core.common.exceptions import LoopDetectionError | ||
| from src.core.interfaces.loop_detector_interface import ILoopDetector | ||
|
|
@@ -107,11 +108,79 @@ def __init__(self, loop_detector: ILoopDetector, priority: int = 0) -> None: | |
| self._loop_detector = loop_detector | ||
| self._accumulated_content: dict[str, str] = {} | ||
| self._priority = priority | ||
| self._anonymous_session_aliases: dict[int, str] = {} | ||
|
|
||
| @property | ||
| def priority(self) -> int: | ||
| return self._priority | ||
|
|
||
| def _resolve_session_key( | ||
| self, | ||
| session_id: str, | ||
| context: dict[str, Any] | None, | ||
| response: Any, | ||
| stop_event: Any, | ||
| ) -> tuple[str, bool]: | ||
| candidate_fields = ( | ||
| "session_id", | ||
| "stream_id", | ||
| "id", | ||
| "request_id", | ||
| "conversation_id", | ||
| "thread_id", | ||
| "message_id", | ||
| ) | ||
| if session_id: | ||
| normalized = str(session_id).strip() | ||
| if normalized: | ||
| return normalized, False | ||
| sources: list[dict[str, Any]] = [] | ||
| if isinstance(context, dict): | ||
| sources.append(context) | ||
| metadata = getattr(response, "metadata", None) | ||
| if isinstance(metadata, dict): | ||
| sources.append(metadata) | ||
| for source in sources: | ||
| for field in candidate_fields: | ||
| try: | ||
| value = source.get(field) # type: ignore[call-arg] | ||
| except AttributeError: | ||
| continue | ||
| if value is None: | ||
| continue | ||
| candidate = str(value).strip() | ||
| if candidate: | ||
| return candidate, False | ||
| if stop_event is not None: | ||
| alias = self._anonymous_session_aliases.get(id(stop_event)) | ||
| if alias is None: | ||
| alias = uuid4().hex | ||
| self._anonymous_session_aliases[id(stop_event)] = alias | ||
| return alias, False | ||
| return uuid4().hex, True | ||
|
|
||
| def _cleanup_session_state( | ||
| self, | ||
| resolved_session_id: str, | ||
| ephemeral_key: bool, | ||
| stop_event: Any, | ||
| ) -> None: | ||
| if ephemeral_key: | ||
| self._accumulated_content.pop(resolved_session_id, None) | ||
| return | ||
| if stop_event is None: | ||
| return | ||
| alias_id = id(stop_event) | ||
| alias_value = self._anonymous_session_aliases.get(alias_id) | ||
| try: | ||
| is_done = bool(stop_event.is_set()) # type: ignore[attr-defined] | ||
| except AttributeError: | ||
| is_done = False | ||
| if is_done: | ||
| if alias_value is not None: | ||
| self._accumulated_content.pop(alias_value, None) | ||
| self._anonymous_session_aliases.pop(alias_id, None) | ||
|
Comment on lines
+162
to
+182
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The new cleanup logic only removes Useful? React with 👍 / 👎. |
||
|
|
||
| async def process( | ||
| self, | ||
| response: Any, | ||
|
|
@@ -121,31 +190,45 @@ async def process( | |
| stop_event: Any = None, | ||
| ) -> Any: | ||
| """Process a response, checking for loops.""" | ||
| if not response.content: | ||
| return response | ||
| resolved_session_id, ephemeral_key = self._resolve_session_key( | ||
| session_id, context, response, stop_event | ||
| ) | ||
|
|
||
| try: | ||
| if not response.content: | ||
| return response | ||
|
|
||
| self._accumulated_content.setdefault(session_id, "") | ||
| self._accumulated_content[session_id] += response.content | ||
| content = self._accumulated_content[session_id] | ||
|
|
||
| if len(content) > 100: | ||
| loop_result = await self._loop_detector.check_for_loops(content) | ||
| if loop_result.has_loop: | ||
| error_message = f"Loop detected: The response contains repetitive content. Detected {loop_result.repetitions} repetitions." | ||
| logger.warning( | ||
| f"Loop detected in session {session_id}: {loop_result.repetitions} repetitions" | ||
| ) | ||
| raise LoopDetectionError( | ||
| message=error_message, | ||
| details={ | ||
| "repetitions": loop_result.repetitions, | ||
| "pattern": loop_result.pattern, | ||
| }, | ||
| ) | ||
| previous = self._accumulated_content.get(resolved_session_id, "") | ||
| self._accumulated_content[resolved_session_id] = previous + response.content | ||
| content = self._accumulated_content[resolved_session_id] | ||
|
|
||
| if len(content) > 100: | ||
| loop_result = await self._loop_detector.check_for_loops(content) | ||
| if loop_result.has_loop: | ||
| error_message = ( | ||
| "Loop detected: The response contains repetitive content. " | ||
| f"Detected {loop_result.repetitions} repetitions." | ||
| ) | ||
| logger.warning( | ||
| f"Loop detected in session {resolved_session_id}: {loop_result.repetitions} repetitions" | ||
| ) | ||
| raise LoopDetectionError( | ||
| message=error_message, | ||
| details={ | ||
| "repetitions": loop_result.repetitions, | ||
| "pattern": loop_result.pattern, | ||
| "session_id": resolved_session_id, | ||
| }, | ||
| ) | ||
|
|
||
| return response | ||
| return response | ||
| finally: | ||
| self._cleanup_session_state(resolved_session_id, ephemeral_key, stop_event) | ||
|
|
||
| def reset_session(self, session_id: str) -> None: | ||
| """Reset the accumulated content for a session.""" | ||
| if session_id in self._accumulated_content: | ||
| del self._accumulated_content[session_id] | ||
| for alias_id, alias_value in list(self._anonymous_session_aliases.items()): | ||
| if alias_value == session_id: | ||
| self._anonymous_session_aliases.pop(alias_id, None) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Session Cleanup Fails for Metadata-Resolved Sessions
The
_cleanup_session_statemethod fails to clean up accumulated content when sessions are resolved via metadata fields (likestream_id) rather than viastop_event. The cleanup logic checks_anonymous_session_aliasesto find which session to clean, but this dictionary is only populated whenstop_eventis used as the fallback identifier in_resolve_session_key. When a session is resolved earlier via metadata, the mapping is never created, causing accumulated content to persist indefinitely even after the stream completes, resulting in a memory leak.