Skip to content

Commit 593871c

Browse files
committed
.
1 parent 0eb003b commit 593871c

3 files changed

Lines changed: 69 additions & 31 deletions

File tree

sentry_sdk/integrations/celery/__init__.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,6 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any":
339339
}
340340
}
341341

342-
scope.set_custom_sampling_context(custom_sampling_context)
343342
scope.set_transaction_name(task.name, source=TransactionSource.TASK)
344343

345344
span: "Union[Span, StreamedSpan]"
@@ -351,6 +350,7 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any":
351350
headers = args[3].get("headers") or {}
352351
if span_streaming:
353352
sentry_sdk.traces.continue_trace(headers)
353+
scope.set_custom_sampling_context(custom_sampling_context)
354354
span = sentry_sdk.traces.start_span(
355355
name=task.name,
356356
parent_span=None, # make this a segment
@@ -553,11 +553,6 @@ def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any":
553553
# method will still work.
554554
kwargs_headers = {}
555555

556-
# XXX[ivana]: check whether this is needed with the parent checks
557-
# if "task" not in kwargs_headers:
558-
# # filter out heartbeat and other internal Celery events
559-
# return original_publish(self, *args, **kwargs)
560-
561556
task_name = kwargs_headers.get("task")
562557
task_id = kwargs_headers.get("id")
563558
retries = kwargs_headers.get("retries")

sentry_sdk/traces.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,9 @@ def __init__(
258258
):
259259
self._name: str = name
260260
self._active: bool = active
261-
self._attributes: "Attributes" = {}
261+
self._attributes: "Attributes" = {
262+
"sentry.origin": "manual",
263+
}
262264

263265
if attributes:
264266
for attribute, value in attributes.items():

tests/integrations/celery/test_celery.py

Lines changed: 65 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -515,13 +515,18 @@ def dummy_task(self, x, y):
515515
assert celery_invocation(dummy_task, 1, 1)[0].wait() == 1
516516

517517

518+
@pytest.mark.parametrize("span_streaming", [True, False])
518519
def test_traces_sampler_gets_task_info_in_sampling_context(
520+
span_streaming,
519521
init_celery,
520522
celery_invocation,
521523
DictionaryContaining, # noqa:N803
522524
):
523525
traces_sampler = mock.Mock()
524-
celery = init_celery(traces_sampler=traces_sampler)
526+
celery = init_celery(
527+
traces_sampler=traces_sampler,
528+
_experiments={"trace_lifecycle": "stream" if span_streaming else "static"},
529+
)
525530

526531
@celery.task(name="dog_walk")
527532
def walk_dogs(x, y):
@@ -618,35 +623,56 @@ def dummy_task(self, x, y):
618623
)
619624

620625

621-
def test_sentry_propagate_traces_override(init_celery):
626+
@pytest.mark.parametrize("span_streaming", [True, False])
627+
def test_sentry_propagate_traces_override(span_streaming, init_celery):
622628
"""
623629
Test if the `sentry-propagate-traces` header given to `apply_async`
624630
overrides the `propagate_traces` parameter in the integration constructor.
625631
"""
626632
celery = init_celery(
627-
propagate_traces=True, traces_sample_rate=1.0, release="abcdef"
633+
propagate_traces=True,
634+
traces_sample_rate=1.0,
635+
release="abcdef",
636+
_experiments={"trace_lifecycle": "stream" if span_streaming else "static"},
628637
)
629638

630639
@celery.task(name="dummy_task", bind=True)
631640
def dummy_task(self, message):
632641
trace_id = get_current_span().trace_id
633642
return trace_id
634643

635-
with start_transaction() as transaction:
636-
transaction_trace_id = transaction.trace_id
644+
if span_streaming:
645+
with sentry_sdk.traces.start_span(name="parent") as span:
646+
parent_trace_id = span.trace_id
637647

638-
# should propagate trace
639-
task_transaction_id = dummy_task.apply_async(
640-
args=("some message",),
641-
).get()
642-
assert transaction_trace_id == task_transaction_id
648+
# should propagate trace
649+
task_trace_id = dummy_task.apply_async(
650+
args=("some message",),
651+
).get()
652+
assert parent_trace_id == task_trace_id
643653

