From 9a0c053c4450b989fa96fb2bcba0a1fabd7c5bef Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 9 Feb 2026 11:40:40 +0100 Subject: [PATCH 01/12] Make broadcast sender clonable and subscribable Signed-off-by: Sahas Subramanian --- src/frequenz/channels/_broadcast.py | 43 +++++++++++++++++++++++++++-- src/frequenz/channels/_sender.py | 35 +++++++++++++++++++++++ 2 files changed, 75 insertions(+), 3 deletions(-) diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index f55003a5..0c4329fa 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -16,7 +16,7 @@ from ._exceptions import ChannelClosedError from ._generic import ChannelMessageT from ._receiver import Receiver, ReceiverStoppedError -from ._sender import Sender, SenderClosedError, SenderError +from ._sender import ClonableSubscribableSender, SenderClosedError, SenderError _logger = logging.getLogger(__name__) @@ -269,7 +269,7 @@ async def close(self) -> None: # noqa: D402 """Close the channel, deprecated alias for `aclose()`.""" # noqa: D402 return await self.aclose() - def new_sender(self) -> Sender[ChannelMessageT]: + def new_sender(self) -> ClonableSubscribableSender[ChannelMessageT]: """Return a new sender attached to this channel.""" return _Sender(self) @@ -317,7 +317,7 @@ def __repr__(self) -> str: _T = TypeVar("_T") -class _Sender(Sender[_T]): +class _Sender(ClonableSubscribableSender[_T]): """A sender to send messages to the broadcast channel. Should not be created directly, but through the @@ -381,6 +381,43 @@ async def aclose(self) -> None: """ self._closed = True + @override + def clone(self) -> _Sender[_T]: + """Return a clone of this sender.""" + return _Sender(self._channel) + + @override + def subscribe( + self, + name: str | None = None, + limit: int = 50, + warn_on_overflow: bool = True, + ) -> Receiver[_T]: + """Return a new receiver attached to this sender's channel. + + Args: + name: A name to identify the receiver in the logs. + limit: Number of messages the receiver can hold in its buffer. + warn_on_overflow: Whether to log a warning when the receiver's buffer is + full and a message is dropped. + + Returns: + A new receiver attached to this sender's channel. + + Raises: + SenderError: If the underlying channel is closed. + SenderClosedError: If this sender is closed. + """ + if self._closed: + raise SenderClosedError(self) + if self._channel._closed: # pylint: disable=protected-access + raise SenderError("The channel was closed", self) from ChannelClosedError( + self._channel + ) + return self._channel.new_receiver( + name=name, limit=limit, warn_on_overflow=warn_on_overflow + ) + def __str__(self) -> str: """Return a string representation of this sender.""" return f"{self._channel}:{type(self).__name__}" diff --git a/src/frequenz/channels/_sender.py b/src/frequenz/channels/_sender.py index 908d00f8..4c1a6d07 100644 --- a/src/frequenz/channels/_sender.py +++ b/src/frequenz/channels/_sender.py @@ -49,11 +49,14 @@ ``` """ +from __future__ import annotations + from abc import ABC, abstractmethod from typing import Generic from ._exceptions import Error from ._generic import SenderMessageT_co, SenderMessageT_contra +from ._receiver import Receiver class Sender(ABC, Generic[SenderMessageT_contra]): @@ -109,3 +112,35 @@ def __init__(self, sender: Sender[SenderMessageT_co]): sender: The [Sender][frequenz.channels.Sender] that was closed. """ super().__init__("Sender is closed", sender) + + +class SubscribableSender(Sender[SenderMessageT_contra], ABC): + """A [Sender][frequenz.channels.Sender] that can be subscribed to.""" + + @abstractmethod + def subscribe(self) -> Receiver[SenderMessageT_contra]: + """Subscribe to this sender. + + Returns: + A new sender that sends messages to the same channel as this sender. + """ + + +class ClonableSender(Sender[SenderMessageT_contra], ABC): + """A [Sender][frequenz.channels.Sender] that can be cloned.""" + + @abstractmethod + def clone(self) -> ClonableSender[SenderMessageT_contra]: + """Clone this sender. + + Returns: + A new sender that sends messages to the same channel as this sender. + """ + + +class ClonableSubscribableSender( + SubscribableSender[SenderMessageT_contra], + ClonableSender[SenderMessageT_contra], + ABC, +): + """A [Sender][frequenz.channels.Sender] that can be both cloned and subscribed to.""" From e696143fdeabec90323124b1aee96704cab69e9f Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 9 Feb 2026 12:19:28 +0100 Subject: [PATCH 02/12] Track sender count on sender create and close Signed-off-by: Sahas Subramanian --- src/frequenz/channels/_broadcast.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 0c4329fa..fd2d6632 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -207,6 +207,9 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None: self._recv_cv: Condition = Condition() """The condition to wait for data in the channel's buffer.""" + self._sender_count: int = 0 + """The number of senders attached to this channel.""" + self._receivers: dict[ int, weakref.ReferenceType[_Receiver[ChannelMessageT]] ] = {} @@ -337,6 +340,13 @@ def __init__(self, channel: Broadcast[_T], /) -> None: self._closed: bool = False """Whether this sender is closed.""" + self._channel._sender_count += 1 + + @property + def sender_count(self) -> int: + """Return the number of open senders attached to this sender's channel.""" + return self._channel._sender_count # pylint: disable=protected-access + @override async def send(self, message: _T, /) -> None: """Send a message to all broadcast receivers. @@ -379,7 +389,15 @@ async def aclose(self) -> None: attempt to send a message through a closed sender will raise a [SenderClosedError][frequenz.channels.SenderClosedError]. """ + if self._closed: + return self._closed = True + self._channel._sender_count -= 1 + + def __del__(self) -> None: + """Clean up this sender.""" + if not self._closed: + self._channel._sender_count -= 1 @override def clone(self) -> _Sender[_T]: From 31426a3d9129cdffd530c2a9b5441b4ab757062c Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 9 Mar 2026 13:54:03 +0100 Subject: [PATCH 03/12] Expose strongly-typed Senders and Receivers from Broadcast Signed-off-by: Sahas Subramanian --- src/frequenz/channels/__init__.py | 4 +++- src/frequenz/channels/_broadcast.py | 20 ++++++++++---------- tests/test_broadcast.py | 6 +++--- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/src/frequenz/channels/__init__.py b/src/frequenz/channels/__init__.py index a6a092a9..d8fc6a04 100644 --- a/src/frequenz/channels/__init__.py +++ b/src/frequenz/channels/__init__.py @@ -80,7 +80,7 @@ """ from ._anycast import Anycast -from ._broadcast import Broadcast +from ._broadcast import Broadcast, BroadcastReceiver, BroadcastSender from ._exceptions import ChannelClosedError, ChannelError, Error from ._generic import ( ChannelMessageT, @@ -106,6 +106,8 @@ __all__ = [ "Anycast", "Broadcast", + "BroadcastReceiver", + "BroadcastSender", "ChannelClosedError", "ChannelError", "ChannelMessageT", diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index fd2d6632..959e7a03 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -211,7 +211,7 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None: """The number of senders attached to this channel.""" self._receivers: dict[ - int, weakref.ReferenceType[_Receiver[ChannelMessageT]] + int, weakref.ReferenceType[BroadcastReceiver[ChannelMessageT]] ] = {} """The receivers attached to the channel, indexed by their hash().""" @@ -272,13 +272,13 @@ async def close(self) -> None: # noqa: D402 """Close the channel, deprecated alias for `aclose()`.""" # noqa: D402 return await self.aclose() - def new_sender(self) -> ClonableSubscribableSender[ChannelMessageT]: + def new_sender(self) -> BroadcastSender[ChannelMessageT]: """Return a new sender attached to this channel.""" - return _Sender(self) + return BroadcastSender(self) def new_receiver( self, *, name: str | None = None, limit: int = 50, warn_on_overflow: bool = True - ) -> Receiver[ChannelMessageT]: + ) -> BroadcastReceiver[ChannelMessageT]: """Return a new receiver attached to this channel. Broadcast receivers have their own buffer, and when messages are not @@ -294,7 +294,7 @@ def new_receiver( Returns: A new receiver attached to this channel. """ - recv: _Receiver[ChannelMessageT] = _Receiver( + recv: BroadcastReceiver[ChannelMessageT] = BroadcastReceiver( self, name=name, limit=limit, warn_on_overflow=warn_on_overflow ) self._receivers[hash(recv)] = weakref.ref(recv) @@ -320,7 +320,7 @@ def __repr__(self) -> str: _T = TypeVar("_T") -class _Sender(ClonableSubscribableSender[_T]): +class BroadcastSender(ClonableSubscribableSender[_T]): """A sender to send messages to the broadcast channel. Should not be created directly, but through the @@ -400,9 +400,9 @@ def __del__(self) -> None: self._channel._sender_count -= 1 @override - def clone(self) -> _Sender[_T]: + def clone(self) -> BroadcastSender[_T]: """Return a clone of this sender.""" - return _Sender(self._channel) + return BroadcastSender(self._channel) @override def subscribe( @@ -410,7 +410,7 @@ def subscribe( name: str | None = None, limit: int = 50, warn_on_overflow: bool = True, - ) -> Receiver[_T]: + ) -> BroadcastReceiver[_T]: """Return a new receiver attached to this sender's channel. Args: @@ -445,7 +445,7 @@ def __repr__(self) -> str: return f"{type(self).__name__}({self._channel!r})" -class _Receiver(Receiver[_T]): +class BroadcastReceiver(Receiver[_T]): """A receiver to receive messages from the broadcast channel. Should not be created directly, but through the diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index 0cc89f33..e2b8b199 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -107,7 +107,7 @@ async def test_broadcast_after_close() -> None: async def test_broadcast_overflow() -> None: """Ensure messages sent to full broadcast receivers get dropped.""" from frequenz.channels._broadcast import ( # pylint: disable=import-outside-toplevel - _Receiver, + BroadcastReceiver, ) bcast: Broadcast[int] = Broadcast(name="meter_5") @@ -117,9 +117,9 @@ async def test_broadcast_overflow() -> None: sender = bcast.new_sender() big_receiver = bcast.new_receiver(name="named-recv", limit=big_recv_size) - assert isinstance(big_receiver, _Receiver) + assert isinstance(big_receiver, BroadcastReceiver) small_receiver = bcast.new_receiver(limit=small_recv_size) - assert isinstance(small_receiver, _Receiver) + assert isinstance(small_receiver, BroadcastReceiver) async def drain_receivers() -> tuple[int, int]: big_sum = 0 From 389574c9aa2fdca879ed08dee15eb60b7103a78d Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 9 Feb 2026 16:39:58 +0100 Subject: [PATCH 04/12] Implement auto-close support for Broadcast channels Signed-off-by: Sahas Subramanian --- src/frequenz/channels/_broadcast.py | 33 +++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 959e7a03..4bb79b6f 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -21,7 +21,9 @@ _logger = logging.getLogger(__name__) -class Broadcast(Generic[ChannelMessageT]): +class Broadcast( # pylint: disable=too-many-instance-attributes + Generic[ChannelMessageT] +): """A channel that deliver all messages to all receivers. # Description @@ -184,7 +186,13 @@ async def main() -> None: ``` """ - def __init__(self, *, name: str, resend_latest: bool = False) -> None: + def __init__( + self, + *, + name: str, + resend_latest: bool = False, + auto_close: bool = False, + ) -> None: """Initialize this channel. Args: @@ -197,6 +205,8 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None: wait for the next message on the channel to arrive. It is safe to be set in data/reporting channels, but is not recommended for use in channels that stream control instructions. + auto_close: If True, the channel will be closed when all senders or all + receivers are closed. """ self._name: str = name """The name of the broadcast channel. @@ -221,6 +231,9 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None: self._latest: ChannelMessageT | None = None """The latest message sent to the channel.""" + self._auto_close_enabled: bool = auto_close + """Whether to close the channel when all senders or all receivers are closed.""" + self.resend_latest: bool = resend_latest """Whether to resend the latest message to new receivers. @@ -367,6 +380,10 @@ async def send(self, message: _T, /) -> None: raise SenderError("The channel was closed", self) from ChannelClosedError( self._channel ) + if self._channel._auto_close_enabled and len(self._channel._receivers) == 0: + raise SenderError("The channel was closed", self) from ChannelClosedError( + self._channel + ) self._channel._latest = message stale_refs = [] for _hash, recv_ref in self._channel._receivers.items(): @@ -394,10 +411,11 @@ async def aclose(self) -> None: self._closed = True self._channel._sender_count -= 1 - def __del__(self) -> None: - """Clean up this sender.""" - if not self._closed: - self._channel._sender_count -= 1 + if ( + self._channel._sender_count == 0 # pylint: disable=protected-access + and self._channel._auto_close_enabled # pylint: disable=protected-access + ): + await self._channel.aclose() @override def clone(self) -> BroadcastSender[_T]: @@ -547,6 +565,9 @@ async def ready(self) -> bool: while len(self._q) == 0: if self._channel._closed or self._closed: return False + if self._channel._auto_close_enabled and self._channel._sender_count == 0: + await self._channel.aclose() + return False async with self._channel._recv_cv: await self._channel._recv_cv.wait() return True From ddfab00303d447a08499405cb6a61b1748d68a16 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 9 Feb 2026 16:45:38 +0100 Subject: [PATCH 05/12] Add `BroadcastChannel` and deprecate `Broadcast` class The `BroadcastChannel` would return a sender and a receiver from an auto-closing channel. Signed-off-by: Sahas Subramanian --- src/frequenz/channels/__init__.py | 15 +- src/frequenz/channels/_broadcast.py | 220 +++++++++++++++++++++++++++- src/frequenz/channels/_sender.py | 2 +- 3 files changed, 230 insertions(+), 7 deletions(-) diff --git a/src/frequenz/channels/__init__.py b/src/frequenz/channels/__init__.py index d8fc6a04..08bf2b2d 100644 --- a/src/frequenz/channels/__init__.py +++ b/src/frequenz/channels/__init__.py @@ -80,7 +80,7 @@ """ from ._anycast import Anycast -from ._broadcast import Broadcast, BroadcastReceiver, BroadcastSender +from ._broadcast import Broadcast, BroadcastChannel, BroadcastReceiver, BroadcastSender from ._exceptions import ChannelClosedError, ChannelError, Error from ._generic import ( ChannelMessageT, @@ -101,16 +101,26 @@ select, selected_from, ) -from ._sender import Sender, SenderClosedError, SenderError +from ._sender import ( + ClonableSender, + ClonableSubscribableSender, + Sender, + SenderClosedError, + SenderError, + SubscribableSender, +) __all__ = [ "Anycast", "Broadcast", + "BroadcastChannel", "BroadcastReceiver", "BroadcastSender", "ChannelClosedError", "ChannelError", "ChannelMessageT", + "ClonableSender", + "ClonableSubscribableSender", "Error", "ErroredChannelT_co", "LatestValueCache", @@ -130,6 +140,7 @@ "SenderError", "SenderMessageT_co", "SenderMessageT_contra", + "SubscribableSender", "UnhandledSelectedError", "merge", "select", diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 4bb79b6f..a6570703 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -21,6 +21,7 @@ _logger = logging.getLogger(__name__) +@deprecated("Please use BroadcastChannel instead.") class Broadcast( # pylint: disable=too-many-instance-attributes Generic[ChannelMessageT] ): @@ -337,8 +338,8 @@ class BroadcastSender(ClonableSubscribableSender[_T]): """A sender to send messages to the broadcast channel. Should not be created directly, but through the - [Broadcast.new_sender()][frequenz.channels.Broadcast.new_sender] - method. + [BroadcastSender.clone()][frequenz.channels.BroadcastSender.clone] + method of an existing sender. """ def __init__(self, channel: Broadcast[_T], /) -> None: @@ -467,8 +468,8 @@ class BroadcastReceiver(Receiver[_T]): """A receiver to receive messages from the broadcast channel. Should not be created directly, but through the - [Broadcast.new_receiver()][frequenz.channels.Broadcast.new_receiver] - method. + [BroadcastSender.subscribe()][frequenz.channels.BroadcastSender.subscribe] + method of an existing sender. """ def __init__( @@ -617,3 +618,214 @@ def __repr__(self) -> str: f"{type(self).__name__}(name={self._name!r}, limit={limit!r}, " f"{self._channel!r}):" ) + + +class BroadcastChannel( + tuple[BroadcastSender[ChannelMessageT], BroadcastReceiver[ChannelMessageT]] +): + """A channel that deliver all messages to all receivers. + + # Description + + [BroadcastChannel][frequenz.channels.BroadcastChannel]s can have multiple + [senders][frequenz.channels.BroadcastSender] and multiple + [receivers][frequenz.channels.BroadcastReceiver]. Each message sent through + any of the senders will be received by all receivers. + +
+ ```bob + .---------. msg1 msg1,msg2 .-----------. + | Sender +------. .---------->| Receiver | + '---------' | .----------. | '-----------' + +----->| Channel +-----+ + .---------. | '----------' | .-----------. + | Sender +------' '----------->| Receiver | + '---------' msg2 msg1,msg2 '-----------' + ``` +
+ + !!! Note inline end "Characteristics" + + * **Buffered:** Yes, with one buffer per receiver + * **Buffer full policy:** Drop oldest message + * **Multiple receivers:** Yes + * **Multiple senders:** Yes + * **Thread-safe:** No + + This channel is buffered, and when messages are not being consumed fast + enough and the buffer fills up, old messages will get dropped. + + Each receiver has its own buffer, so messages will only be dropped for + receivers that can't keep up with the senders, and not for the whole + channel. + + Instantiating this class will create a new broadcast channel, and return an + initial sender and a receiver. Further senders and receivers can be created + with the [BroadcastSender.clone()][frequenz.channels.BroadcastSender.clone], + and + [BroadcastSender.subscribe()][frequenz.channels.BroadcastSender.subscribe] + methods respectively. + + When a sender or a receiver is not needed anymore, it should be closed with + [`aclose()`][frequenz.channels.BroadcastSender.aclose] or + [`close()`][frequenz.channels.BroadcastReceiver.close]. This will prevent + further attempts to [`send()`][frequenz.channels.BroadcastSender.send] data, + and will allow receivers to drain the pending items on their queues, but + after that, subsequent [receive()][frequenz.channels.Receiver.receive] calls + will raise a + [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError]. + + When all senders of a channel are closed, all its receivers will be + automatically closed, and vice versa. + + This channel is useful, for example, to implement a pub/sub pattern, where + multiple consumers can subscribe to a channel to receive all messages. + + # Examples + + Example: Send a few numbers to a receiver + This is a very simple example that sends a few numbers from a single sender to + a single receiver. + + ```python + import asyncio + + from frequenz.channels import BroadcastChannel, Sender + + + async def send(sender: Sender[int]) -> None: + for message in range(3): + print(f"sending {message}") + await sender.send(message) + await sender.aclose() + + + async def main() -> None: + sender, receiver = BroadcastChannel[int](name="numbers") + + async with asyncio.TaskGroup() as task_group: + task_group.create_task(send(sender)) + for _ in range(3): + message = await receiver.receive() + print(f"received {message}") + await asyncio.sleep(0.1) # sleep (or work) with the data + + + asyncio.run(main()) + ``` + + The output should look something like (although the sending and received might + appear more interleaved): + + ``` + sending 0 + sending 1 + sending 2 + received 0 + received 1 + received 2 + ``` + + Example: Send a few number from multiple senders to multiple receivers + This is a more complex example that sends a few numbers from multiple senders to + multiple receivers, using a small buffer to force the senders to block. + + ```python + import asyncio + + from frequenz.channels import BroadcastChannel, Receiver, ReceiverStoppedError, Sender + + + async def send(name: str, sender: Sender[int], start: int, stop: int) -> None: + for message in range(start, stop): + print(f"{name} sending {message}") + await sender.send(message) + await sender.aclose() + + + async def recv(name: str, receiver: Receiver[int]) -> None: + try: + async for message in receiver: + print(f"{name} received {message}") + await asyncio.sleep(0.1) # sleep (or work) with the data + except ReceiverStoppedError: + pass + + + async def main() -> None: + sender_1, receiver_1 = BroadcastChannel[int](name="numbers") + sender_2 = sender_1.clone() + receiver_2 = sender_1.subscribe() + async with asyncio.TaskGroup() as task_group: + task_group.create_task(send("sender_1", sender_1, 10, 13)) + task_group.create_task(send("sender_2", sender_2, 20, 22)) + task_group.create_task(recv("receiver_1", receiver_1)) + task_group.create_task(recv("receiver_2", receiver_2)) + + + asyncio.run(main()) + ``` + + The output should look something like this(although the sending and received + might appear interleaved in a different way): + + ``` + sender_1 sending 10 + sender_1 sending 11 + sender_1 sending 12 + sender_2 sending 20 + sender_2 sending 21 + receiver_1 received 10 + receiver_1 received 11 + receiver_1 received 12 + receiver_1 received 20 + receiver_1 received 21 + receiver_2 received 10 + receiver_2 received 11 + receiver_2 received 12 + receiver_2 received 20 + receiver_2 received 21 + ``` + """ + + def __new__( + cls, + name: str, + resend_latest: bool = False, + limit: int = 50, + warn_on_overflow: bool = True, + ) -> BroadcastChannel[ChannelMessageT]: + """Create a new BroadcastChannel instance. + + Args: + name: The name of the channel. This is for logging purposes, and it will be + shown in the string representation of the channel. + resend_latest: When True, every time a new receiver is created with + `new_receiver`, the last message seen by the channel will be sent to the + new receiver automatically. This allows new receivers on slow streams to + get the latest message as soon as they are created, without having to + wait for the next message on the channel to arrive. It is safe to be + set in data/reporting channels, but is not recommended for use in + channels that stream control instructions. + limit: Number of messages the receivers can hold in their buffers. + warn_on_overflow: Whether to log a warning when a receiver's buffer is full + and a message is dropped. + + Returns: + A new BroadcastChannel instance that can be destructured into an initial + sender and receiver. + """ + channel = Broadcast[ChannelMessageT]( + name=name, resend_latest=resend_latest, auto_close=True + ) + return tuple.__new__( + cls, + ( + channel.new_sender(), + channel.new_receiver( + name=f"{name}_receiver", + limit=limit, + warn_on_overflow=warn_on_overflow, + ), + ), + ) diff --git a/src/frequenz/channels/_sender.py b/src/frequenz/channels/_sender.py index 4c1a6d07..0c27436a 100644 --- a/src/frequenz/channels/_sender.py +++ b/src/frequenz/channels/_sender.py @@ -122,7 +122,7 @@ def subscribe(self) -> Receiver[SenderMessageT_contra]: """Subscribe to this sender. Returns: - A new sender that sends messages to the same channel as this sender. + A new receiver attached to this sender's channel. """ From 7a35af5d177435bd7ae63218e794f1ca72c2b838 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 9 Feb 2026 16:48:42 +0100 Subject: [PATCH 06/12] Test the auto-close feature of broadcast channels Signed-off-by: Sahas Subramanian --- tests/test_broadcast.py | 48 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index e2b8b199..1f391844 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -12,6 +12,7 @@ from frequenz.channels import ( Broadcast, + BroadcastChannel, ChannelClosedError, Receiver, ReceiverStoppedError, @@ -425,3 +426,50 @@ async def test_broadcast_close_receiver() -> None: with pytest.raises(ReceiverStoppedError): _ = await receiver_2.receive() + + +async def test_broadcast_auto_close_1() -> None: + """Ensure broadcast auto close works when all receivers are closed.""" + sender, receiver = BroadcastChannel[int](name="auto-close-test") + + receiver_2 = sender.subscribe() + + await sender.send(1) + + assert (await receiver.receive()) == 1 + assert (await receiver_2.receive()) == 1 + + receiver.close() + + await sender.send(2) + + assert (await receiver_2.receive()) == 2 + + receiver_2.close() + + with pytest.raises(SenderError) as excinfo: + await sender.send(3) + assert isinstance(excinfo.value.__cause__, ChannelClosedError) + + +async def test_broadcast_auto_close_2() -> None: + """Ensure broadcast auto close works when all senders are closed.""" + sender, receiver = BroadcastChannel[int](name="auto-close-test") + + await sender.send(1) + + assert (await receiver.receive()) == 1 + + sender_2 = sender.clone() + + await sender.aclose() + + await sender_2.send(2) + + await sender_2.aclose() + + assert (await receiver.receive()) == 2 + + with pytest.raises(ReceiverStoppedError) as excinfo: + await receiver.receive() + assert isinstance(excinfo.value.__cause__, ChannelClosedError) From 90c10dbf8307da262e3e03ba2502a214b33b8106 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 30 Mar 2026 11:00:34 +0200 Subject: [PATCH 07/12] Deprecate Anycast channels Signed-off-by: Sahas Subramanian --- src/frequenz/channels/_anycast.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index 5c3f8284..987c64ea 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -20,6 +20,12 @@ _logger = logging.getLogger(__name__) +@deprecated( + "Anycast channels are deprecated, because of the high cost of maintaining " + "something for which we don't have a use case. The implementation will " + "remain in the codebase in its current form at least until the next major " + "version." +) class Anycast(Generic[ChannelMessageT]): """A channel that delivers each message to exactly one receiver. From af8723f588d4c3381b073ce6eefbebdc898c789c Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 30 Mar 2026 11:51:07 +0200 Subject: [PATCH 08/12] =?UTF-8?q?Rename=20`BroadcastReceiver.enqueue`=20?= =?UTF-8?q?=E2=86=92=20`=5Fenqueue`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Now that `BroadcastReceiver` is a public type, its internal methods need to become protected. Signed-off-by: Sahas Subramanian --- src/frequenz/channels/_broadcast.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index a6570703..635c8b19 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -313,7 +313,7 @@ def new_receiver( ) self._receivers[hash(recv)] = weakref.ref(recv) if self.resend_latest and self._latest is not None: - recv.enqueue(self._latest) + recv._enqueue(self._latest) # pylint: disable=protected-access return recv def __str__(self) -> str: @@ -392,7 +392,7 @@ async def send(self, message: _T, /) -> None: if recv is None: stale_refs.append(_hash) continue - recv.enqueue(message) + recv._enqueue(message) for _hash in stale_refs: del self._channel._receivers[_hash] async with self._channel._recv_cv: @@ -515,7 +515,7 @@ def __init__( self._closed: bool = False """Whether the receiver is closed.""" - def enqueue(self, message: _T, /) -> None: + def _enqueue(self, message: _T, /) -> None: """Put a message into this receiver's queue. To be called by broadcast senders. If the receiver's queue is already From 417f55eab7cf982fade953b6a7b2da4b87d85b06 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 30 Mar 2026 11:59:47 +0200 Subject: [PATCH 09/12] Update release notes Signed-off-by: Sahas Subramanian --- RELEASE_NOTES.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 96a0240b..da314835 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,11 +6,13 @@ ## Upgrading - +- The old `Broadcast` class is deprecated in favour of the new auto-closing `BroadcastChannel`. + +- The `Anycast` class is deprecated, because of the lack of use-cases and the maintenance cost. ## New Features - +- There's a new `BroadcastChannel`, which returns a broadcast sender and a broadcast receiver. The channel is auto-closing, meaning when all the senders or all the receivers are closed, the channel is closed. ## Bug Fixes From 8559a81e567b0e58ae59cca4e7e6edb2ef6743ba Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 30 Mar 2026 13:32:06 +0200 Subject: [PATCH 10/12] Check for empty receivers after expired weakrefs are pruned Signed-off-by: Sahas Subramanian --- src/frequenz/channels/_broadcast.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 635c8b19..6ddae918 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -381,11 +381,6 @@ async def send(self, message: _T, /) -> None: raise SenderError("The channel was closed", self) from ChannelClosedError( self._channel ) - if self._channel._auto_close_enabled and len(self._channel._receivers) == 0: - raise SenderError("The channel was closed", self) from ChannelClosedError( - self._channel - ) - self._channel._latest = message stale_refs = [] for _hash, recv_ref in self._channel._receivers.items(): recv = recv_ref() @@ -395,6 +390,11 @@ async def send(self, message: _T, /) -> None: recv._enqueue(message) for _hash in stale_refs: del self._channel._receivers[_hash] + if self._channel._auto_close_enabled and len(self._channel._receivers) == 0: + raise SenderError("The channel was closed", self) from ChannelClosedError( + self._channel + ) + self._channel._latest = message async with self._channel._recv_cv: self._channel._recv_cv.notify_all() # pylint: enable=protected-access From 5351d41467430136ec5ad68115f6b46d6e0595d6 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 30 Mar 2026 15:11:00 +0200 Subject: [PATCH 11/12] =?UTF-8?q?Rename=20`Clonable*Sender`=20=E2=86=92=20?= =?UTF-8?q?`Cloneable*Sender`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sahas Subramanian --- src/frequenz/channels/__init__.py | 8 ++++---- src/frequenz/channels/_broadcast.py | 4 ++-- src/frequenz/channels/_sender.py | 8 ++++---- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/frequenz/channels/__init__.py b/src/frequenz/channels/__init__.py index 08bf2b2d..83618343 100644 --- a/src/frequenz/channels/__init__.py +++ b/src/frequenz/channels/__init__.py @@ -102,8 +102,8 @@ selected_from, ) from ._sender import ( - ClonableSender, - ClonableSubscribableSender, + CloneableSender, + CloneableSubscribableSender, Sender, SenderClosedError, SenderError, @@ -119,8 +119,8 @@ "ChannelClosedError", "ChannelError", "ChannelMessageT", - "ClonableSender", - "ClonableSubscribableSender", + "CloneableSender", + "CloneableSubscribableSender", "Error", "ErroredChannelT_co", "LatestValueCache", diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 6ddae918..b02e8ebb 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -16,7 +16,7 @@ from ._exceptions import ChannelClosedError from ._generic import ChannelMessageT from ._receiver import Receiver, ReceiverStoppedError -from ._sender import ClonableSubscribableSender, SenderClosedError, SenderError +from ._sender import CloneableSubscribableSender, SenderClosedError, SenderError _logger = logging.getLogger(__name__) @@ -334,7 +334,7 @@ def __repr__(self) -> str: _T = TypeVar("_T") -class BroadcastSender(ClonableSubscribableSender[_T]): +class BroadcastSender(CloneableSubscribableSender[_T]): """A sender to send messages to the broadcast channel. Should not be created directly, but through the diff --git a/src/frequenz/channels/_sender.py b/src/frequenz/channels/_sender.py index 0c27436a..093f708c 100644 --- a/src/frequenz/channels/_sender.py +++ b/src/frequenz/channels/_sender.py @@ -126,11 +126,11 @@ def subscribe(self) -> Receiver[SenderMessageT_contra]: """ -class ClonableSender(Sender[SenderMessageT_contra], ABC): +class CloneableSender(Sender[SenderMessageT_contra], ABC): """A [Sender][frequenz.channels.Sender] that can be cloned.""" @abstractmethod - def clone(self) -> ClonableSender[SenderMessageT_contra]: + def clone(self) -> CloneableSender[SenderMessageT_contra]: """Clone this sender. Returns: @@ -138,9 +138,9 @@ def clone(self) -> ClonableSender[SenderMessageT_contra]: """ -class ClonableSubscribableSender( +class CloneableSubscribableSender( SubscribableSender[SenderMessageT_contra], - ClonableSender[SenderMessageT_contra], + CloneableSender[SenderMessageT_contra], ABC, ): """A [Sender][frequenz.channels.Sender] that can be both cloned and subscribed to.""" From 3843e3f656fb7e5641f32c9e8c43fcf5a06bf427 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Tue, 7 Apr 2026 11:30:28 +0200 Subject: [PATCH 12/12] Close underlying channel when all receivers are gone Signed-off-by: Sahas Subramanian --- src/frequenz/channels/_broadcast.py | 1 + tests/test_broadcast.py | 15 +++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index b02e8ebb..47e33767 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -391,6 +391,7 @@ async def send(self, message: _T, /) -> None: for _hash in stale_refs: del self._channel._receivers[_hash] if self._channel._auto_close_enabled and len(self._channel._receivers) == 0: + await self._channel.aclose() raise SenderError("The channel was closed", self) from ChannelClosedError( self._channel ) diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index 1f391844..457e0659 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -473,3 +473,18 @@ async def test_broadcast_auto_close_2() -> None: with pytest.raises(ReceiverStoppedError) as excinfo: await receiver.receive() assert isinstance(excinfo.value.__cause__, ChannelClosedError) + + +async def test_broadcast_closed_channels_remain_closed() -> None: + """Ensure that a closed channel can't be resurrected.""" + sender, receiver = BroadcastChannel[int](name="auto-close-test") + + receiver.close() + + with pytest.raises(SenderError) as excinfo: + await sender.send(1) + assert isinstance(excinfo.value.__cause__, ChannelClosedError) + + with pytest.raises(SenderError) as excinfo: + _ = sender.subscribe() + assert isinstance(excinfo.value.__cause__, ChannelClosedError)