@@ -135,7 +135,7 @@ def complete_deferred_fragment(
135135 successful_execution_groups = list (
136136 deferred_fragment_record .successful_execution_groups
137137 )
138- self ._remove_root_node ( deferred_fragment_record )
138+ del self ._root_nodes [ deferred_fragment_record ]
139139 for successful_execution_group in successful_execution_groups :
140140 pending_execution_group = successful_execution_group .pending_execution_group
141141 deferred_records = pending_execution_group .deferred_fragment_records
@@ -156,22 +156,18 @@ def remove_deferred_fragment(
156156 """Check if deferred fragment exists and remove it in that case."""
157157 if deferred_fragment_record not in self ._root_nodes :
158158 return False
159- self ._remove_root_node ( deferred_fragment_record )
159+ del self ._root_nodes [ deferred_fragment_record ]
160160 return True
161161
162162 def remove_stream (self , stream_record : StreamRecord ) -> None :
163163 """Remove a stream record as no longer pending."""
164- self ._remove_root_node ( stream_record )
164+ del self ._root_nodes [ stream_record ]
165165
166166 def stop_incremental_data (self ) -> None :
167167 """Stop the delivery of incremental data."""
168168 for future in self ._next_queue :
169169 future .cancel () # pragma: no cover
170170
171- def _remove_root_node (self , delivery_group : DeliveryGroup ) -> None :
172- """Remove root node."""
173- del self ._root_nodes [delivery_group ]
174-
175171 def _add_incremental_data_records (
176172 self ,
177173 incremental_data_records : Sequence [IncrementalDataRecord ],
0 commit comments