From 49ec5539a633d879958b99f1cfef65883a2f5edf Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Thu, 21 May 2026 13:41:07 +0200 Subject: [PATCH] feat(litellm): Support span streaming --- sentry_sdk/integrations/litellm.py | 52 ++++-- sentry_sdk/tracing_utils.py | 9 ++ tests/integrations/litellm/test_litellm.py | 178 ++++++++++++++++++--- 3 files changed, 196 insertions(+), 43 deletions(-) diff --git a/sentry_sdk/integrations/litellm.py b/sentry_sdk/integrations/litellm.py index d9eb659f43..402676defa 100644 --- a/sentry_sdk/integrations/litellm.py +++ b/sentry_sdk/integrations/litellm.py @@ -14,6 +14,10 @@ from sentry_sdk.consts import SPANDATA from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.scope import should_send_default_pii +from sentry_sdk.tracing_utils import ( + has_span_streaming_enabled, + should_truncate_gen_ai_input, +) from sentry_sdk.utils import event_from_exception if TYPE_CHECKING: @@ -68,7 +72,8 @@ def _convert_message_parts(messages: "List[Dict[str, Any]]") -> "List[Dict[str, def _input_callback(kwargs: "Dict[str, Any]") -> None: """Handle the start of a request.""" - integration = sentry_sdk.get_client().get_integration(LiteLLMIntegration) + client = sentry_sdk.get_client() + integration = client.get_integration(LiteLLMIntegration) if integration is None: return @@ -88,16 +93,29 @@ def _input_callback(kwargs: "Dict[str, Any]") -> None: operation = "chat" # Start a new span/transaction - span = get_start_span_function()( - op=( - consts.OP.GEN_AI_CHAT - if operation == "chat" - else consts.OP.GEN_AI_EMBEDDINGS - ), - name=f"{operation} {model}", - origin=LiteLLMIntegration.origin, - ) - span.__enter__() + if has_span_streaming_enabled(client.options): + span = sentry_sdk.traces.start_span( + name=f"{operation} {model}", + attributes={ + "sentry.op": ( + consts.OP.GEN_AI_CHAT + if operation == "chat" + else consts.OP.GEN_AI_EMBEDDINGS + ), + "sentry.origin": LiteLLMIntegration.origin, + }, + ) + else: + span = get_start_span_function()( + op=( + consts.OP.GEN_AI_CHAT + if operation == "chat" + else consts.OP.GEN_AI_EMBEDDINGS + ), + name=f"{operation} {model}", + origin=LiteLLMIntegration.origin, + ) + span.__enter__() # Store span for later _get_metadata_dict(kwargs)["_sentry_span"] = span @@ -121,9 +139,9 @@ def _input_callback(kwargs: "Dict[str, Any]") -> None: ) client = sentry_sdk.get_client() messages_data = ( - input_list - if client.options.get("stream_gen_ai_spans", False) - else truncate_and_annotate_embedding_inputs(input_list, span, scope) + truncate_and_annotate_embedding_inputs(input_list, span, scope) + if should_truncate_gen_ai_input(client.options) + else input_list ) if messages_data is not None: set_data_normalized( @@ -140,9 +158,9 @@ def _input_callback(kwargs: "Dict[str, Any]") -> None: scope = sentry_sdk.get_current_scope() messages = _convert_message_parts(messages) messages_data = ( - messages - if client.options.get("stream_gen_ai_spans", False) - else truncate_and_annotate_messages(messages, span, scope) + truncate_and_annotate_messages(messages, span, scope) + if should_truncate_gen_ai_input(client.options) + else messages ) if messages_data is not None: set_data_normalized( diff --git a/sentry_sdk/tracing_utils.py b/sentry_sdk/tracing_utils.py index e6fc8770d6..822114628a 100644 --- a/sentry_sdk/tracing_utils.py +++ b/sentry_sdk/tracing_utils.py @@ -116,6 +116,15 @@ def has_span_streaming_enabled(options: "Optional[dict[str, Any]]") -> bool: return (options.get("_experiments") or {}).get("trace_lifecycle") == "stream" +def should_truncate_gen_ai_input(options: "Optional[dict[str, Any]]") -> bool: + if options is None: + return True + + return not options.get( + "stream_gen_ai_spans", False + ) and not has_span_streaming_enabled(options) + + @contextlib.contextmanager def record_sql_queries( cursor: "Any", diff --git a/tests/integrations/litellm/test_litellm.py b/tests/integrations/litellm/test_litellm.py index c159604bb0..5539b5d2d3 100644 --- a/tests/integrations/litellm/test_litellm.py +++ b/tests/integrations/litellm/test_litellm.py @@ -7,6 +7,8 @@ import pytest +import sentry_sdk + try: from unittest.mock import AsyncMock except ImportError: @@ -132,6 +134,7 @@ def __init__( self.created = 1234567890 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "send_default_pii, include_prompts", @@ -152,12 +155,14 @@ def test_nonstreaming_chat_completion( get_model_response, nonstreaming_chat_completions_model_response, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[LiteLLMIntegration(include_prompts=include_prompts)], traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) messages = [ @@ -183,7 +188,7 @@ def test_nonstreaming_chat_completion( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with mock.patch.object( @@ -202,6 +207,7 @@ def test_nonstreaming_chat_completion( (event,) = (item.payload for item in items if item.type == "transaction") assert event["transaction"] == "litellm test" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] chat_spans = list( x @@ -284,6 +290,7 @@ def test_nonstreaming_chat_completion( assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 30 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.asyncio(loop_scope="session") @pytest.mark.parametrize( @@ -304,12 +311,14 @@ async def test_async_nonstreaming_chat_completion( get_model_response, nonstreaming_chat_completions_model_response, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[LiteLLMIntegration(include_prompts=include_prompts)], traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) messages = [ @@ -335,7 +344,7 @@ async def test_async_nonstreaming_chat_completion( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with mock.patch.object( @@ -355,6 +364,7 @@ async def test_async_nonstreaming_chat_completion( (event,) = (item.payload for item in items if item.type == "transaction") assert event["transaction"] == "litellm test" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] chat_spans = list( x @@ -438,6 +448,7 @@ async def test_async_nonstreaming_chat_completion( assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 30 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "send_default_pii, include_prompts", @@ -459,12 +470,14 @@ def test_streaming_chat_completion( server_side_event_chunks, streaming_chat_completions_model_response, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[LiteLLMIntegration(include_prompts=include_prompts)], traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) messages = [ @@ -482,7 +495,7 @@ def test_streaming_chat_completion( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") with mock.patch.object( @@ -501,6 +514,7 @@ def test_streaming_chat_completion( streaming_handler.executor.shutdown(wait=True) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] chat_spans = list( x @@ -548,6 +562,7 @@ def test_streaming_chat_completion( assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.asyncio(loop_scope="session") @pytest.mark.parametrize( @@ -570,12 +585,14 @@ async def test_async_streaming_chat_completion( server_side_event_chunks, streaming_chat_completions_model_response, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[LiteLLMIntegration(include_prompts=include_prompts)], traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) messages = [ @@ -595,7 +612,7 @@ async def test_async_streaming_chat_completion( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with mock.patch.object( @@ -615,6 +632,7 @@ async def test_async_streaming_chat_completion( await GLOBAL_LOGGING_WORKER.flush() await asyncio.sleep(0.5) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] chat_spans = list( x @@ -663,6 +681,7 @@ async def test_async_streaming_chat_completion( assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_embeddings_create( sentry_init, @@ -672,6 +691,7 @@ def test_embeddings_create( openai_embedding_model_response, clear_litellm_cache, stream_gen_ai_spans, + span_streaming, ): """ Test that litellm.embedding() calls are properly instrumented. @@ -684,6 +704,7 @@ def test_embeddings_create( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) client = OpenAI(api_key="test-key") @@ -694,7 +715,7 @@ def test_embeddings_create( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with mock.patch.object( @@ -713,6 +734,7 @@ def test_embeddings_create( # Response is processed by litellm, so just check it exists assert response is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] spans = list( x @@ -778,6 +800,7 @@ def test_embeddings_create( assert json.loads(embeddings_input) == ["Hello, world!"] +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.asyncio(loop_scope="session") async def test_async_embeddings_create( @@ -788,6 +811,7 @@ async def test_async_embeddings_create( openai_embedding_model_response, clear_litellm_cache, stream_gen_ai_spans, + span_streaming, ): """ Test that litellm.embedding() calls are properly instrumented. @@ -800,6 +824,7 @@ async def test_async_embeddings_create( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) client = AsyncOpenAI(api_key="test-key") @@ -810,7 +835,7 @@ async def test_async_embeddings_create( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with mock.patch.object( @@ -830,6 +855,7 @@ async def test_async_embeddings_create( # Response is processed by litellm, so just check it exists assert response is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] spans = list( x @@ -896,6 +922,7 @@ async def test_async_embeddings_create( assert json.loads(embeddings_input) == ["Hello, world!"] +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_embeddings_create_with_list_input( sentry_init, @@ -905,6 +932,7 @@ def test_embeddings_create_with_list_input( openai_embedding_model_response, clear_litellm_cache, stream_gen_ai_spans, + span_streaming, ): """Test embedding with list input.""" sentry_init( @@ -912,6 +940,7 @@ def test_embeddings_create_with_list_input( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) client = OpenAI(api_key="test-key") @@ -922,7 +951,7 @@ def test_embeddings_create_with_list_input( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with mock.patch.object( @@ -942,6 +971,7 @@ def test_embeddings_create_with_list_input( # Response is processed by litellm, so just check it exists assert response is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] spans = list( x @@ -1004,6 +1034,7 @@ def test_embeddings_create_with_list_input( ] +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.asyncio(loop_scope="session") async def test_async_embeddings_create_with_list_input( @@ -1014,6 +1045,7 @@ async def test_async_embeddings_create_with_list_input( openai_embedding_model_response, clear_litellm_cache, stream_gen_ai_spans, + span_streaming, ): """Test embedding with list input.""" sentry_init( @@ -1021,6 +1053,7 @@ async def test_async_embeddings_create_with_list_input( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) client = AsyncOpenAI(api_key="test-key") @@ -1031,7 +1064,7 @@ async def test_async_embeddings_create_with_list_input( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with mock.patch.object( @@ -1051,6 +1084,7 @@ async def test_async_embeddings_create_with_list_input( # Response is processed by litellm, so just check it exists assert response is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] spans = list( x @@ -1114,6 +1148,7 @@ async def test_async_embeddings_create_with_list_input( ] +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_embeddings_no_pii( sentry_init, @@ -1123,6 +1158,7 @@ def test_embeddings_no_pii( openai_embedding_model_response, clear_litellm_cache, stream_gen_ai_spans, + span_streaming, ): """Test that PII is not captured when disabled.""" sentry_init( @@ -1130,6 +1166,7 @@ def test_embeddings_no_pii( traces_sample_rate=1.0, send_default_pii=False, # PII disabled stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) client = OpenAI(api_key="test-key") @@ -1140,7 +1177,7 @@ def test_embeddings_no_pii( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with mock.patch.object( @@ -1159,6 +1196,7 @@ def test_embeddings_no_pii( # Response is processed by litellm, so just check it exists assert response is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] spans = list( x @@ -1208,6 +1246,7 @@ def test_embeddings_no_pii( assert SPANDATA.GEN_AI_EMBEDDINGS_INPUT not in span["data"] +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.asyncio(loop_scope="session") async def test_async_embeddings_no_pii( @@ -1218,6 +1257,7 @@ async def test_async_embeddings_no_pii( openai_embedding_model_response, clear_litellm_cache, stream_gen_ai_spans, + span_streaming, ): """Test that PII is not captured when disabled.""" sentry_init( @@ -1225,6 +1265,7 @@ async def test_async_embeddings_no_pii( traces_sample_rate=1.0, send_default_pii=False, # PII disabled stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) client = AsyncOpenAI(api_key="test-key") @@ -1235,7 +1276,7 @@ async def test_async_embeddings_no_pii( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with mock.patch.object( @@ -1255,6 +1296,7 @@ async def test_async_embeddings_no_pii( # Response is processed by litellm, so just check it exists assert response is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] spans = list( x @@ -1305,6 +1347,7 @@ async def test_async_embeddings_no_pii( assert SPANDATA.GEN_AI_EMBEDDINGS_INPUT not in span["data"] +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_exception_handling( reset_litellm_executor, @@ -1313,11 +1356,13 @@ def test_exception_handling( capture_items, get_rate_limit_model_response, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[LiteLLMIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) messages = [{"role": "user", "content": "Hello!"}] @@ -1326,7 +1371,7 @@ def test_exception_handling( model_response = get_rate_limit_model_response() - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("event") with mock.patch.object( @@ -1371,6 +1416,7 @@ def test_exception_handling( assert len(error_events) == 1 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.asyncio(loop_scope="session") async def test_async_exception_handling( @@ -1379,11 +1425,13 @@ async def test_async_exception_handling( capture_items, get_rate_limit_model_response, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[LiteLLMIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) messages = [{"role": "user", "content": "Hello!"}] @@ -1392,7 +1440,7 @@ async def test_async_exception_handling( model_response = get_rate_limit_model_response() - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("event") with mock.patch.object( @@ -1437,6 +1485,7 @@ async def test_async_exception_handling( assert len(error_events) == 1 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_span_origin( reset_litellm_executor, @@ -1446,11 +1495,13 @@ def test_span_origin( get_model_response, nonstreaming_chat_completions_model_response, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[LiteLLMIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) messages = [{"role": "user", "content": "Hello!"}] @@ -1472,8 +1523,30 @@ def test_span_origin( serialize_pydantic=True, request_headers={"X-Stainless-Raw-Response": "true"}, ) + if span_streaming: + items = capture_items("transaction", "span") - if stream_gen_ai_spans: + with mock.patch.object( + client.completions._client._client, + "send", + return_value=model_response, + ), start_transaction(name="litellm test"): + litellm.completion( + model="gpt-3.5-turbo", + messages=messages, + client=client, + ) + + litellm_utils.executor.shutdown(wait=True) + + (event,) = (item.payload for item in items if item.type == "transaction") + assert event["contexts"]["trace"]["origin"] == "manual" + + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + # OpenAI span finishes first + assert spans[1]["attributes"]["sentry.origin"] == "auto.ai.litellm" + elif stream_gen_ai_spans: items = capture_items("transaction", "span") with mock.patch.object( @@ -1492,6 +1565,7 @@ def test_span_origin( (event,) = (item.payload for item in items if item.type == "transaction") assert event["contexts"]["trace"]["origin"] == "manual" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] assert spans[0]["attributes"]["sentry.origin"] == "auto.ai.litellm" else: @@ -1516,6 +1590,7 @@ def test_span_origin( assert event["spans"][0]["origin"] == "auto.ai.litellm" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_multiple_providers( reset_litellm_executor, @@ -1527,12 +1602,14 @@ def test_multiple_providers( nonstreaming_anthropic_model_response, nonstreaming_google_genai_model_response, stream_gen_ai_spans, + span_streaming, ): """Test that the integration correctly identifies different providers.""" sentry_init( integrations=[LiteLLMIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) messages = [{"role": "user", "content": "Hello!"}] @@ -1554,7 +1631,7 @@ def test_multiple_providers( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction") with mock.patch.object( @@ -1618,6 +1695,7 @@ def test_multiple_providers( events = [item.payload for item in items if item.type == "transaction"] assert len(events) == 3 + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] for span in spans: # The provider should be detected by litellm.get_llm_provider @@ -1691,6 +1769,7 @@ def test_multiple_providers( assert SPANDATA.GEN_AI_SYSTEM in span["data"] +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.asyncio(loop_scope="session") async def test_async_multiple_providers( @@ -1702,12 +1781,14 @@ async def test_async_multiple_providers( nonstreaming_anthropic_model_response, nonstreaming_google_genai_model_response, stream_gen_ai_spans, + span_streaming, ): """Test that the integration correctly identifies different providers.""" sentry_init( integrations=[LiteLLMIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) messages = [{"role": "user", "content": "Hello!"}] @@ -1729,7 +1810,7 @@ async def test_async_multiple_providers( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with mock.patch.object( @@ -1796,6 +1877,7 @@ async def test_async_multiple_providers( events = [item.payload for item in items if item.type == "transaction"] assert len(events) == 3 + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] for span in spans: # The provider should be detected by litellm.get_llm_provider @@ -1872,6 +1954,7 @@ async def test_async_multiple_providers( assert SPANDATA.GEN_AI_SYSTEM in span["data"] +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_additional_parameters( reset_litellm_executor, @@ -1881,12 +1964,14 @@ def test_additional_parameters( get_model_response, nonstreaming_chat_completions_model_response, stream_gen_ai_spans, + span_streaming, ): """Test that additional parameters are captured.""" sentry_init( integrations=[LiteLLMIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) messages = [{"role": "user", "content": "Hello!"}] @@ -1908,7 +1993,7 @@ def test_additional_parameters( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with mock.patch.object( @@ -1929,6 +2014,7 @@ def test_additional_parameters( litellm_utils.executor.shutdown(wait=True) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] chat_spans = list( x @@ -1981,6 +2067,7 @@ def test_additional_parameters( assert span["data"][SPANDATA.GEN_AI_REQUEST_PRESENCE_PENALTY] == 0.5 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.asyncio(loop_scope="session") async def test_async_additional_parameters( @@ -1990,12 +2077,14 @@ async def test_async_additional_parameters( get_model_response, nonstreaming_chat_completions_model_response, stream_gen_ai_spans, + span_streaming, ): """Test that additional parameters are captured.""" sentry_init( integrations=[LiteLLMIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) messages = [{"role": "user", "content": "Hello!"}] @@ -2017,7 +2106,7 @@ async def test_async_additional_parameters( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with mock.patch.object( @@ -2039,6 +2128,7 @@ async def test_async_additional_parameters( await GLOBAL_LOGGING_WORKER.flush() await asyncio.sleep(0.5) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] chat_spans = list( x @@ -2092,6 +2182,7 @@ async def test_async_additional_parameters( assert span["data"][SPANDATA.GEN_AI_REQUEST_PRESENCE_PENALTY] == 0.5 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_no_integration( reset_litellm_executor, @@ -2101,11 +2192,13 @@ def test_no_integration( get_model_response, nonstreaming_chat_completions_model_response, stream_gen_ai_spans, + span_streaming, ): """Test that when integration is not enabled, callbacks don't break.""" sentry_init( traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) messages = [{"role": "user", "content": "Hello!"}] @@ -2127,7 +2220,7 @@ def test_no_integration( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with mock.patch.object( @@ -2143,6 +2236,7 @@ def test_no_integration( litellm_utils.executor.shutdown(wait=True) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] chat_spans = list( x @@ -2177,6 +2271,7 @@ def test_no_integration( assert len(chat_spans) == 0 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.asyncio(loop_scope="session") async def test_async_no_integration( @@ -2186,11 +2281,13 @@ async def test_async_no_integration( get_model_response, nonstreaming_chat_completions_model_response, stream_gen_ai_spans, + span_streaming, ): """Test that when integration is not enabled, callbacks don't break.""" sentry_init( traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) messages = [{"role": "user", "content": "Hello!"}] @@ -2212,7 +2309,7 @@ async def test_async_no_integration( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with mock.patch.object( @@ -2229,6 +2326,7 @@ async def test_async_no_integration( await GLOBAL_LOGGING_WORKER.flush() await asyncio.sleep(0.5) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] chat_spans = list( x @@ -2264,18 +2362,21 @@ async def test_async_no_integration( assert len(chat_spans) == 0 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_response_without_usage( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test handling of responses without usage information.""" sentry_init( integrations=[LiteLLMIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) messages = [{"role": "user", "content": "Hello!"}] @@ -2290,7 +2391,7 @@ def test_response_without_usage( }, )() - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with start_transaction(name="litellm test"): @@ -2307,6 +2408,7 @@ def test_response_without_usage( datetime.now(), ) + sentry_sdk.flush() (span,) = (item.payload for item in items if item.type == "span") # Span should still be created even without usage info @@ -2412,6 +2514,7 @@ def test_litellm_message_truncation(sentry_init, capture_events): IMAGE_DATA_URI = f"data:image/png;base64,{IMAGE_B64}" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_binary_content_encoding_image_url( reset_litellm_executor, @@ -2421,12 +2524,14 @@ def test_binary_content_encoding_image_url( get_model_response, nonstreaming_chat_completions_model_response, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[LiteLLMIntegration(include_prompts=True)], traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) messages = [ @@ -2459,7 +2564,7 @@ def test_binary_content_encoding_image_url( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with mock.patch.object( @@ -2476,6 +2581,7 @@ def test_binary_content_encoding_image_url( litellm_utils.executor.shutdown(wait=True) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] chat_spans = list( x @@ -2532,6 +2638,7 @@ def test_binary_content_encoding_image_url( ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.asyncio(loop_scope="session") async def test_async_binary_content_encoding_image_url( @@ -2541,12 +2648,14 @@ async def test_async_binary_content_encoding_image_url( get_model_response, nonstreaming_chat_completions_model_response, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[LiteLLMIntegration(include_prompts=True)], traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) messages = [ @@ -2579,7 +2688,7 @@ async def test_async_binary_content_encoding_image_url( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with mock.patch.object( @@ -2597,6 +2706,7 @@ async def test_async_binary_content_encoding_image_url( await GLOBAL_LOGGING_WORKER.flush() await asyncio.sleep(0.5) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] chat_spans = list( x @@ -2654,6 +2764,7 @@ async def test_async_binary_content_encoding_image_url( ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_binary_content_encoding_mixed_content( reset_litellm_executor, @@ -2663,12 +2774,14 @@ def test_binary_content_encoding_mixed_content( get_model_response, nonstreaming_chat_completions_model_response, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[LiteLLMIntegration(include_prompts=True)], traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) messages = [ @@ -2702,7 +2815,7 @@ def test_binary_content_encoding_mixed_content( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with mock.patch.object( @@ -2719,6 +2832,7 @@ def test_binary_content_encoding_mixed_content( litellm_utils.executor.shutdown(wait=True) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] chat_spans = list( x @@ -2763,6 +2877,7 @@ def test_binary_content_encoding_mixed_content( assert any(item.get("type") == "blob" for item in content_items) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.asyncio(loop_scope="session") async def test_async_binary_content_encoding_mixed_content( @@ -2772,12 +2887,14 @@ async def test_async_binary_content_encoding_mixed_content( get_model_response, nonstreaming_chat_completions_model_response, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[LiteLLMIntegration(include_prompts=True)], traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) messages = [ @@ -2811,7 +2928,7 @@ async def test_async_binary_content_encoding_mixed_content( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with mock.patch.object( @@ -2829,6 +2946,7 @@ async def test_async_binary_content_encoding_mixed_content( await GLOBAL_LOGGING_WORKER.flush() await asyncio.sleep(0.5) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] chat_spans = list( x @@ -2874,6 +2992,7 @@ async def test_async_binary_content_encoding_mixed_content( assert any(item.get("type") == "blob" for item in content_items) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_binary_content_encoding_uri_type( reset_litellm_executor, @@ -2883,12 +3002,14 @@ def test_binary_content_encoding_uri_type( get_model_response, nonstreaming_chat_completions_model_response, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[LiteLLMIntegration(include_prompts=True)], traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) messages = [ @@ -2920,7 +3041,7 @@ def test_binary_content_encoding_uri_type( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with mock.patch.object( client.completions._client._client, @@ -2936,6 +3057,7 @@ def test_binary_content_encoding_uri_type( litellm_utils.executor.shutdown(wait=True) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] chat_spans = list( x @@ -2987,6 +3109,7 @@ def test_binary_content_encoding_uri_type( assert uri_item["uri"] == "https://example.com/image.jpg" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.asyncio(loop_scope="session") async def test_async_binary_content_encoding_uri_type( @@ -2996,12 +3119,14 @@ async def test_async_binary_content_encoding_uri_type( get_model_response, nonstreaming_chat_completions_model_response, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[LiteLLMIntegration(include_prompts=True)], traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) messages = [ @@ -3033,7 +3158,7 @@ async def test_async_binary_content_encoding_uri_type( request_headers={"X-Stainless-Raw-Response": "true"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with mock.patch.object( @@ -3051,6 +3176,7 @@ async def test_async_binary_content_encoding_uri_type( await GLOBAL_LOGGING_WORKER.flush() await asyncio.sleep(0.5) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] chat_spans = list( x