Create Class to Easily Monitor Feature Flag State Changes #341

@lannuttia

Is your feature request related to a problem? Please describe.
I would like to use a feature flag to subscribe or unsubscribe a Kafka consumer, effectively enabling or disabling message consumption.

Describe the solution you'd like
One solution would be a FeatureFlag class whose __init__ takes the same arguments as the UnleashClient's is_enabled method, plus a reference to the UnleashClient. Used as a context manager, it would hook into the UnleashClient's scheduler on entry to monitor for feature flag updates, and clean up that monitoring hook on exit. Callbacks that run whenever the feature flag's status changes can be added and removed with the register and deregister methods, respectively.

Describe alternatives you've considered
For this specific use case, I can't really think of any other alternative that would work quite as well.

Additional context
I have a prototype for this feature (minus the context and fallback function arguments) here:

"""This module provides utilities for effectively utilizing feature flags."""
from contextlib import AbstractContextManager
from types import TracebackType
from typing import Callable, Self
from weakref import WeakSet

from apscheduler.events import EVENT_JOB_EXECUTED, JobExecutionEvent
from UnleashClient import UnleashClient


class FeatureFlag(AbstractContextManager):
    """This is a class for utilizing feature flags.

    :param feature_name: The feature name.
    :type  feature_name: str

    :param client: The Unleash client.
    :type client: UnleashClient
    """
    def __init__(self, feature_name: str, client: UnleashClient):
        self.feature_name = feature_name
        self._client = client
        # add_listener() returns None, so keep the listener itself for cleanup.
        self._listener: Callable[[JobExecutionEvent], None] | None = None
        # Callbacks are held weakly: callers must keep their own reference.
        self._callbacks: WeakSet[Callable[[bool], None]] = WeakSet()
        self._current_value = self._get_value()

    def __enter__(self) -> Self:
        def update_listener(event: JobExecutionEvent) -> None:
            # Only react to successful runs of the client's flag-refresh job.
            if event.exception or event.job_id != self._client.fl_job.id:
                return
            self.enabled = self._get_value()

        self._client.unleash_scheduler.add_listener(
            update_listener, EVENT_JOB_EXECUTED
        )
        self._listener = update_listener
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        # Detach the listener so the scheduler stops notifying this instance.
        if self._listener is not None:
            self._client.unleash_scheduler.remove_listener(self._listener)
            self._listener = None

    def _get_value(self) -> bool:
        return bool(self._client.is_enabled(self.feature_name))

    def register(self, callback: Callable[[bool], None]) -> None:
        """This registers a callback to be run when the value of a feature flag is updated.

        A newly registered callback is also called once, immediately, with the current value.

        :param callback: The callback to be registered.
        :type callback: Callable[[bool], None]

        :returns: Nothing
        :rtype: None
        """
        if callback not in self._callbacks:
            callback(self.enabled)
        self._callbacks.add(callback)

    def deregister(self, callback: Callable[[bool], None]) -> None:
        """This deregisters a callback from being run when the value of a feature flag is updated.

        :param callback: The callback to be deregistered.
        :type callback: Callable[[bool], None]

        :raises KeyError: If the callback is not currently registered.

        :returns: Nothing
        :rtype: None
        """
        self._callbacks.remove(callback)

    @property
    def enabled(self) -> bool:
        """The current value of the feature flag.

        :returns: The current value of the feature flag.
        :rtype: bool
        """
        return self._current_value

    @enabled.setter
    def enabled(self, value: bool) -> None:
        if self._current_value == value:
            return

        self._current_value = value
        # Iterate over a snapshot so a callback may deregister itself safely.
        for callback in list(self._callbacks):
            callback(self._current_value)

I have tested this in another project that I am working on, and I should be able to submit a pull request that adds it along with automated tests. Before I spend time on that, I would like to check that the project considers this feature useful.
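The prototype above leaves out the context and fallback function. As a rough sketch of how those could be carried along, the arguments accepted by is_enabled can be bundled once and forwarded unchanged on every evaluation. The names FlagQuery, SupportsIsEnabled, and StubClient are hypothetical and exist only for illustration; StubClient stands in for UnleashClient so the example runs without a server.

```python
from dataclasses import dataclass
from typing import Callable, Optional, Protocol


class SupportsIsEnabled(Protocol):
    """Structural type for the one client method this sketch relies on."""

    def is_enabled(
        self,
        feature_name: str,
        context: Optional[dict] = None,
        fallback_function: Optional[Callable[..., bool]] = None,
    ) -> bool: ...


@dataclass(frozen=True)
class FlagQuery:
    """Hypothetical helper: bundles the arguments accepted by is_enabled."""

    feature_name: str
    context: Optional[dict] = None
    fallback_function: Optional[Callable[..., bool]] = None

    def evaluate(self, client: SupportsIsEnabled) -> bool:
        # Forward everything unchanged so the client applies its usual rules.
        return bool(
            client.is_enabled(
                self.feature_name,
                context=self.context,
                fallback_function=self.fallback_function,
            )
        )


class StubClient:
    """Stand-in for UnleashClient, used only to make the sketch runnable."""

    def is_enabled(self, feature_name, context=None, fallback_function=None):
        # Pretend the flag is on only for one specific (made-up) user.
        return bool(context) and context.get("userId") == "42"
```

In the prototype, _get_value could then delegate to something like FlagQuery.evaluate instead of passing only the feature name.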

Example Usage

from threading import Event

from UnleashClient import UnleashClient, FeatureFlag

with UnleashClient(...) as client, FeatureFlag("feature-name", client) as feature_flag:
    def _handle_feature_flag(enabled: bool) -> None:
        if enabled:
            print("enabled")
        else:
            print("disabled")

    # Called once on registration, then again whenever the flag's value changes.
    feature_flag.register(_handle_feature_flag)
    # Block the main thread while the client's scheduler polls for updates.
    Event().wait()
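For the Kafka use case that motivated this request, the registered callback might look roughly like the sketch below. StandInConsumer is a hypothetical placeholder that only records its subscription, not a real Kafka client, and the topic name "orders" is made up.

```python
from typing import Callable


class StandInConsumer:
    """Hypothetical stand-in for a Kafka consumer; it only records state."""

    def __init__(self) -> None:
        self.topics: list[str] = []

    def subscribe(self, topics: list[str]) -> None:
        self.topics = list(topics)

    def unsubscribe(self) -> None:
        self.topics = []


def make_consumer_toggle(
    consumer: StandInConsumer, topics: list[str]
) -> Callable[[bool], None]:
    """Build a callback with the Callable[[bool], None] shape register expects."""

    def toggle(enabled: bool) -> None:
        # Subscribe while the flag is on, unsubscribe while it is off.
        if enabled:
            consumer.subscribe(topics)
        else:
            consumer.unsubscribe()

    return toggle


consumer = StandInConsumer()
# Keep a reference to the callback: the prototype stores callbacks weakly.
toggle = make_consumer_toggle(consumer, ["orders"])
toggle(True)   # flag switched on: consumer is subscribed
toggle(False)  # flag switched off: consumer is unsubscribed
```

With the prototype, this would be wired up as feature_flag.register(toggle). Since the callback is invoked from the scheduler's listener rather than from the main thread, a real consumer that is not thread-safe would probably need the toggle to hand the change off to the consuming thread (for example via a queue or threading.Event) instead of touching the consumer directly.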
