diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 96a0240b..da314835 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,11 +6,13 @@ ## Upgrading - +- The old `Broadcast` class is deprecated in favour of the new auto-closing `BroadcastChannel`. + +- The `Anycast` class is deprecated, because of the lack of use-cases and the maintenance cost. ## New Features - +- There's a new `BroadcastChannel`, which returns a broadcast sender and a broadcast receiver. The channel is auto-closing, meaning when all the senders or all the receivers are closed, the channel is closed. ## Bug Fixes diff --git a/src/frequenz/channels/__init__.py b/src/frequenz/channels/__init__.py index a6a092a9..83618343 100644 --- a/src/frequenz/channels/__init__.py +++ b/src/frequenz/channels/__init__.py @@ -80,7 +80,7 @@ """ from ._anycast import Anycast -from ._broadcast import Broadcast +from ._broadcast import Broadcast, BroadcastChannel, BroadcastReceiver, BroadcastSender from ._exceptions import ChannelClosedError, ChannelError, Error from ._generic import ( ChannelMessageT, @@ -101,14 +101,26 @@ select, selected_from, ) -from ._sender import Sender, SenderClosedError, SenderError +from ._sender import ( + CloneableSender, + CloneableSubscribableSender, + Sender, + SenderClosedError, + SenderError, + SubscribableSender, +) __all__ = [ "Anycast", "Broadcast", + "BroadcastChannel", + "BroadcastReceiver", + "BroadcastSender", "ChannelClosedError", "ChannelError", "ChannelMessageT", + "CloneableSender", + "CloneableSubscribableSender", "Error", "ErroredChannelT_co", "LatestValueCache", @@ -128,6 +140,7 @@ "SenderError", "SenderMessageT_co", "SenderMessageT_contra", + "SubscribableSender", "UnhandledSelectedError", "merge", "select", diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index 5c3f8284..987c64ea 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -20,6 +20,12 @@ _logger = logging.getLogger(__name__) +@deprecated( + "Anycast channels are deprecated, because of the high cost of maintaining " + "something for which we don't have a use case. The implementation will " + "remain in the codebase in its current form at least until the next major " + "version." +) class Anycast(Generic[ChannelMessageT]): """A channel that delivers each message to exactly one receiver. diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index f55003a5..47e33767 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -16,12 +16,15 @@ from ._exceptions import ChannelClosedError from ._generic import ChannelMessageT from ._receiver import Receiver, ReceiverStoppedError -from ._sender import Sender, SenderClosedError, SenderError +from ._sender import CloneableSubscribableSender, SenderClosedError, SenderError _logger = logging.getLogger(__name__) -class Broadcast(Generic[ChannelMessageT]): +@deprecated("Please use BroadcastChannel instead.") +class Broadcast( # pylint: disable=too-many-instance-attributes + Generic[ChannelMessageT] +): """A channel that deliver all messages to all receivers. # Description @@ -184,7 +187,13 @@ async def main() -> None: ``` """ - def __init__(self, *, name: str, resend_latest: bool = False) -> None: + def __init__( + self, + *, + name: str, + resend_latest: bool = False, + auto_close: bool = False, + ) -> None: """Initialize this channel. Args: @@ -197,6 +206,8 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None: wait for the next message on the channel to arrive. It is safe to be set in data/reporting channels, but is not recommended for use in channels that stream control instructions. + auto_close: If True, the channel will be closed when all senders or all + receivers are closed. """ self._name: str = name """The name of the broadcast channel. @@ -207,8 +218,11 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None: self._recv_cv: Condition = Condition() """The condition to wait for data in the channel's buffer.""" + self._sender_count: int = 0 + """The number of senders attached to this channel.""" + self._receivers: dict[ - int, weakref.ReferenceType[_Receiver[ChannelMessageT]] + int, weakref.ReferenceType[BroadcastReceiver[ChannelMessageT]] ] = {} """The receivers attached to the channel, indexed by their hash().""" @@ -218,6 +232,9 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None: self._latest: ChannelMessageT | None = None """The latest message sent to the channel.""" + self._auto_close_enabled: bool = auto_close + """Whether to close the channel when all senders or all receivers are closed.""" + self.resend_latest: bool = resend_latest """Whether to resend the latest message to new receivers. @@ -269,13 +286,13 @@ async def close(self) -> None: # noqa: D402 """Close the channel, deprecated alias for `aclose()`.""" # noqa: D402 return await self.aclose() - def new_sender(self) -> Sender[ChannelMessageT]: + def new_sender(self) -> BroadcastSender[ChannelMessageT]: """Return a new sender attached to this channel.""" - return _Sender(self) + return BroadcastSender(self) def new_receiver( self, *, name: str | None = None, limit: int = 50, warn_on_overflow: bool = True - ) -> Receiver[ChannelMessageT]: + ) -> BroadcastReceiver[ChannelMessageT]: """Return a new receiver attached to this channel. Broadcast receivers have their own buffer, and when messages are not @@ -291,12 +308,12 @@ def new_receiver( Returns: A new receiver attached to this channel. """ - recv: _Receiver[ChannelMessageT] = _Receiver( + recv: BroadcastReceiver[ChannelMessageT] = BroadcastReceiver( self, name=name, limit=limit, warn_on_overflow=warn_on_overflow ) self._receivers[hash(recv)] = weakref.ref(recv) if self.resend_latest and self._latest is not None: - recv.enqueue(self._latest) + recv._enqueue(self._latest) # pylint: disable=protected-access return recv def __str__(self) -> str: @@ -317,12 +334,12 @@ def __repr__(self) -> str: _T = TypeVar("_T") -class _Sender(Sender[_T]): +class BroadcastSender(CloneableSubscribableSender[_T]): """A sender to send messages to the broadcast channel. Should not be created directly, but through the - [Broadcast.new_sender()][frequenz.channels.Broadcast.new_sender] - method. + [BroadcastSender.clone()][frequenz.channels.BroadcastSender.clone] + method of an existing sender. """ def __init__(self, channel: Broadcast[_T], /) -> None: @@ -337,6 +354,13 @@ def __init__(self, channel: Broadcast[_T], /) -> None: self._closed: bool = False """Whether this sender is closed.""" + self._channel._sender_count += 1 + + @property + def sender_count(self) -> int: + """Return the number of open senders attached to this sender's channel.""" + return self._channel._sender_count # pylint: disable=protected-access + @override async def send(self, message: _T, /) -> None: """Send a message to all broadcast receivers. @@ -357,16 +381,21 @@ async def send(self, message: _T, /) -> None: raise SenderError("The channel was closed", self) from ChannelClosedError( self._channel ) - self._channel._latest = message stale_refs = [] for _hash, recv_ref in self._channel._receivers.items(): recv = recv_ref() if recv is None: stale_refs.append(_hash) continue - recv.enqueue(message) + recv._enqueue(message) for _hash in stale_refs: del self._channel._receivers[_hash] + if self._channel._auto_close_enabled and len(self._channel._receivers) == 0: + await self._channel.aclose() + raise SenderError("The channel was closed", self) from ChannelClosedError( + self._channel + ) + self._channel._latest = message async with self._channel._recv_cv: self._channel._recv_cv.notify_all() # pylint: enable=protected-access @@ -379,7 +408,53 @@ async def aclose(self) -> None: attempt to send a message through a closed sender will raise a [SenderClosedError][frequenz.channels.SenderClosedError]. """ + if self._closed: + return self._closed = True + self._channel._sender_count -= 1 + + if ( + self._channel._sender_count == 0 # pylint: disable=protected-access + and self._channel._auto_close_enabled # pylint: disable=protected-access + ): + await self._channel.aclose() + + @override + def clone(self) -> BroadcastSender[_T]: + """Return a clone of this sender.""" + return BroadcastSender(self._channel) + + @override + def subscribe( + self, + name: str | None = None, + limit: int = 50, + warn_on_overflow: bool = True, + ) -> BroadcastReceiver[_T]: + """Return a new receiver attached to this sender's channel. + + Args: + name: A name to identify the receiver in the logs. + limit: Number of messages the receiver can hold in its buffer. + warn_on_overflow: Whether to log a warning when the receiver's buffer is + full and a message is dropped. + + Returns: + A new receiver attached to this sender's channel. + + Raises: + SenderError: If the underlying channel is closed. + SenderClosedError: If this sender is closed. + """ + if self._closed: + raise SenderClosedError(self) + if self._channel._closed: # pylint: disable=protected-access + raise SenderError("The channel was closed", self) from ChannelClosedError( + self._channel + ) + return self._channel.new_receiver( + name=name, limit=limit, warn_on_overflow=warn_on_overflow + ) def __str__(self) -> str: """Return a string representation of this sender.""" @@ -390,12 +465,12 @@ def __repr__(self) -> str: return f"{type(self).__name__}({self._channel!r})" -class _Receiver(Receiver[_T]): +class BroadcastReceiver(Receiver[_T]): """A receiver to receive messages from the broadcast channel. Should not be created directly, but through the - [Broadcast.new_receiver()][frequenz.channels.Broadcast.new_receiver] - method. + [BroadcastSender.subscribe()][frequenz.channels.BroadcastSender.subscribe] + method of an existing sender. """ def __init__( @@ -441,7 +516,7 @@ def __init__( self._closed: bool = False """Whether the receiver is closed.""" - def enqueue(self, message: _T, /) -> None: + def _enqueue(self, message: _T, /) -> None: """Put a message into this receiver's queue. To be called by broadcast senders. If the receiver's queue is already @@ -492,6 +567,9 @@ async def ready(self) -> bool: while len(self._q) == 0: if self._channel._closed or self._closed: return False + if self._channel._auto_close_enabled and self._channel._sender_count == 0: + await self._channel.aclose() + return False async with self._channel._recv_cv: await self._channel._recv_cv.wait() return True @@ -541,3 +619,214 @@ def __repr__(self) -> str: f"{type(self).__name__}(name={self._name!r}, limit={limit!r}, " f"{self._channel!r}):" ) + + +class BroadcastChannel( + tuple[BroadcastSender[ChannelMessageT], BroadcastReceiver[ChannelMessageT]] +): + """A channel that deliver all messages to all receivers. + + # Description + + [BroadcastChannel][frequenz.channels.BroadcastChannel]s can have multiple + [senders][frequenz.channels.BroadcastSender] and multiple + [receivers][frequenz.channels.BroadcastReceiver]. Each message sent through + any of the senders will be received by all receivers. + +
+ ```bob + .---------. msg1 msg1,msg2 .-----------. + | Sender +------. .---------->| Receiver | + '---------' | .----------. | '-----------' + +----->| Channel +-----+ + .---------. | '----------' | .-----------. + | Sender +------' '----------->| Receiver | + '---------' msg2 msg1,msg2 '-----------' + ``` +
+ + !!! Note inline end "Characteristics" + + * **Buffered:** Yes, with one buffer per receiver + * **Buffer full policy:** Drop oldest message + * **Multiple receivers:** Yes + * **Multiple senders:** Yes + * **Thread-safe:** No + + This channel is buffered, and when messages are not being consumed fast + enough and the buffer fills up, old messages will get dropped. + + Each receiver has its own buffer, so messages will only be dropped for + receivers that can't keep up with the senders, and not for the whole + channel. + + Instantiating this class will create a new broadcast channel, and return an + initial sender and a receiver. Further senders and receivers can be created + with the [BroadcastSender.clone()][frequenz.channels.BroadcastSender.clone], + and + [BroadcastSender.subscribe()][frequenz.channels.BroadcastSender.subscribe] + methods respectively. + + When a sender or a receiver is not needed anymore, it should be closed with + [`aclose()`][frequenz.channels.BroadcastSender.aclose] or + [`close()`][frequenz.channels.BroadcastReceiver.close]. This will prevent + further attempts to [`send()`][frequenz.channels.BroadcastSender.send] data, + and will allow receivers to drain the pending items on their queues, but + after that, subsequent [receive()][frequenz.channels.Receiver.receive] calls + will raise a + [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError]. + + When all senders of a channel are closed, all its receivers will be + automatically closed, and vice versa. + + This channel is useful, for example, to implement a pub/sub pattern, where + multiple consumers can subscribe to a channel to receive all messages. + + # Examples + + Example: Send a few numbers to a receiver + This is a very simple example that sends a few numbers from a single sender to + a single receiver. + + ```python + import asyncio + + from frequenz.channels import BroadcastChannel, Sender + + + async def send(sender: Sender[int]) -> None: + for message in range(3): + print(f"sending {message}") + await sender.send(message) + await sender.aclose() + + + async def main() -> None: + sender, receiver = BroadcastChannel[int](name="numbers") + + async with asyncio.TaskGroup() as task_group: + task_group.create_task(send(sender)) + for _ in range(3): + message = await receiver.receive() + print(f"received {message}") + await asyncio.sleep(0.1) # sleep (or work) with the data + + + asyncio.run(main()) + ``` + + The output should look something like (although the sending and received might + appear more interleaved): + + ``` + sending 0 + sending 1 + sending 2 + received 0 + received 1 + received 2 + ``` + + Example: Send a few number from multiple senders to multiple receivers + This is a more complex example that sends a few numbers from multiple senders to + multiple receivers, using a small buffer to force the senders to block. + + ```python + import asyncio + + from frequenz.channels import BroadcastChannel, Receiver, ReceiverStoppedError, Sender + + + async def send(name: str, sender: Sender[int], start: int, stop: int) -> None: + for message in range(start, stop): + print(f"{name} sending {message}") + await sender.send(message) + await sender.aclose() + + + async def recv(name: str, receiver: Receiver[int]) -> None: + try: + async for message in receiver: + print(f"{name} received {message}") + await asyncio.sleep(0.1) # sleep (or work) with the data + except ReceiverStoppedError: + pass + + + async def main() -> None: + sender_1, receiver_1 = BroadcastChannel[int](name="numbers") + sender_2 = sender_1.clone() + receiver_2 = sender_1.subscribe() + async with asyncio.TaskGroup() as task_group: + task_group.create_task(send("sender_1", sender_1, 10, 13)) + task_group.create_task(send("sender_2", sender_2, 20, 22)) + task_group.create_task(recv("receiver_1", receiver_1)) + task_group.create_task(recv("receiver_2", receiver_2)) + + + asyncio.run(main()) + ``` + + The output should look something like this(although the sending and received + might appear interleaved in a different way): + + ``` + sender_1 sending 10 + sender_1 sending 11 + sender_1 sending 12 + sender_2 sending 20 + sender_2 sending 21 + receiver_1 received 10 + receiver_1 received 11 + receiver_1 received 12 + receiver_1 received 20 + receiver_1 received 21 + receiver_2 received 10 + receiver_2 received 11 + receiver_2 received 12 + receiver_2 received 20 + receiver_2 received 21 + ``` + """ + + def __new__( + cls, + name: str, + resend_latest: bool = False, + limit: int = 50, + warn_on_overflow: bool = True, + ) -> BroadcastChannel[ChannelMessageT]: + """Create a new BroadcastChannel instance. + + Args: + name: The name of the channel. This is for logging purposes, and it will be + shown in the string representation of the channel. + resend_latest: When True, every time a new receiver is created with + `new_receiver`, the last message seen by the channel will be sent to the + new receiver automatically. This allows new receivers on slow streams to + get the latest message as soon as they are created, without having to + wait for the next message on the channel to arrive. It is safe to be + set in data/reporting channels, but is not recommended for use in + channels that stream control instructions. + limit: Number of messages the receivers can hold in their buffers. + warn_on_overflow: Whether to log a warning when a receiver's buffer is full + and a message is dropped. + + Returns: + A new BroadcastChannel instance that can be destructured into an initial + sender and receiver. + """ + channel = Broadcast[ChannelMessageT]( + name=name, resend_latest=resend_latest, auto_close=True + ) + return tuple.__new__( + cls, + ( + channel.new_sender(), + channel.new_receiver( + name=f"{name}_receiver", + limit=limit, + warn_on_overflow=warn_on_overflow, + ), + ), + ) diff --git a/src/frequenz/channels/_sender.py b/src/frequenz/channels/_sender.py index 908d00f8..093f708c 100644 --- a/src/frequenz/channels/_sender.py +++ b/src/frequenz/channels/_sender.py @@ -49,11 +49,14 @@ ``` """ +from __future__ import annotations + from abc import ABC, abstractmethod from typing import Generic from ._exceptions import Error from ._generic import SenderMessageT_co, SenderMessageT_contra +from ._receiver import Receiver class Sender(ABC, Generic[SenderMessageT_contra]): @@ -109,3 +112,35 @@ def __init__(self, sender: Sender[SenderMessageT_co]): sender: The [Sender][frequenz.channels.Sender] that was closed. """ super().__init__("Sender is closed", sender) + + +class SubscribableSender(Sender[SenderMessageT_contra], ABC): + """A [Sender][frequenz.channels.Sender] that can be subscribed to.""" + + @abstractmethod + def subscribe(self) -> Receiver[SenderMessageT_contra]: + """Subscribe to this sender. + + Returns: + A new receiver attached to this sender's channel. + """ + + +class CloneableSender(Sender[SenderMessageT_contra], ABC): + """A [Sender][frequenz.channels.Sender] that can be cloned.""" + + @abstractmethod + def clone(self) -> CloneableSender[SenderMessageT_contra]: + """Clone this sender. + + Returns: + A new sender that sends messages to the same channel as this sender. + """ + + +class CloneableSubscribableSender( + SubscribableSender[SenderMessageT_contra], + CloneableSender[SenderMessageT_contra], + ABC, +): + """A [Sender][frequenz.channels.Sender] that can be both cloned and subscribed to.""" diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index 0cc89f33..457e0659 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -12,6 +12,7 @@ from frequenz.channels import ( Broadcast, + BroadcastChannel, ChannelClosedError, Receiver, ReceiverStoppedError, @@ -107,7 +108,7 @@ async def test_broadcast_after_close() -> None: async def test_broadcast_overflow() -> None: """Ensure messages sent to full broadcast receivers get dropped.""" from frequenz.channels._broadcast import ( # pylint: disable=import-outside-toplevel - _Receiver, + BroadcastReceiver, ) bcast: Broadcast[int] = Broadcast(name="meter_5") @@ -117,9 +118,9 @@ async def test_broadcast_overflow() -> None: sender = bcast.new_sender() big_receiver = bcast.new_receiver(name="named-recv", limit=big_recv_size) - assert isinstance(big_receiver, _Receiver) + assert isinstance(big_receiver, BroadcastReceiver) small_receiver = bcast.new_receiver(limit=small_recv_size) - assert isinstance(small_receiver, _Receiver) + assert isinstance(small_receiver, BroadcastReceiver) async def drain_receivers() -> tuple[int, int]: big_sum = 0 @@ -425,3 +426,65 @@ async def test_broadcast_close_receiver() -> None: with pytest.raises(ReceiverStoppedError): _ = await receiver_2.receive() + + +async def test_broadcast_auto_close_1() -> None: + """Ensure broadcast auto close works when all receivers are closed.""" + sender, receiver = BroadcastChannel[int](name="auto-close-test") + + receiver_2 = sender.subscribe() + + await sender.send(1) + + assert (await receiver.receive()) == 1 + assert (await receiver_2.receive()) == 1 + + receiver.close() + + await sender.send(2) + + assert (await receiver_2.receive()) == 2 + + receiver_2.close() + + with pytest.raises(SenderError) as excinfo: + await sender.send(3) + assert isinstance(excinfo.value.__cause__, ChannelClosedError) + + +async def test_broadcast_auto_close_2() -> None: + """Ensure broadcast auto close works when all senders are closed.""" + sender, receiver = BroadcastChannel[int](name="auto-close-test") + + await sender.send(1) + + assert (await receiver.receive()) == 1 + + sender_2 = sender.clone() + + await sender.aclose() + + await sender_2.send(2) + + await sender_2.aclose() + + assert (await receiver.receive()) == 2 + + with pytest.raises(ReceiverStoppedError) as excinfo: + await receiver.receive() + assert isinstance(excinfo.value.__cause__, ChannelClosedError) + + +async def test_broadcast_closed_channels_remain_closed() -> None: + """Ensure that a closed channel can't be resurrected.""" + sender, receiver = BroadcastChannel[int](name="auto-close-test") + + receiver.close() + + with pytest.raises(SenderError) as excinfo: + await sender.send(1) + assert isinstance(excinfo.value.__cause__, ChannelClosedError) + + with pytest.raises(SenderError) as excinfo: + _ = sender.subscribe() + assert isinstance(excinfo.value.__cause__, ChannelClosedError)