Skip to content

Commit 6466117

Browse files
committed
Merge branch 'envelope-printer-transport-zcgvd' of github.com:getsentry/sentry-python into envelope-printer-transport-zcgvd
2 parents 239a332 + 851b0ba commit 6466117

22 files changed

Lines changed: 1406 additions & 500 deletions

.github/workflows/update-tox.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ on:
99
jobs:
1010
update-tox:
1111
name: Update test matrix
12+
if: github.ref == 'refs/heads/master'
1213
runs-on: ubuntu-latest
1314
timeout-minutes: 10
1415

@@ -59,7 +60,6 @@ jobs:
5960
BRANCH_NAME: ${{ steps.create-branch.outputs.branch_name }}
6061
COMMIT_TITLE: ${{ steps.create-branch.outputs.commit_title }}
6162
DATE: ${{ steps.create-branch.outputs.date }}
62-
BASE_BRANCH: ${{ github.ref_name }}
6363
with:
6464
script: |
6565
const branchName = process.env.BRANCH_NAME;
@@ -105,7 +105,7 @@ jobs:
105105
repo: context.repo.repo,
106106
title: commitTitle + ' (' + date + ')',
107107
head: branchName,
108-
base: process.env.BASE_BRANCH,
108+
base: 'master',
109109
body: prBody,
110110
});
111111

scripts/populate_tox/package_dependencies.jsonl

Lines changed: 6 additions & 6 deletions
Large diffs are not rendered by default.

scripts/populate_tox/releases.jsonl

Lines changed: 15 additions & 16 deletions
Large diffs are not rendered by default.

sentry_sdk/_batcher.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import threading
44
from datetime import datetime, timezone
55
from typing import TYPE_CHECKING, TypeVar, Generic
6+
import weakref
67