644-
# should NOT propagate trace (overrides `propagate_traces` parameter in integration constructor)
645-
task_transaction_id = dummy_task.apply_async(
646-
args=("another message",),
647-
headers={"sentry-propagate-traces": False},
648-
).get()
649-
assert transaction_trace_id != task_transaction_id
654+
# should NOT propagate trace
655+
task_trace_id = dummy_task.apply_async(
656+
args=("another message",),
657+
headers={"sentry-propagate-traces": False},
658+
).get()
659+
assert parent_trace_id != task_trace_id
660+
else:
661+
with start_transaction() as transaction:
662+
transaction_trace_id = transaction.trace_id
663+
664+
# should propagate trace
665+
task_trace_id = dummy_task.apply_async(
666+
args=("some message",),
667+
).get()
668+
assert transaction_trace_id == task_trace_id
669+
670+
# should NOT propagate trace
671+
task_trace_id = dummy_task.apply_async(
672+
args=("another message",),
673+
headers={"sentry-propagate-traces": False},
674+
).get()
675+
assert transaction_trace_id != task_trace_id
650676

651677

652678
def test_apply_async_manually_span(sentry_init):
@@ -1001,11 +1027,12 @@ def task(): ...
10011027

10021028
sentry_sdk.flush()
10031029

1030+
parent = items.pop(-1).payload
1031+
assert parent["name"] == "custom parent"
1032+
assert parent["attributes"]["sentry.origin"] == "manual"
1033+
10041034
for item in items:
1005-
if item.payload["name"] != "custom parent":
1006-
assert (
1007-
item.payload["attributes"]["sentry.origin"] == "auto.queue.celery"
1008-
)
1035+
assert item.payload["attributes"]["sentry.origin"] == "auto.queue.celery"
10091036
else:
10101037
events = capture_events()
10111038

@@ -1078,11 +1105,17 @@ def test_send_task_wrapped(
10781105

10791106
if span_streaming:
10801107
submit_span, outer = [item.payload for item in items]
1108+
10811109
assert outer["name"] == "custom parent"
10821110
assert outer["is_segment"] is True
1111+
10831112
assert submit_span["name"] == "very_creative_task_name"
10841113
assert submit_span["attributes"]["sentry.op"] == "queue.submit.celery"
10851114
assert submit_span["trace_id"] == outer_span.trace_id
1115+
assert (
1116+
submit_span["trace_id"] == kwargs["headers"]["sentry-trace"].split("-")[0]
1117+
)
1118+
10861119
else:
10871120
(event,) = events
10881121
assert event["type"] == "transaction"
@@ -1094,14 +1127,18 @@ def test_send_task_wrapped(
10941127
assert span["trace_id"] == kwargs["headers"]["sentry-trace"].split("-")[0]
10951128

10961129

1097-
def test_user_custom_headers_accessible_in_task(init_celery):
1130+
@pytest.mark.parametrize("span_streaming", [True, False])
1131+
def test_user_custom_headers_accessible_in_task(span_streaming, init_celery):
10981132
"""
10991133
Regression test for https://github.com/getsentry/sentry-python/issues/5566
11001134
11011135
User-provided custom headers passed to apply_async() must be accessible
11021136
via task.request.headers on the worker side.
11031137
"""
1104-
celery = init_celery(traces_sample_rate=1.0)
1138+
celery = init_celery(
1139+
traces_sample_rate=1.0,
1140+
_experiments={"trace_lifecycle": "stream" if span_streaming else "static"},
1141+
)
11051142

11061143
@celery.task(name="custom_headers_task", bind=True)
11071144
def custom_headers_task(self):
@@ -1113,8 +1150,12 @@ def custom_headers_task(self):
11131150
"tenant_id": "tenant-42",
11141151
}
11151152

1116-
with start_transaction(name="test"):
1117-
result = custom_headers_task.apply_async(headers=custom_headers)
1153+
if span_streaming:
1154+
with sentry_sdk.traces.start_span(name="test"):
1155+
result = custom_headers_task.apply_async(headers=custom_headers)
1156+
else:
1157+
with start_transaction(name="test"):
1158+
result = custom_headers_task.apply_async(headers=custom_headers)
11181159

11191160
received_headers = result.get()
11201161
for key, value in custom_headers.items():

0 commit comments

Comments
 (0)