Improve stream id isolation in stream utilities #683
Open: matdev83 wants to merge 1 commit into dev from codex/fix-cross-message-data-leaks-t2y3t2
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | | `@@ -1,26 +1,98 @@` |
| | | `-from __future__ import annotations` |
| | | `-` |
| | | `-"""Utility helpers for streaming response processors."""` |
| | | `-` |
| | | `-from uuid import uuid4` |
| | | `-` |
| | | `-from src.core.ports.streaming import StreamingContent` |
| | | `-` |
| | | `-` |
| | | `-def get_stream_id(content: StreamingContent) -> str:` |
| | | `-    """Return a stable identifier for the current stream.` |
| | | `-` |
| | | `-    Processors rely on this value to keep per-stream buffers isolated. The` |
| | | `-    identifier is sourced from the chunk metadata when available. If the` |
| | | `-    upstream pipeline has not yet assigned one, a new UUID is generated and` |
| | | `-    stored back into the metadata so that subsequent processors can reuse it.` |
| | | `-    """` |
| | | `-` |
| | | `-    metadata = content.metadata` |
| | | `-    stream_id = (` |
| | | `-        metadata.get("stream_id") or metadata.get("session_id") or metadata.get("id")` |
| | | `-    )` |
| | | `-    if not stream_id:` |
| | | `-        stream_id = uuid4().hex` |
| | | `-        metadata["stream_id"] = stream_id` |
| | | `-    return str(stream_id)` |
| | | `+from __future__ import annotations` |
| | | `+` |
| | | `+"""Utility helpers for streaming response processors."""` |
| | | `+` |
| | | `+from typing import Any` |
| | | `+from uuid import uuid4` |
| | | `+import threading` |
| | | `+` |
| | | `+from src.core.ports.streaming import StreamingContent` |
| | | `+` |
| | | `+_UNIQUE_METADATA_KEYS = (` |
| | | `+    "stream_id",` |
| | | `+    "request_id",` |
| | | `+    "response_id",` |
| | | `+    "id",` |
| | | `+    "chunk_id",` |
| | | `+    "event_id",` |
| | | `+)` |
| | | `+` |
| | | `+_StreamKey = tuple[str \| None, str \| None, str \| None]` |
| | | `+` |
| | | `+_fallback_lock = threading.Lock()` |
| | | `+_active_stream_ids: dict[_StreamKey, str] = {}` |
| | | `+_reverse_stream_keys: dict[str, _StreamKey] = {}` |
| | | `+` |
| | | `+` |
| | | `+def _normalize_component(value: Any) -> str \| None:` |
| | | `+    """Normalize arbitrary metadata values to comparable strings."""` |
| | | `+` |
| | | `+    if value is None:` |
| | | `+        return None` |
| | | `+    try:` |
| | | `+        text = str(value)` |
| | | `+    except Exception:` |
| | | `+        return None` |
| | | `+    return text or None` |
| | | `+` |
| | | `+` |
| | | `+def _build_fallback_key(metadata: dict[str, Any]) -> _StreamKey:` |
| | | `+    """Construct a key used when explicit stream identifiers are missing."""` |
| | | `+` |
| | | `+    request_component = _normalize_component(` |
| | | `+        metadata.get("request_id") or metadata.get("response_id")` |
| | | `+    )` |
| | | `+    id_component = _normalize_component(` |
| | | `+        metadata.get("id")` |
| | | `+        or metadata.get("chunk_id")` |
| | | `+        or metadata.get("event_id")` |
| | | `+    )` |
| | | `+    session_component = _normalize_component(metadata.get("session_id"))` |
| | | `+    return (request_component, id_component, session_component)` |
| | | `+` |
| | | `+` |
| | | `+def get_stream_id(content: StreamingContent) -> str:` |
| | | `+    """Return a stable identifier for the current stream.` |
| | | `+` |
| | | `+    Processors rely on this value to keep per-stream buffers isolated. The` |
| | | `+    identifier is sourced from the chunk metadata when available. If the` |
| | | `+    upstream pipeline has not yet assigned one, a new UUID is generated and` |
| | | `+    stored back into the metadata so that subsequent processors can reuse it.` |
| | | `+` |
| | | `+    When multiple streaming responses share the same session identifier (for` |
| | | `+    example, parallel requests from the same client), we prefer more specific` |
| | | `+    metadata such as request IDs so that each stream remains isolated.` |
| | | `+    """` |
| | | `+` |
| | | `+    metadata = content.metadata` |
| | | `+    raw_stream_id = metadata.get("stream_id")` |
| | | `+    stream_id: str \| None = _normalize_component(raw_stream_id)` |
| | | `+` |
| | | `+    if stream_id is None:` |
| | | `+        for key in _UNIQUE_METADATA_KEYS[1:]:` |
| | | `+            candidate = _normalize_component(metadata.get(key))` |
| | | `+            if candidate:` |
| | | `+                stream_id = candidate` |
| | | `+                break` |
| | | `+` |
| | | `+    if stream_id is None:` |
| | | `+        fallback_key = _build_fallback_key(metadata)` |
| | | `+        if fallback_key != (None, None, None):` |
| | | `+            with _fallback_lock:` |
| | | `+                stream_id = _active_stream_ids.get(fallback_key)` |
| | | `+                if stream_id is None:` |
| | | `+                    stream_id = uuid4().hex` |
| | | `+                    _active_stream_ids[fallback_key] = stream_id` |
| | | `+                    _reverse_stream_keys[stream_id] = fallback_key` |
| | | `+        else:` |
| | | `+            stream_id = uuid4().hex` |
| | | `+` |
| | | `+    metadata["stream_id"] = stream_id` |
| | | `+` |
| | | `+    if content.is_done or content.is_cancellation:` |
| | | `+        with _fallback_lock:` |
| | | `+            fallback_key = _reverse_stream_keys.pop(stream_id, None)` |
| | | `+            if fallback_key is not None:` |
| | | `+                _active_stream_ids.pop(fallback_key, None)` |
| | | `+` |
| | | `+    return stream_id` |
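
To make the lookup order in the new `get_stream_id` easier to follow, here is a minimal standalone sketch of the precedence only. It is not the PR's code: `pick_identifier` and `PRECEDENCE` are hypothetical names, a plain dict stands in for `StreamingContent.metadata`, and the session-keyed fallback cache, the exception handling in `_normalize_component`, and the cleanup step are all left out.

```python
from __future__ import annotations

from typing import Any

# Same key order as _UNIQUE_METADATA_KEYS in the diff above.
PRECEDENCE = ("stream_id", "request_id", "response_id", "id", "chunk_id", "event_id")


def pick_identifier(metadata: dict[str, Any]) -> tuple[str, str] | None:
    """Return (key, value) for the first usable identifier, or None.

    Hypothetical helper for illustration. None means the real function would
    go on to the session-keyed fallback cache or a fresh UUID.
    """
    for key in PRECEDENCE:
        value = metadata.get(key)
        if value is None:
            continue
        text = str(value)
        if text:  # empty strings count as missing
            return key, text
    return None


# Two parallel requests in one session resolve to different identifiers.
print(pick_identifier({"session_id": "session-1", "request_id": "req-a"}))
print(pick_identifier({"session_id": "session-1", "request_id": "req-b"}))
# Only session metadata: no explicit identifier, so the fallback path applies.
print(pick_identifier({"session_id": "session-1"}))
```

Note that `session_id` never appears in the precedence list, which is the point of the change: it is only consulted once every more specific key is missing.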
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | | `@@ -0,0 +1,57 @@` |
| | | `+from __future__ import annotations` |
| | | `+` |
| | | `+from src.core.ports.streaming import StreamingContent` |
| | | `+from src.core.services.streaming.stream_utils import get_stream_id` |
| | | `+` |
| | | `+` |
| | | `+def _build_chunk(` |
| | | `+    *,` |
| | | `+    session_id: str \| None = None,` |
| | | `+    request_id: str \| None = None,` |
| | | `+    stream_id: str \| None = None,` |
| | | `+    is_done: bool = False,` |
| | | `+) -> StreamingContent:` |
| | | `+    metadata: dict[str, str] = {}` |
| | | `+    if session_id is not None:` |
| | | `+        metadata["session_id"] = session_id` |
| | | `+    if request_id is not None:` |
| | | `+        metadata["request_id"] = request_id` |
| | | `+    if stream_id is not None:` |
| | | `+        metadata["stream_id"] = stream_id` |
| | | `+    return StreamingContent(content="", is_done=is_done, metadata=metadata)` |
| | | `+` |
| | | `+` |
| | | `+def test_get_stream_id_prefers_request_over_session() -> None:` |
| | | `+    """Distinct request identifiers must yield isolated stream identifiers."""` |
| | | `+` |
| | | `+    first_chunk = _build_chunk(session_id="session-1", request_id="req-a")` |
| | | `+    second_chunk = _build_chunk(session_id="session-1", request_id="req-b")` |
| | | `+` |
| | | `+    first_stream_id = get_stream_id(first_chunk)` |
| | | `+    second_stream_id = get_stream_id(second_chunk)` |
| | | `+` |
| | | `+    assert first_stream_id != second_stream_id` |
| | | `+` |
| | | `+    # Subsequent chunks for the same request must reuse the original identifier.` |
| | | `+    repeat_chunk = _build_chunk(session_id="session-1", request_id="req-a")` |
| | | `+    assert get_stream_id(repeat_chunk) == first_stream_id` |
| | | `+` |
| | | `+` |
| | | `+def test_get_stream_id_releases_mapping_on_completion() -> None:` |
| | | `+    """Completing a stream should allow a fresh identifier for fallback lookups."""` |
| | | `+` |
| | | `+    chunk = _build_chunk(session_id="session-42")` |
| | | `+    original_stream_id = get_stream_id(chunk)` |
| | | `+` |
| | | `+    completion = _build_chunk(` |
| | | `+        session_id="session-42",` |
| | | `+        stream_id=original_stream_id,` |
| | | `+        is_done=True,` |
| | | `+    )` |
| | | `+    # Calling get_stream_id on the completion chunk should clean up state.` |
| | | `+    assert get_stream_id(completion) == original_stream_id` |
| | | `+` |
| | | `+    new_chunk = _build_chunk(session_id="session-42")` |
| | | `+    refreshed_stream_id = get_stream_id(new_chunk)` |
| | | `+` |
| | | `+    assert refreshed_stream_id != original_stream_id` |
When a stream starts without `stream_id`, this function stores a generated UUID in `_active_stream_ids` and `_reverse_stream_keys`. If later chunks include an explicit `stream_id` or `request_id`, the loop above returns that explicit value, but the cleanup here uses the new identifier to pop the reverse map. The original UUID entry is never removed, so future streams that only carry the same session metadata will reuse the stale UUID and share buffers, and the dictionaries leak entries indefinitely. Consider removing or updating the fallback mapping whenever the resolved `stream_id` differs from the recorded fallback value.