@@ -259,6 +259,11 @@ class ConditionDirective(str, Enum):
259259 ConditionDirective represents the possible directives that can be returned from a condition check.
260260 """
261261
262+ REMOVE = "remove"
263+ """
264+ REMOVE suggests that the current data source should be permanently removed from consideration.
265+ """
266+
262267 FALLBACK = "fallback"
263268 """
264269 FALLBACK suggests that this data source should be abandoned in favor of the next one.
@@ -269,6 +274,11 @@ class ConditionDirective(str, Enum):
269274 RECOVER suggests that we should try to return to the primary data source.
270275 """
271276
277+ FDV1 = "fdv1"
278+ """
279+ FDV1 suggests that we should immediately revert to the FDv1 fallback synchronizer.
280+ """
281+
272282 CONTINUE = "continue"
273283 """
274284 CONTINUE suggests that no action is needed and the current data source should keep running.
@@ -459,27 +469,17 @@ def synchronizer_loop(self: 'FDv2'):
459469 try :
460470 with self ._lock .write ():
461471 synchronizer : Synchronizer = synchronizers_list [current_index ].build (self ._config )
472+ self ._active_synchronizer = synchronizer
462473 if isinstance (synchronizer , DiagnosticSource ) and self ._diagnostic_accumulator is not None :
463474 synchronizer .set_diagnostic_accumulator (self ._diagnostic_accumulator )
464- self ._active_synchronizer = synchronizer
465475
466476 log .info ("Synchronizer %s (index %d) is starting" , synchronizer .name , current_index )
467477
468- # Determine which condition to check based on current position
469- def combined_condition (status : DataSourceStatus ) -> ConditionDirective :
470- # Recovery condition: only applies when not at first synchronizer
471- if current_index > 0 and self ._recovery_condition (status ):
472- return ConditionDirective .RECOVER
473- # Fallback condition: applies at any position
474- if self ._fallback_condition (status ):
475- return ConditionDirective .FALLBACK
476- return ConditionDirective .CONTINUE
477-
478- remove_sync , fallback_v1 , directive = self ._consume_synchronizer_results (
479- synchronizer , set_on_ready , combined_condition
478+ directive = self ._consume_synchronizer_results (
479+ synchronizer , set_on_ready , current_index != 0
480480 )
481481
482- if fallback_v1 :
482+ if directive == ConditionDirective . FDV1 :
483483 # Abandon all synchronizers and use only fdv1 fallback
484484 log .info ("Reverting to FDv1 fallback synchronizer" )
485485 if self ._fdv1_fallback_synchronizer_builder is not None :
@@ -494,8 +494,7 @@ def combined_condition(status: DataSourceStatus) -> ConditionDirective:
494494 )
495495 break
496496 continue
497-
498- if remove_sync :
497+ elif directive == ConditionDirective .REMOVE :
499498 # Permanent failure - remove synchronizer from list
500499 log .warning ("Synchronizer %s permanently failed, removing from list" , synchronizer .name )
501500 del synchronizers_list [current_index ]
@@ -515,9 +514,8 @@ def combined_condition(status: DataSourceStatus) -> ConditionDirective:
515514 # Note: If we deleted a middle element, current_index now points to
516515 # what was the next element (shifted down), which is correct
517516 continue
518-
519517 # Condition was met - determine next synchronizer based on directive
520- if directive == ConditionDirective .RECOVER :
518+ elif directive == ConditionDirective .RECOVER :
521519 log .info ("Recovery condition met, returning to first synchronizer" )
522520 current_index = 0
523521 elif directive == ConditionDirective .FALLBACK :
@@ -552,8 +550,8 @@ def _consume_synchronizer_results(
552550 self ,
553551 synchronizer : Synchronizer ,
554552 set_on_ready : Event ,
555- condition_func : Callable [[ DataSourceStatus ], ConditionDirective ]
556- ) -> tuple [ bool , bool , ConditionDirective ] :
553+ check_recovery : bool ,
554+ ) -> ConditionDirective :
557555 """
558556 Consume results from a synchronizer until a condition is met or it fails.
559557
@@ -594,14 +592,15 @@ def reader(self: 'FDv2'):
594592 if update == "check" :
595593 # Check condition periodically
596594 current_status = self ._data_source_status_provider .status
597- directive = condition_func (current_status )
598- if directive != ConditionDirective .CONTINUE :
599- return False , False , directive
595+ if check_recovery and self ._recovery_condition (current_status ):
596+ return ConditionDirective .RECOVER
597+ if self ._fallback_condition (current_status ):
598+ return ConditionDirective .FALLBACK
600599 continue
601600
602601 log .info ("Synchronizer %s update: %s" , synchronizer .name , update .state )
603602 if self ._stop_event .is_set ():
604- return False , False , ConditionDirective .CONTINUE
603+ return ConditionDirective .FALLBACK
605604
606605 # Handle the update
607606 if update .change_set is not None :
@@ -616,14 +615,14 @@ def reader(self: 'FDv2'):
616615
617616 # Check if we should revert to FDv1 immediately
618617 if update .revert_to_fdv1 :
619- return True , True , ConditionDirective .FALLBACK
618+ return ConditionDirective .FDV1
620619
621620 # Check for OFF state indicating permanent failure
622621 if update .state == DataSourceState .OFF :
623- return True , False , ConditionDirective .FALLBACK
622+ return ConditionDirective .REMOVE
624623 except Exception as e :
625624 log .error ("Error consuming synchronizer results: %s" , e )
626- return True , False , ConditionDirective .FALLBACK
625+ return ConditionDirective .REMOVE
627626 finally :
628627 synchronizer .stop ()
629628 timer .stop ()
@@ -633,7 +632,7 @@ def reader(self: 'FDv2'):
633632 # If we reach here, the synchronizer's iterator completed normally (no more updates)
634633 # For continuous synchronizers (streaming/polling), this is unexpected and indicates
635634 # the synchronizer can't provide more updates, so we should remove it and fall back
636- return True , False , ConditionDirective .FALLBACK
635+ return ConditionDirective .REMOVE
637636
638637 def _fallback_condition (self , status : DataSourceStatus ) -> bool :
639638 """
0 commit comments