|
| 1 | +import logging |
| 2 | +import threading |
| 3 | +from typing import Callable, Dict, Optional |
| 4 | + |
| 5 | +import requests |
| 6 | + |
| 7 | +from ldclient.context import Context |
| 8 | +from ldclient.interfaces import FlagChange, FlagTracker, FlagValueChange |
| 9 | + |
| 10 | +log = logging.getLogger('testservice') |
| 11 | + |
| 12 | + |
| 13 | +class ListenerRegistry: |
| 14 | + """Manages all active flag change listener registrations for a single SDK client entity.""" |
| 15 | + |
| 16 | + def __init__(self, tracker: FlagTracker): |
| 17 | + self._tracker = tracker |
| 18 | + self._lock = threading.Lock() |
| 19 | + # Maps listener_id -> (sdk_listener callable, cleanup function) |
| 20 | + self._listeners: Dict[str, Callable] = {} |
| 21 | + |
| 22 | + def register_flag_change_listener(self, listener_id: str, callback_uri: str): |
| 23 | + """Register a general flag change listener that fires on any flag configuration change.""" |
| 24 | + def on_flag_change(flag_change: FlagChange): |
| 25 | + payload = { |
| 26 | + 'listenerId': listener_id, |
| 27 | + 'flagKey': flag_change.key, |
| 28 | + } |
| 29 | + try: |
| 30 | + requests.post(callback_uri, json=payload) |
| 31 | + except Exception as e: |
| 32 | + log.warning('Failed to post flag change notification: %s', e) |
| 33 | + |
| 34 | + with self._lock: |
| 35 | + # If a listener with this ID already exists, unregister the old one first |
| 36 | + if listener_id in self._listeners: |
| 37 | + self._tracker.remove_listener(self._listeners[listener_id]) |
| 38 | + |
| 39 | + self._listeners[listener_id] = on_flag_change |
| 40 | + |
| 41 | + self._tracker.add_listener(on_flag_change) |
| 42 | + |
| 43 | + def register_flag_value_change_listener( |
| 44 | + self, |
| 45 | + listener_id: str, |
| 46 | + flag_key: str, |
| 47 | + context: Context, |
| 48 | + default_value, |
| 49 | + callback_uri: str, |
| 50 | + ): |
| 51 | + """Register a flag value change listener that fires when the evaluated value changes.""" |
| 52 | + def on_value_change(change: FlagValueChange): |
| 53 | + payload = { |
| 54 | + 'listenerId': listener_id, |
| 55 | + 'flagKey': change.key, |
| 56 | + 'oldValue': change.old_value, |
| 57 | + 'newValue': change.new_value, |
| 58 | + } |
| 59 | + try: |
| 60 | + requests.post(callback_uri, json=payload) |
| 61 | + except Exception as e: |
| 62 | + log.warning('Failed to post flag value change notification: %s', e) |
| 63 | + |
| 64 | + # add_flag_value_change_listener returns the underlying listener |
| 65 | + # that must be passed to remove_listener to unsubscribe |
| 66 | + underlying_listener = self._tracker.add_flag_value_change_listener(flag_key, context, on_value_change) |
| 67 | + |
| 68 | + with self._lock: |
| 69 | + if listener_id in self._listeners: |
| 70 | + self._tracker.remove_listener(self._listeners[listener_id]) |
| 71 | + |
| 72 | + self._listeners[listener_id] = underlying_listener |
| 73 | + |
| 74 | + def unregister(self, listener_id: str) -> bool: |
| 75 | + """Unregister a previously registered listener. Returns False if not found.""" |
| 76 | + with self._lock: |
| 77 | + listener = self._listeners.pop(listener_id, None) |
| 78 | + |
| 79 | + if listener is None: |
| 80 | + return False |
| 81 | + |
| 82 | + self._tracker.remove_listener(listener) |
| 83 | + return True |
| 84 | + |
| 85 | + def close_all(self): |
| 86 | + """Unregister all listeners. Called when the SDK client entity shuts down.""" |
| 87 | + with self._lock: |
| 88 | + listeners = dict(self._listeners) |
| 89 | + self._listeners.clear() |
| 90 | + |
| 91 | + for listener in listeners.values(): |
| 92 | + self._tracker.remove_listener(listener) |
0 commit comments