Skip to content

Commit 80f5400

Browse files
committed
replace queue
1 parent 41a98c0 commit 80f5400

1 file changed

Lines changed: 27 additions & 31 deletions

File tree

sentry_sdk/_span_batcher.py

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from typing import TYPE_CHECKING
77

88
from sentry_sdk._batcher import Batcher
9-
from sentry_sdk._queue import EmptyError, Queue
109
from sentry_sdk.envelope import Envelope, Item, PayloadRef
1110
from sentry_sdk.utils import format_timestamp, serialize_attribute
1211

@@ -49,8 +48,9 @@ def __init__(
4948
self._lock = threading.Lock()
5049
self._active: "threading.local" = threading.local()
5150

52-
self._last_full_flush: float = time.monotonic()
53-
self._flush_queue: Queue = Queue()
51+
self._last_full_flush: float = time.monotonic() # drives time-based flushes
52+
self._flush_event: threading.Event = threading.Event()
53+
self._pending_flush: set[str] = set() # buckets to be flushed
5454

5555
self._flusher: "Optional[threading.Thread]" = None
5656
self._flusher_pid: "Optional[int]" = None
@@ -59,11 +59,10 @@ def _flush_loop(self) -> None:
5959
self._active.flag = True
6060
while self._running:
6161
jitter = random.random() * self.FLUSH_WAIT_TIME * 0.1
62-
try:
63-
trace_id = self._flush_queue.get(timeout=self.FLUSH_WAIT_TIME + jitter)
64-
self._flush(trace_id=trace_id)
65-
except EmptyError:
66-
pass
62+
self._flush_event.wait(timeout=self.FLUSH_WAIT_TIME + jitter)
63+
self._flush_event.clear()
64+
65+
self._flush(only_pending=True)
6766

6867
if (
6968
time.monotonic() - self._last_full_flush
@@ -100,25 +99,20 @@ def add(self, span: "StreamedSpan") -> None:
10099
self._span_buffer[span.trace_id].append(span)
101100
self._running_size[span.trace_id] += self._estimate_size(span)
102101

103-
if size + 1 >= self.MAX_BEFORE_FLUSH:
104-
self._flush_queue.put(span.trace_id)
105-
return
106-
107-
if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH:
108-
self._flush_queue.put(span.trace_id)
109-
return
102+
if (
103+
size + 1 >= self.MAX_BEFORE_FLUSH
104+
or self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH
105+
):
106+
self._pending_flush.add(span.trace_id)
107+
notify = True
108+
else:
109+
notify = False
110+
111+
if notify:
112+
self._flush_event.set()
110113
finally:
111114
self._active.flag = False
112115

113-
def kill(self) -> None:
114-
if self._flusher is None:
115-
return
116-
117-
self.flush()
118-
self._running = False
119-
self._flush_queue.put(None)
120-
self._flusher = None
121-
122116
@staticmethod
123117
def _estimate_size(item: "StreamedSpan") -> int:
124118
# Rough estimate of serialized span size that's quick to compute.
@@ -159,16 +153,18 @@ def _to_transport_format(item: "StreamedSpan") -> "Any":
159153

160154
return res
161155

162-
def _flush(self, trace_id: "Optional[str]" = None) -> None:
156+
def _flush(self, only_pending: bool = False) -> None:
163157
with self._lock:
164-
if len(self._span_buffer) == 0:
165-
return
166-
167-
if trace_id is None:
158+
if only_pending:
159+
buckets = list(self._pending_flush)
160+
else:
168161
# flush whole buffer, e.g. if the SDK is shutting down
169162
buckets = list(self._span_buffer.keys())
170-
else:
171-
buckets = [trace_id]
163+
164+
self._pending_flush.clear()
165+
166+
if not buckets:
167+
return
172168

173169
envelopes = []
174170

0 commit comments

Comments
 (0)