78
from sentry_sdk.utils import format_timestamp
89
from sentry_sdk.envelope import Envelope, Item, PayloadRef
@@ -38,6 +39,26 @@ def __init__(
3839
self._flusher: "Optional[threading.Thread]" = None
3940
self._flusher_pid: "Optional[int]" = None
4041

42+
# See https://github.com/getsentry/sentry-python/blob/051cc01640a29bfd64b1f1e2e3414c02f027dd1b/sentry_sdk/monitor.py#L41-L50
43+
if hasattr(os, "register_at_fork"):
44+
weak_reset = weakref.WeakMethod(self._reset_thread_state)
45+
46+
def _reset_in_child() -> None:
47+
method = weak_reset()
48+
if method is not None:
49+
method()
50+
51+
os.register_at_fork(after_in_child=_reset_in_child)
52+
53+
def _reset_thread_state(self) -> None:
54+
self._buffer = []
55+
self._running = True
56+
self._lock = threading.Lock()
57+
self._active = threading.local()
58+
self._flush_event = threading.Event()
59+
self._flusher = None
60+
self._flusher_pid = None
61+
4162
def _ensure_thread(self) -> bool:
4263
"""For forking processes we might need to restart this thread.
4364
This ensures that our process actually has that thread running.

sentry_sdk/_span_batcher.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1-
import threading
21
from collections import defaultdict
32
from datetime import datetime, timezone
3+
import os
4+
import threading
45
from typing import TYPE_CHECKING
6+
import weakref
57

68
from sentry_sdk._batcher import Batcher
79
from sentry_sdk.envelope import Envelope, Item, PayloadRef
@@ -50,6 +52,30 @@ def __init__(
5052
self._flusher: "Optional[threading.Thread]" = None
5153
self._flusher_pid: "Optional[int]" = None
5254

55+
# See https://github.com/getsentry/sentry-python/blob/051cc01640a29bfd64b1f1e2e3414c02f027dd1b/sentry_sdk/monitor.py#L41-L50
56+
if hasattr(os, "register_at_fork"):
57+
weak_reset = weakref.WeakMethod(self._reset_thread_state)
58+
59+
def _reset_in_child() -> None:
60+
method = weak_reset()
61+
if method is not None:
62+
method()
63+
64+
os.register_at_fork(after_in_child=_reset_in_child)
65+
66+
def _reset_thread_state(self) -> None:
67+
self._span_buffer = defaultdict(list)
68+
self._running_size = defaultdict(lambda: 0)
69+
self._running = True
70+
71+
self._lock = threading.Lock()
72+
self._active = threading.local()
73+
74+
self._flush_event = threading.Event()
75+
76+
self._flusher = None
77+
self._flusher_pid = None
78+
5379
def add(self, span: "StreamedSpan") -> None:
5480
# Bail out if the current thread is already executing batcher code.
5581
# This prevents deadlocks when code running inside the batcher (e.g.

sentry_sdk/api.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from sentry_sdk._init_implementation import init
77
from sentry_sdk.consts import INSTRUMENTER
88
from sentry_sdk.scope import Scope, _ScopeManager, new_scope, isolation_scope
9-
from sentry_sdk.traces import StreamedSpan
9+
from sentry_sdk.traces import StreamedSpan, _get_current_streamed_span
1010
from sentry_sdk.tracing import NoOpSpan, Transaction, trace
1111
from sentry_sdk.crons import monitor
1212

@@ -422,7 +422,7 @@ def set_measurement(name: str, value: float, unit: "MeasurementUnit" = "") -> No
422422

423423
def get_current_span(
424424
scope: "Optional[Scope]" = None,
425-
) -> "Optional[Union[Span, StreamedSpan]]":
425+
) -> "Optional[Span]":
426426
"""
427427
Returns the currently active span if there is one running, otherwise `None`
428428
"""
@@ -533,12 +533,7 @@ def update_current_span(
533533
attributes={"user_id": 123, "batch_size": 50}
534534
)
535535
"""
536-
current_span = get_current_span()
537-
538-
if current_span is None:
539-
return
540-
541-
if isinstance(current_span, StreamedSpan):
536+
if isinstance(_get_current_streamed_span(), StreamedSpan):
542537
warnings.warn(
543538
"The `update_current_span` API isn't available in streaming mode. "
544539
"Retrieve the current span with get_current_span() and use its API "
@@ -548,6 +543,11 @@ def update_current_span(
548543
)
549544
return
550545

546+
current_span = get_current_span()
547+
548+
if current_span is None:
549+
return
550+
551551
if op is not None:
552552
current_span.op = op
553553

sentry_sdk/integrations/celery/__init__.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from sentry_sdk.integrations.celery.utils import _now_seconds_since_epoch
1616
from sentry_sdk.integrations.logging import ignore_logger
1717
from sentry_sdk.scope import should_send_default_pii
18-
from sentry_sdk.traces import StreamedSpan
18+
from sentry_sdk.traces import StreamedSpan, _get_current_streamed_span
1919
from sentry_sdk.tracing import BAGGAGE_HEADER_NAME, Span, TransactionSource
2020
from sentry_sdk.tracing_utils import Baggage, has_span_streaming_enabled
2121
from sentry_sdk.utils import (
@@ -98,13 +98,16 @@ def setup_once() -> None:
9898

9999

100100
def _set_status(status: str) -> None:
101+
client = sentry_sdk.get_client()
102+
span_streaming = has_span_streaming_enabled(client.options)
103+
101104
with capture_internal_exceptions():
102105
scope = sentry_sdk.get_current_scope()
103-
if scope.span is not None:
104-
if isinstance(scope.span, Span):
105-
scope.span.set_status(status)
106-
else:
107-
scope.span.status = "ok" if status == "ok" else "error"
106+
107+
if span_streaming and scope.streamed_span is not None:
108+
scope.streamed_span.status = "ok" if status == "ok" else "error"
109+
elif not span_streaming and scope.span is not None:
110+
scope.span.set_status(status)
108111

109112

110113
def _capture_exception(task: "Any", exc_info: "ExcInfo") -> None:
@@ -289,7 +292,7 @@ def apply_async(*args: "Any", **kwargs: "Any") -> "Any":
289292

290293
span_mgr: "Union[StreamedSpan, Span, NoOpMgr]" = NoOpMgr()
291294
if span_streaming:
292-
if not task_started_from_beat and sentry_sdk.get_current_span() is not None:
295+
if not task_started_from_beat and _get_current_streamed_span() is not None:
293296
span_mgr = sentry_sdk.traces.start_span(
294297
name=task_name,
295298
attributes={
@@ -570,7 +573,7 @@ def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any":
570573

571574
span: "Union[StreamedSpan, Span, None]" = None
572575
if span_streaming:
573-
if sentry_sdk.get_current_span() is not None:
576+
if _get_current_streamed_span() is not None:
574577
span = sentry_sdk.traces.start_span(
575578
name=task_name,
576579
attributes={

sentry_sdk/integrations/fastapi.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,17 @@ def _sentry_get_request_handler(*args: "Any", **kwargs: "Any") -> "Any":
9090
@wraps(old_call)
9191
def _sentry_call(*args: "Any", **kwargs: "Any") -> "Any":
9292
current_scope = sentry_sdk.get_current_scope()
93-
current_span = current_scope.span
9493

95-
if isinstance(current_span, StreamedSpan) and not isinstance(
96-
current_span, NoOpStreamedSpan
97-
):
98-
segment = current_span._segment
99-
segment._update_active_thread()
94+
client = sentry_sdk.get_client()
95+
if has_span_streaming_enabled(client.options):
96+
current_span = current_scope.streamed_span
97+
98+
if isinstance(current_span, StreamedSpan) and not isinstance(
99+
current_span, NoOpStreamedSpan
100+
):
101+
segment = current_span._segment
102+
segment._update_active_thread()
103+
100104
elif current_scope.transaction is not None:
101105
current_scope.transaction.update_active_thread()
102106

sentry_sdk/integrations/starlette.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
)
2222
from sentry_sdk.integrations.asgi import SentryAsgiMiddleware
2323
from sentry_sdk.scope import should_send_default_pii
24-
from sentry_sdk.traces import NoOpStreamedSpan, StreamedSpan
24+
from sentry_sdk.traces import NoOpStreamedSpan, StreamedSpan, _get_current_streamed_span
2525
from sentry_sdk.tracing import (
2626
SOURCE_FOR_STYLE,
2727
TransactionSource,
@@ -254,7 +254,7 @@ def _default(value: "Any") -> "Any":
254254
def _set_request_body_data_on_streaming_segment(
255255
info: "Optional[Dict[str, Any]]",
256256
) -> None:
257-
current_span = sentry_sdk.get_current_span()
257+
current_span = _get_current_streamed_span()
258258
if (
259259
info
260260
and "data" in info
@@ -545,19 +545,22 @@ def event_processor(
545545

546546
@functools.wraps(old_func)
547547
def _sentry_sync_func(*args: "Any", **kwargs: "Any") -> "Any":
548-
integration = sentry_sdk.get_client().get_integration(
549-
StarletteIntegration
550-
)
548+
client = sentry_sdk.get_client()
549+
550+
integration = client.get_integration(StarletteIntegration)
551551
if integration is None:
552552
return old_func(*args, **kwargs)
553553

554554
current_scope = sentry_sdk.get_current_scope()
555-
current_span = current_scope.span
556555

557-
if isinstance(current_span, StreamedSpan) and not isinstance(
558-
current_span, NoOpStreamedSpan
559-
):
560-
current_span._segment._update_active_thread()
556+
span_streaming = has_span_streaming_enabled(client.options)
557+
if span_streaming:
558+
current_span = current_scope.streamed_span
559+
560+
if isinstance(current_span, StreamedSpan) and not isinstance(
561+
current_span, NoOpStreamedSpan
562+
):
563+
current_span._segment._update_active_thread()
561564
elif current_scope.transaction is not None:
562565
current_scope.transaction.update_active_thread()
563566

sentry_sdk/scope.py

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -580,12 +580,18 @@ def get_traceparent(self, *args: "Any", **kwargs: "Any") -> "Optional[str]":
580580
"""
581581
client = self.get_client()
582582

583+
if not has_tracing_enabled(client.options):
584+
return self.get_active_propagation_context().to_traceparent()
585+
586+
span_streaming = has_span_streaming_enabled(client.options)
583587
# If we have an active span, return traceparent from there
584588
if (
585-
has_tracing_enabled(client.options)
586-
and self.span is not None
587-
and not isinstance(self.span, NoOpStreamedSpan)
589+
span_streaming
590+
and self.streamed_span is not None
591+
and not isinstance(self.streamed_span, NoOpStreamedSpan)
588592
):
593+
return self.streamed_span._to_traceparent()
594+
elif not span_streaming and self.span is not None:
589595
return self.span._to_traceparent()
590596

591597
# else return traceparent from the propagation context
@@ -598,12 +604,18 @@ def get_baggage(self, *args: "Any", **kwargs: "Any") -> "Optional[Baggage]":
598604
"""
599605
client = self.get_client()
600606

607+
if not has_tracing_enabled(client.options):
608+
return self.get_active_propagation_context().get_baggage()
609+
610+
span_streaming = has_span_streaming_enabled(client.options)
601611
# If we have an active span, return baggage from there
602612
if (
603-
has_tracing_enabled(client.options)
604-
and self.span is not None
605-
and not isinstance(self.span, NoOpStreamedSpan)
613+
span_streaming
614+
and self.streamed_span is not None
615+
and not isinstance(self.streamed_span, NoOpStreamedSpan)
606616
):
617+
return self.streamed_span._to_baggage()
618+
elif not span_streaming and self.span is not None:
607619
return self.span._to_baggage()
608620

609621
# else return baggage from the propagation context
@@ -680,7 +692,9 @@ def iter_trace_propagation_headers(
680692
return
681693

682694
span = kwargs.pop("span", None)
683-
span = span or self.span
695+
if not span:
696+
span_streaming = has_span_streaming_enabled(client.options)
697+
span = self.streamed_span if span_streaming else self.span
684698

685699
if (
686700
has_tracing_enabled(client.options)
@@ -877,12 +891,12 @@ def set_user(self, value: "Optional[Dict[str, Any]]") -> None:
877891
session.update(user=value)
878892

879893
@property
880-
def span(self) -> "Optional[Union[Span, StreamedSpan]]":
894+
def span(self) -> "Optional[Span]":
881895
"""Get/set current tracing span or transaction."""
882-
return self._span
896+
return self._span if isinstance(self._span, Span) else None
883897

884898
@span.setter
885-
def span(self, span: "Optional[Union[Span, StreamedSpan]]") -> None:
899+
def span(self, span: "Optional[Span]") -> None:
886900
self._span = span
887901
# XXX: this differs from the implementation in JS, there Scope.setSpan
888902
# does not set Scope._transactionName.
@@ -893,6 +907,15 @@ def span(self, span: "Optional[Union[Span, StreamedSpan]]") -> None:
893907
if transaction.source:
894908
self._transaction_info["source"] = transaction.source
895909

910+
@property
911+
def streamed_span(self) -> "Optional[StreamedSpan]":
912+
"""Get/set current tracing span."""
913+
return self._span if isinstance(self._span, StreamedSpan) else None
914+
915+
@streamed_span.setter
916+
def streamed_span(self, span: "Optional[StreamedSpan]") -> None:
917+
self._span = span
918+
896919
# Also set _transaction and _transaction_info in streaming mode as this
897920
# is used for populating events and linking them to segments
898921
if (
@@ -1267,7 +1290,7 @@ def start_streamed_span(
12671290
if parent_span is _DEFAULT_PARENT_SPAN or isinstance(
12681291
parent_span, NoOpStreamedSpan
12691292
):
1270-
parent_span = self.span # type: ignore
1293+
parent_span = self.streamed_span
12711294

12721295
# If no eligible parent_span was provided and there is no currently
12731296
# active span, this is a segment
@@ -1848,9 +1871,21 @@ def apply_to_telemetry(self, telemetry: "Union[Log, Metric, StreamedSpan]") -> N
18481871
telemetry["trace_id"] = (
18491872
trace_id or "00000000-0000-0000-0000-000000000000"
18501873
)
1851-
span_id = trace_context.get("span_id")
1852-
if telemetry.get("span_id") is None and span_id:
1853-
telemetry["span_id"] = span_id
1874+
1875+
# span_id should only be populated if there's an active span. We can't
1876+
# use the trace_context here because it synthesizes a span_id if there
1877+
# isn't one
1878+
if telemetry.get("span_id") is None:
1879+
if self._span is not None and not isinstance(
1880+
self._span, NoOpStreamedSpan
1881+
):
1882+
telemetry["span_id"] = self._span.span_id
1883+
else:
1884+
external_propagation_context = get_external_propagation_context()
1885+
if external_propagation_context:
1886+
_, span_id = external_propagation_context
1887+
if span_id is not None:
1888+
telemetry["span_id"] = span_id
18541889

18551890
self._apply_scope_attributes_to_telemetry(telemetry)
18561891
self._apply_user_attributes_to_telemetry(telemetry)

0 commit comments

Comments
 (0)