Skip to content

Commit 31489ea

Browse files
sl0thentr0pyclaude
andcommitted
feat(dramatiq): Support span streaming
Add a span-streaming branch in SentryMiddleware.before_process_message and after_process_message alongside the legacy transaction-based path, gated on has_span_streaming_enabled. The streaming branch uses sentry_sdk.traces.continue_trace + start_span and stashes the span context manager on the message so the after-hook can exit it without relying on scope.transaction (which doesn't exist in streaming mode). Parametrize the tracing tests over span_streaming True/False so both modes are covered. Refs GH-5386 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent bd567bd commit 31489ea

2 files changed

Lines changed: 221 additions & 62 deletions

File tree

sentry_sdk/integrations/dramatiq.py

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
from sentry_sdk.api import continue_trace, get_baggage, get_traceparent
66
from sentry_sdk.integrations import Integration, DidNotEnable
77
from sentry_sdk.integrations._wsgi_common import request_body_within_bounds
8+
from sentry_sdk.traces import SegmentSource
89
from sentry_sdk.tracing import (
910
BAGGAGE_HEADER_NAME,
1011
SENTRY_TRACE_HEADER_NAME,
1112
TransactionSource,
1213
)
14+
from sentry_sdk.tracing_utils import has_span_streaming_enabled
1315
from sentry_sdk.utils import (
1416
AnnotatedValue,
1517
capture_internal_exceptions,
@@ -113,7 +115,8 @@ def before_enqueue(
113115
}
114116

115117
def before_process_message(self, broker: "Broker", message: "Message[R]") -> None:
116-
integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
118+
client = sentry_sdk.get_client()
119+
integration = client.get_integration(DramatiqIntegration)
117120
if integration is None:
118121
return
119122

@@ -128,21 +131,35 @@ def before_process_message(self, broker: "Broker", message: "Message[R]") -> Non
128131
# start new trace in case of retrying
129132
sentry_headers = {}
130133

131-
transaction = continue_trace(
132-
sentry_headers,
133-
name=message.actor_name,
134-
op=OP.QUEUE_TASK_DRAMATIQ,
135-
source=TransactionSource.TASK,
136-
origin=DramatiqIntegration.origin,
137-
)
138-
transaction.set_status(SPANSTATUS.OK)
139-
sentry_sdk.start_transaction(
140-
transaction,
141-
name=message.actor_name,
142-
op=OP.QUEUE_TASK_DRAMATIQ,
143-
source=TransactionSource.TASK,
144-
)
145-
transaction.__enter__()
134+
if has_span_streaming_enabled(client.options):
135+
sentry_sdk.traces.continue_trace(sentry_headers)
136+
span = sentry_sdk.traces.start_span(
137+
name=message.actor_name,
138+
attributes={
139+
"sentry.op": OP.QUEUE_TASK_DRAMATIQ,
140+
"sentry.origin": DramatiqIntegration.origin,
141+
"sentry.span.source": SegmentSource.TASK.value,
142+
},
143+
)
144+
message._sentry_span_ctx = span
145+
span.__enter__()
146+
else:
147+
transaction = continue_trace(
148+
sentry_headers,
149+
name=message.actor_name,
150+
op=OP.QUEUE_TASK_DRAMATIQ,
151+
source=TransactionSource.TASK,
152+
origin=DramatiqIntegration.origin,
153+
)
154+
transaction.set_status(SPANSTATUS.OK)
155+
sentry_sdk.start_transaction(
156+
transaction,
157+
name=message.actor_name,
158+
op=OP.QUEUE_TASK_DRAMATIQ,
159+
source=TransactionSource.TASK,
160+
)
161+
transaction.__enter__()
162+
message._sentry_span_ctx = transaction
146163

147164
def after_process_message(
148165
self,
@@ -160,8 +177,8 @@ def after_process_message(
160177
throws = message.options.get("throws") or actor.options.get("throws")
161178

162179
scope_manager = message._scope_manager
163-
transaction = sentry_sdk.get_current_scope().transaction
164-
if not transaction:
180+
span_ctx = getattr(message, "_sentry_span_ctx", None)
181+
if span_ctx is None:
165182
return None
166183

167184
is_event_capture_required = (
@@ -171,7 +188,7 @@ def after_process_message(
171188
)
172189
if not is_event_capture_required:
173190
# normal transaction finish
174-
transaction.__exit__(None, None, None)
191+
span_ctx.__exit__(None, None, None)
175192
scope_manager.__exit__(None, None, None)
176193
return
177194

@@ -185,7 +202,7 @@ def after_process_message(
185202
)
186203
sentry_sdk.capture_event(event, hint=hint)
187204
# transaction error
188-
transaction.__exit__(type(exception), exception, None)
205+
span_ctx.__exit__(type(exception), exception, None)
189206
scope_manager.__exit__(type(exception), exception, None)
190207

191208
after_skip_message = after_process_message

tests/integrations/dramatiq/test_dramatiq.py

Lines changed: 184 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@
1717

1818
@pytest.fixture(scope="function")
1919
def broker(request, sentry_init):
20-
sentry_init(
21-
integrations=[DramatiqIntegration()],
22-
traces_sample_rate=getattr(request, "param", None),
23-
)
20+
param = getattr(request, "param", None)
21+
if isinstance(param, dict):
22+
sentry_init(integrations=[DramatiqIntegration()], **param)
23+
else:
24+
sentry_init(
25+
integrations=[DramatiqIntegration()],
26+
traces_sample_rate=param,
27+
)
2428
broker = StubBroker()
2529
broker.emit_after("process_boot")
2630
dramatiq.set_broker(broker)
@@ -66,22 +70,77 @@ def dummy_actor(x, y):
6670

6771

6872
@pytest.mark.parametrize(
69-
"broker,expected_span_status,fail_fast",
73+
"broker,expected_span_status,fail_fast,span_streaming",
7074
[
71-
(1.0, SPANSTATUS.INTERNAL_ERROR, False),
72-
(1.0, SPANSTATUS.OK, False),
73-
(1.0, SPANSTATUS.INTERNAL_ERROR, True),
74-
(1.0, SPANSTATUS.OK, True),
75+
({"traces_sample_rate": 1.0}, SPANSTATUS.INTERNAL_ERROR, False, False),
76+
({"traces_sample_rate": 1.0}, SPANSTATUS.OK, False, False),
77+
({"traces_sample_rate": 1.0}, SPANSTATUS.INTERNAL_ERROR, True, False),
78+
({"traces_sample_rate": 1.0}, SPANSTATUS.OK, True, False),
79+
(
80+
{
81+
"traces_sample_rate": 1.0,
82+
"_experiments": {"trace_lifecycle": "stream"},
83+
},
84+
SPANSTATUS.INTERNAL_ERROR,
85+
False,
86+
True,
87+
),
88+
(
89+
{
90+
"traces_sample_rate": 1.0,
91+
"_experiments": {"trace_lifecycle": "stream"},
92+
},
93+
SPANSTATUS.OK,
94+
False,
95+
True,
96+
),
97+
(
98+
{
99+
"traces_sample_rate": 1.0,
100+
"_experiments": {"trace_lifecycle": "stream"},
101+
},
102+
SPANSTATUS.INTERNAL_ERROR,
103+
True,
104+
True,
105+
),
106+
(
107+
{
108+
"traces_sample_rate": 1.0,
109+
"_experiments": {"trace_lifecycle": "stream"},
110+
},
111+
SPANSTATUS.OK,
112+
True,
113+
True,
114+
),
115+
],
116+
ids=[
117+
"error",
118+
"success",
119+
"error_fail_fast",
120+
"success_fail_fast",
121+
"error_stream",
122+
"success_stream",
123+
"error_fail_fast_stream",
124+
"success_fail_fast_stream",
75125
],
76-
ids=["error", "success", "error_fail_fast", "success_fail_fast"],
77126
indirect=["broker"],
78127
)
79128
def test_task_transaction(
80-
broker, worker, capture_events, expected_span_status, fail_fast
129+
broker,
130+
worker,
131+
capture_events,
132+
capture_items,
133+
expected_span_status,
134+
fail_fast,
135+
span_streaming,
81136
):
82-
events = capture_events()
83137
task_fails = expected_span_status == SPANSTATUS.INTERNAL_ERROR
84138

139+
if span_streaming:
140+
items = capture_items("event", "span")
141+
else:
142+
events = capture_events()
143+
85144
@dramatiq.actor(max_retries=0)
86145
def dummy_actor(x, y):
87146
return x / y
@@ -95,37 +154,93 @@ def dummy_actor(x, y):
95154
broker.join(dummy_actor.queue_name, fail_fast=fail_fast)
96155

97156
worker.join()
157+
sentry_sdk.flush()
158+
159+
if span_streaming:
160+
if task_fails:
161+
error_item, segment_item = items
162+
error_event = error_item.payload
163+
exception = error_event["exception"]["values"][0]
164+
assert exception["type"] == "ZeroDivisionError"
165+
assert exception["mechanism"]["type"] == DramatiqIntegration.identifier
166+
else:
167+
(segment_item,) = items
168+
169+
segment = segment_item.payload
170+
assert segment_item.type == "span"
171+
assert segment["name"] == "dummy_actor"
172+
assert segment["is_segment"] is True
173+
assert segment["attributes"]["sentry.op"] == "queue.task.dramatiq"
174+
assert segment["attributes"]["sentry.span.source"] == "task"
175+
assert segment["status"] == ("error" if task_fails else "ok")
176+
else:
177+
if task_fails:
178+
error_event = events.pop(0)
179+
exception = error_event["exception"]["values"][0]
180+
assert exception["type"] == "ZeroDivisionError"
181+
assert exception["mechanism"]["type"] == DramatiqIntegration.identifier
98182

99-
if task_fails:
100-
error_event = events.pop(0)
101-
exception = error_event["exception"]["values"][0]
102-
assert exception["type"] == "ZeroDivisionError"
103-
assert exception["mechanism"]["type"] == DramatiqIntegration.identifier
183+
(event,) = events
184+
assert event["type"] == "transaction"
185+
assert event["transaction"] == "dummy_actor"
186+
assert event["transaction_info"] == {"source": TransactionSource.TASK}
187+
assert event["contexts"]["trace"]["status"] == expected_span_status
104188

105-
(event,) = events
106-
assert event["type"] == "transaction"
107-
assert event["transaction"] == "dummy_actor"
108-
assert event["transaction_info"] == {"source": TransactionSource.TASK}
109-
assert event["contexts"]["trace"]["status"] == expected_span_status
110189

190+
@pytest.mark.parametrize(
191+
"broker,span_streaming",
192+
[
193+
({"traces_sample_rate": 1.0}, False),
194+
(
195+
{
196+
"traces_sample_rate": 1.0,
197+
"_experiments": {"trace_lifecycle": "stream"},
198+
},
199+
True,
200+
),
201+
],
202+
ids=["static", "stream"],
203+
indirect=["broker"],
204+
)
205+
def test_dramatiq_propagate_trace(
206+
broker, worker, capture_events, capture_items, span_streaming
207+
):
208+
if span_streaming:
209+
items = capture_items("span")
111210

112-
@pytest.mark.parametrize("broker", [1.0], indirect=True)
113-
def test_dramatiq_propagate_trace(broker, worker, capture_events):
114-
events = capture_events()
211+
with sentry_sdk.traces.start_span(name="outer") as outer_span:
115212

116-
@dramatiq.actor(max_retries=0)
117-
def propagated_trace_task():
118-
pass
213+
@dramatiq.actor(max_retries=0)
214+
def propagated_trace_task():
215+
pass
119216

120-
with start_transaction() as outer_transaction:
121-
propagated_trace_task.send()
122-
broker.join(propagated_trace_task.queue_name)
123-
worker.join()
217+
propagated_trace_task.send()
218+
broker.join(propagated_trace_task.queue_name)
219+
worker.join()
124220

125-
assert (
126-
events[0]["transaction"] == "propagated_trace_task"
127-
) # the "inner" transaction
128-
assert events[0]["contexts"]["trace"]["trace_id"] == outer_transaction.trace_id
221+
sentry_sdk.flush()
222+
223+
inner_segment, outer_segment = [i.payload for i in items]
224+
assert inner_segment["name"] == "propagated_trace_task"
225+
assert inner_segment["attributes"]["sentry.op"] == "queue.task.dramatiq"
226+
assert inner_segment["trace_id"] == outer_span.trace_id
227+
assert outer_segment["name"] == "outer"
228+
else:
229+
events = capture_events()
230+
231+
@dramatiq.actor(max_retries=0)
232+
def propagated_trace_task():
233+
pass
234+
235+
with start_transaction() as outer_transaction:
236+
propagated_trace_task.send()
237+
broker.join(propagated_trace_task.queue_name)
238+
worker.join()
239+
240+
assert (
241+
events[0]["transaction"] == "propagated_trace_task"
242+
) # the "inner" transaction
243+
assert events[0]["contexts"]["trace"]["trace_id"] == outer_transaction.trace_id
129244

130245

131246
@pytest.mark.parametrize(
@@ -389,19 +504,39 @@ def dummy_actor():
389504
assert events == []
390505

391506

392-
@pytest.mark.parametrize("broker", [1.0], indirect=True)
507+
@pytest.mark.parametrize(
508+
"broker,span_streaming",
509+
[
510+
({"traces_sample_rate": 1.0}, False),
511+
(
512+
{
513+
"traces_sample_rate": 1.0,
514+
"_experiments": {"trace_lifecycle": "stream"},
515+
},
516+
True,
517+
),
518+
],
519+
ids=["static", "stream"],
520+
indirect=["broker"],
521+
)
393522
def test_that_skip_message_cleans_up_scope_and_transaction(
394-
broker, worker, capture_events
523+
broker, worker, capture_events, capture_items, span_streaming
395524
):
396-
transactions: list[Transaction] = []
525+
captured_spans: list = []
397526

398527
class SkipMessageMiddleware(Middleware):
399528
def before_process_message(self, broker, message):
400-
transactions.append(sentry_sdk.get_current_scope().transaction)
529+
if span_streaming:
530+
captured_spans.append(sentry_sdk.get_current_span())
531+
else:
532+
captured_spans.append(sentry_sdk.get_current_scope().transaction)
401533
raise SkipMessage()
402534

403535
broker.add_middleware(SkipMessageMiddleware())
404536

537+
if span_streaming:
538+
items = capture_items("span")
539+
405540
@dramatiq.actor(max_retries=0)
406541
def skipped_actor(): ...
407542

@@ -410,5 +545,12 @@ def skipped_actor(): ...
410545
broker.join(skipped_actor.queue_name)
411546
worker.join()
412547

413-
(transaction,) = transactions
414-
assert transaction.timestamp is not None
548+
if span_streaming:
549+
sentry_sdk.flush()
550+
(segment_payload,) = [i.payload for i in items]
551+
assert segment_payload["name"] == "skipped_actor"
552+
assert segment_payload["end_timestamp"] is not None
553+
else:
554+
(transaction,) = captured_spans
555+
assert isinstance(transaction, Transaction)
556+
assert transaction.timestamp is not None

0 commit comments

Comments
 (0)