|
1 | 1 | from collections import defaultdict |
2 | 2 | from datetime import datetime, timezone |
3 | | -from typing import TYPE_CHECKING |
| 3 | +import os |
| 4 | +import threading |
| 5 | +from typing import TYPE_CHECKING, Optional |
| 6 | +import weakref |
4 | 7 |
|
5 | 8 | from sentry_sdk._batcher import Batcher |
6 | 9 | from sentry_sdk.envelope import Envelope, Item, PayloadRef |
@@ -31,14 +34,41 @@ def __init__( |
31 | 34 | capture_func: "Callable[[Envelope], None]", |
32 | 35 | record_lost_func: "Callable[..., None]", |
33 | 36 | ) -> None: |
34 | | - super().__init__(capture_func, record_lost_func) |
35 | 37 | # Spans from different traces cannot be emitted in the same envelope |
36 | 38 | # since the envelope contains a shared trace header. That's why we bucket |
37 | 39 | # by trace_id, so that we can then send the buckets each in its own |
38 | 40 | # envelope. |
39 | 41 | # trace_id -> span buffer |
40 | 42 | self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list) |
41 | 43 | self._running_size: dict[str, int] = defaultdict(lambda: 0) |
| 44 | + self._capture_func = capture_func |
| 45 | + self._record_lost_func = record_lost_func |
| 46 | + self._running = True |
| 47 | + self._lock = threading.Lock() |
| 48 | + self._active: "threading.local" = threading.local() |
| 49 | + |
| 50 | + self._flush_event: "threading.Event" = threading.Event() |
| 51 | + |
| 52 | + self._flusher: "Optional[threading.Thread]" = None |
| 53 | + self._flusher_pid: "Optional[int]" = None |
| 54 | + |
| 55 | + self._reset_thread_state() |
| 56 | + |
| 57 | + # See https://github.com/getsentry/sentry-python/blob/051cc01640a29bfd64b1f1e2e3414c02f027dd1b/sentry_sdk/monitor.py#L41-L50 |
| 58 | + if hasattr(os, "register_at_fork"): |
| 59 | + weak_reset = weakref.WeakMethod(self._reset_thread_state) |
| 60 | + |
| 61 | + def _reset_in_child() -> None: |
| 62 | + method = weak_reset() |
| 63 | + if method is not None: |
| 64 | + method() |
| 65 | + |
| 66 | + os.register_at_fork(after_in_child=_reset_in_child) |
| 67 | + |
| 68 | + def _reset_thread_state(self) -> None: |
| 69 | + self._flusher = None |
| 70 | + self._lock = threading.Lock() |
| 71 | + self._flusher_pid = None |
42 | 72 |
|
43 | 73 | def add(self, span: "StreamedSpan") -> None: |
44 | 74 | # Bail out if the current thread is already executing batcher code. |
|
0 commit comments