1414if TYPE_CHECKING :
1515 from typing import Any , Callable , Optional
1616
17- from sentry_sdk .traces import StreamedSpan
17+ from sentry_sdk ._types import SpanJSON
1818
1919
20- class SpanBatcher (Batcher ["StreamedSpan " ]):
20+ class SpanBatcher (Batcher ["SpanJSON " ]):
2121 # MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is
2222 # a bit of a buffer for spans that appear between the trigger to flush
2323 # and actually flushing the buffer.
@@ -43,7 +43,7 @@ def __init__(
4343 # by trace_id, so that we can then send the buckets each in its own
4444 # envelope.
4545 # trace_id -> span buffer
46- self ._span_buffer : dict [str , list ["StreamedSpan " ]] = defaultdict (list )
46+ self ._span_buffer : dict [str , list ["SpanJSON " ]] = defaultdict (list )
4747 self ._running_size : dict [str , int ] = defaultdict (lambda : 0 )
4848 self ._capture_func = capture_func
4949 self ._record_lost_func = record_lost_func
@@ -100,7 +100,7 @@ def _flush_loop(self) -> None:
100100 self ._flush ()
101101 self ._last_full_flush = time .monotonic ()
102102
103- def add (self , span : "StreamedSpan " ) -> None :
103+ def add (self , span : "SpanJSON " ) -> None :
104104 # Bail out if the current thread is already executing batcher code.
105105 # This prevents deadlocks when code running inside the batcher (e.g.
106106 # _add_to_envelope during flush, or _flush_event.wait/set) triggers
@@ -116,7 +116,7 @@ def add(self, span: "StreamedSpan") -> None:
116116 return None
117117
118118 with self ._lock :
119- size = len (self ._span_buffer [span . trace_id ])
119+ size = len (self ._span_buffer [span [ " trace_id" ] ])
120120 if size >= self .MAX_BEFORE_DROP :
121121 self ._record_lost_func (
122122 reason = "queue_overflow" ,
@@ -125,14 +125,15 @@ def add(self, span: "StreamedSpan") -> None:
125125 )
126126 return None
127127
128- self ._span_buffer [span . trace_id ].append (span )
129- self ._running_size [span . trace_id ] += self ._estimate_size (span )
128+ self ._span_buffer [span [ " trace_id" ] ].append (span )
129+ self ._running_size [span [ " trace_id" ] ] += self ._estimate_size (span )
130130
131131 if (
132132 size + 1 >= self .MAX_BEFORE_FLUSH
133- or self ._running_size [span .trace_id ] >= self .MAX_BYTES_BEFORE_FLUSH
133+ or self ._running_size [span ["trace_id" ]]
134+ >= self .MAX_BYTES_BEFORE_FLUSH
134135 ):
135- self ._pending_flush .add (span . trace_id )
136+ self ._pending_flush .add (span [ " trace_id" ] )
136137 notify = True
137138 else :
138139 notify = False
@@ -143,12 +144,12 @@ def add(self, span: "StreamedSpan") -> None:
143144 self ._active .flag = False
144145
145146 @staticmethod
146- def _estimate_size (item : "StreamedSpan " ) -> int :
147+ def _estimate_size (item : "SpanJSON " ) -> int :
147148 # Rough estimate of serialized span size that's quick to compute.
148149 # 210 is the rough size of the payload without attributes, and then we
149150 # estimate the attributes separately.
150151 estimate = 210
151- for value in item ._attributes .values ():
152+ for value in ( item .get ( "attributes" ) or {}) .values ():
152153 estimate += 50
153154
154155 if isinstance (value , str ):
@@ -159,26 +160,15 @@ def _estimate_size(item: "StreamedSpan") -> int:
159160 return estimate
160161
161162 @staticmethod
162- def _to_transport_format (item : "StreamedSpan" ) -> "Any" :
163- res : "dict[str, Any]" = {
164- "trace_id" : item .trace_id ,
165- "span_id" : item .span_id ,
166- "name" : item ._name if item ._name is not None else "<unlabeled span>" ,
167- "status" : item ._status ,
168- "is_segment" : item ._is_segment (),
169- "start_timestamp" : item ._start_timestamp .timestamp (),
170- }
171-
172- if item ._end_timestamp :
173- res ["end_timestamp" ] = item ._end_timestamp .timestamp ()
174-
175- if item ._parent_span_id :
176- res ["parent_span_id" ] = item ._parent_span_id
177-
178- if item ._attributes :
163+ def _to_transport_format (item : "SpanJSON" ) -> "Any" :
164+ res = {k : v for k , v in item .items () if k not in ("_segment_span" ,)}
165+
166+ if item .get ("attributes" ):
179167 res ["attributes" ] = {
180- k : serialize_attribute (v ) for (k , v ) in item . _attributes .items ()
168+ k : serialize_attribute (v ) for (k , v ) in item [ "attributes" ] .items ()
181169 }
170+ else :
171+ del res ["attributes" ]
182172
183173 return res
184174
@@ -202,7 +192,7 @@ def _flush(self, only_pending: bool = False) -> None:
202192 if not spans :
203193 continue
204194
205- dsc = spans [0 ]._dynamic_sampling_context ()
195+ dsc = spans [0 ][ "_segment_span" ] ._dynamic_sampling_context ()
206196
207197 # Max per envelope is 1000, so if we happen to have more than
208198 # 1000 spans in one bucket, we'll need to separate them.
0 commit comments