Is your feature request related to a problem? Please describe.
I would like to be able to use a feature flag to subscribe/unsubscribe a Kafka consumer essentially enabling or disabling consumption of messages.
Describe the solution you'd like
A solution that I think would help with this is essentially a FeatureFlag class that has a __init__ that takes all the same arguments as the UnleashClient's is_enabled method as well as an UnleashClient reference. This would use a context manager to hook into the UnleashClient's scheduler and monitor for feature flag updates. On exit, the context manager could clean up the job that monitors for updates. A callback can be registered and deregistered from being ran when a feature flag's status changes with a register and deregister method, respectively.
Describe alternatives you've considered
For this specific use case, I can't really think of any other alternative that would work quite as well.
Additional context
I have a prototype for this feature (minus the context and fallback function) here:
"""This module provides utilities for effectively utilizing feature flags."""
from contextlib import AbstractContextManager
from types import TracebackType
from typing import Callable, Self
from weakref import WeakSet
from apscheduler.events import EVENT_JOB_EXECUTED, JobExecutionEvent
from apscheduler.job import Job
from UnleashClient import UnleashClient
class FeatureFlag(AbstractContextManager):
"""This is a class for utilizing feature flags.
:param feature_name: The feature name.
:type feature_name: str
:param client: The Unleash client.
:type client: UnleashClient
"""
def __init__(self, feature_name: str, client: UnleashClient):
self.feature_name = feature_name
self._client = client
self._update_job: Job | None = None
self._callbacks: WeakSet[Callable[[bool], None]] = WeakSet()
self._current_value = self._get_value()
def __enter__(self) -> Self:
def update_listener(event: JobExecutionEvent) -> None:
if event.exception or event.job_id != self._client.fl_job.id:
return
self.enabled = self._get_value()
self._update_job = self._client.unleash_scheduler.add_listener(
update_listener, EVENT_JOB_EXECUTED
)
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
if isinstance(self._update_job, Job):
self._update_job.remove()
def _get_value(self) -> bool:
return bool(self._client.is_enabled(self.feature_name))
def register(self, callback: Callable[[bool], None]) -> None:
"""This registers a callback to be ran when the value of a feature flag is updated.
:param callback: The callback to be registered.
:type callback: Callable[[bool], None]
:returns: Nothing
:rtype: None
"""
if callback not in self._callbacks:
callback(self.enabled)
return self._callbacks.add(callback)
def deregister(self, callback: Callable[[bool], None]) -> None:
"""This deregisters a callback from being ran when the value of a feature flag is updated.
:param callback: The callback to be deregistered.
:type callback: Callable[[bool], None]
:returns: Nothing
:rtype: None
"""
return self._callbacks.remove(callback)
@property
def enabled(self) -> bool:
"""The current value of the feature flag.
:returns: The current value of the feature flag.
:rtype: bool
"""
return self._current_value
@enabled.setter
def enabled(self, value: bool) -> None:
if self._current_value is value:
return
self._current_value = value
for callback in self._callbacks:
callback(self._current_value)
I have tested this out in another project that I am working on. I think I should be able to submit a pull request that adds this and associated automated tests. I am mostly just wanting to check and make sure that this is something that the project thinks would be useful before I go and spend time creating and submitting a pull request for this.
Example Usage
from threading import Event
from UnleashClient import UnleashClient, FeatureFlag
with UnleashClient(...) as client, FeatureFlag("feature-name", client) as feature_flag:
def _handle_feature_flag(enabled: bool) -> None:
if enabled:
print("enabled")
else:
print("disabled")
feature_flag.register(_handle_feature_flag)
Event().wait()
Is your feature request related to a problem? Please describe.
I would like to be able to use a feature flag to subscribe/unsubscribe a Kafka consumer essentially enabling or disabling consumption of messages.
Describe the solution you'd like
A solution that I think would help with this is essentially a
FeatureFlagclass that has a__init__that takes all the same arguments as theUnleashClient'sis_enabledmethod as well as anUnleashClientreference. This would use a context manager to hook into theUnleashClient's scheduler and monitor for feature flag updates. On exit, the context manager could clean up the job that monitors for updates. A callback can be registered and deregistered from being ran when a feature flag's status changes with aregisterandderegistermethod, respectively.Describe alternatives you've considered
For this specific use case, I can't really think of any other alternative that would work quite as well.
Additional context
I have a prototype for this feature (minus the context and fallback function) here:
I have tested this out in another project that I am working on. I think I should be able to submit a pull request that adds this and associated automated tests. I am mostly just wanting to check and make sure that this is something that the project thinks would be useful before I go and spend time creating and submitting a pull request for this.
Example Usage