Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
b8a37c1
Retrieve data stream via oneshot channel
simonvoelcker Mar 9, 2026
cab6b8d
Use oneshot channel setup in resampler
simonvoelcker Mar 9, 2026
8e6f2ac
Use oneshot channel setup in GridFrequency
simonvoelcker Mar 9, 2026
dfef151
Use oneshot channel setup in VoltageStreamer
simonvoelcker Mar 9, 2026
0a5fbeb
Added oneshot channel to ReportRequest
simonvoelcker Mar 9, 2026
215cfe7
Using oneshot channel for report requests in all use cases
simonvoelcker Mar 9, 2026
d8ec935
Stopped using ChannelRegistry in benchmarks
simonvoelcker Mar 5, 2026
ea7a169
Remove ChannelRegistry
simonvoelcker Mar 5, 2026
f3e6b47
Keep a reference to Pipe objects to prevent it from being garbage col…
simonvoelcker Mar 9, 2026
d575130
Deprecate GridFrequency.new_receiver in favor of .subscribe
simonvoelcker Mar 9, 2026
b03e0b1
Update to channels changes
simonvoelcker Mar 10, 2026
eda22e9
Address PR comments
simonvoelcker Mar 10, 2026
d2384ea
Split resampling actor subscribe-logic for better readability
simonvoelcker Mar 10, 2026
25d31b4
Use more precise channel sender and receiver types
simonvoelcker Mar 10, 2026
af6d3a7
Update dependencies after rebase
simonvoelcker Mar 16, 2026
4c4f70d
Remove remaining mentions of the channel registry
simonvoelcker Apr 2, 2026
342d826
Send new stream receiver for existing channels in microgrid API source
simonvoelcker Apr 2, 2026
4ea3f5d
Rename PowerManagingActor._channel_lookup to _channels for consistency
simonvoelcker Apr 2, 2026
0c733df
Send stream receiver even for existing channel in power managing actor
simonvoelcker Apr 2, 2026
3e65895
Cleaner way of mapping samples in grid frequency
simonvoelcker Apr 2, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions benchmarks/timeseries/benchmark_datasourcing.py
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, did you run the benchmark before and after the changes to see if there are any significant performance changes?

Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@
from time import perf_counter
from typing import Any

from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError
from frequenz.channels import Broadcast, OneshotChannel, Receiver, ReceiverStoppedError
from frequenz.channels._broadcast import BroadcastReceiver
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
from frequenz.channels._broadcast import BroadcastReceiver
from frequenz.channels import BroadcastReceiver

Always use public imports. If the symbols are not exported publicly, we need to fix it in the channels library.

from frequenz.client.microgrid.metrics import Metric
from frequenz.quantities import Quantity

from frequenz.sdk import microgrid
from frequenz.sdk._internal._channels import ChannelRegistry
from frequenz.sdk.microgrid._data_sourcing import (
ComponentMetricRequest,
DataSourcingActor,
)
from frequenz.sdk.timeseries import Sample

try:
from tests.timeseries.mock_microgrid import MockMicrogrid
Expand Down Expand Up @@ -80,7 +82,6 @@ async def benchmark_data_sourcing( # pylint: disable=too-many-locals
name="DataSourcingActor Request Channel"
)

channel_registry = ChannelRegistry(name="Microgrid Channel Registry")
request_receiver = request_channel.new_receiver(
name="datasourcing-benchmark",
limit=(num_ev_chargers * len(COMPONENT_METRIC_IDS)),
Expand All @@ -105,18 +106,21 @@ async def consume(channel: Receiver[Any]) -> None:

for evc_id in mock_grid.evc_ids:
for component_metric_id in COMPONENT_METRIC_IDS:
telem_stream_sender, telem_stream_receiver = OneshotChannel[
BroadcastReceiver[Sample[Quantity]]
]()
request = ComponentMetricRequest(
"current_phase_requests", evc_id, component_metric_id, None
namespace="current_phase_requests",
component_id=evc_id,
metric=component_metric_id,
start_time=None,
telem_stream_sender=telem_stream_sender,
)

recv_channel = channel_registry.get_or_create(
ComponentMetricRequest, request.get_channel_name()
).new_receiver()

await request_sender.send(request)
consume_tasks.append(asyncio.create_task(consume(recv_channel)))
stream_receiver = await telem_stream_receiver.receive()
consume_tasks.append(asyncio.create_task(consume(stream_receiver)))

async with DataSourcingActor(request_receiver, channel_registry):
async with DataSourcingActor(request_receiver):
await asyncio.gather(*consume_tasks)

time_taken = perf_counter() - start_time
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ dependencies = [
"frequenz-client-microgrid >= 0.18.1, < 0.19.0",
"frequenz-microgrid-component-graph >= 0.3.4, < 0.4",
"frequenz-client-common >= 0.3.6, < 0.4.0",
"frequenz-channels >= 1.6.1, < 2.0.0",
"frequenz-channels @ git+https://github.com/shsms/frequenz-channels-python.git@auto-closing-broadcast",
"frequenz-quantities[marshmallow] >= 1.0.0, < 2.0.0",
"numpy >= 2.1.0, < 3",
"typing_extensions >= 4.14.1, < 5",
Expand Down
142 changes: 1 addition & 141 deletions src/frequenz/sdk/_internal/_channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
"""General purpose classes for use with channels."""

import abc
import dataclasses
import logging
import traceback
import typing

from frequenz.channels import Broadcast, Receiver
from frequenz.channels import Receiver

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -61,141 +59,3 @@ def new_receiver(self, *, limit: int = 50) -> Receiver[U_co]:
A receiver instance.
"""
return self._mapping_function(self._fetcher.new_receiver(limit=limit))


class ChannelRegistry:
"""Dynamically creates, own and provide access to broadcast channels.

It can be used by actors to dynamically establish a communication channel
between each other.

The registry is responsible for creating channels when they are first requested via
the [`get_or_create()`][frequenz.sdk.actor.ChannelRegistry.get_or_create] method.

The registry also stores type information to make sure that the same channel is not
used for different message types.

Since the registry owns the channels, it is also responsible for closing them when
they are no longer needed. There is no way to remove a channel without closing it.

Note:
This registry stores [`Broadcast`][frequenz.channels.Broadcast] channels.
"""

def __init__(self, *, name: str) -> None:
"""Initialize this registry.

Args:
name: A name to identify the registry in the logs. This name is also used as
a prefix for the channel names.
"""
self._name = name
self._channels: dict[str, _Entry] = {}

@property
def name(self) -> str:
"""The name of this registry."""
return self._name

def message_type(self, key: str) -> type:
"""Get the message type of the channel for the given key.

Args:
key: The key to identify the channel.

Returns:
The message type of the channel.

Raises:
KeyError: If the channel does not exist.
"""
entry = self._channels.get(key)
if entry is None:
raise KeyError(f"No channel for key {key!r} exists.")
return entry.message_type

def __contains__(self, key: str) -> bool:
"""Check whether the channel for the given `key` exists."""
return key in self._channels

def get_or_create(self, message_type: type[T], key: str) -> Broadcast[T]:
"""Get or create a channel for the given key.

If a channel for the given key already exists, the message type of the existing
channel is checked against the requested message type. If they do not match,
a `ValueError` is raised.

Note:
The types have to match exactly, it doesn't do a subtype check due to
technical limitations. In the future subtype checks might be supported.

Args:
message_type: The type of the message that is sent through the channel.
key: The key to identify the channel.

Returns:
The channel for the given key.

Raises:
ValueError: If the channel exists and the message type does not match.
"""
if key not in self._channels:
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
"Creating a new channel for key %r with type %s at:\n%s",
key,
message_type,
"".join(traceback.format_stack(limit=10)[:9]),
)
self._channels[key] = _Entry(
message_type, Broadcast(name=f"{self._name}-{key}")
)

entry = self._channels[key]
if entry.message_type is not message_type:
error_message = (
f"Type mismatch, a channel for key {key!r} exists and the requested "
f"message type {message_type} is not the same as the existing "
f"message type {entry.message_type}."
)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
"%s at:\n%s",
error_message,
# We skip the last frame because it's this method, and limit the
# stack to 9 frames to avoid adding too much noise.
"".join(traceback.format_stack(limit=10)[:9]),
)
raise ValueError(error_message)

return typing.cast(Broadcast[T], entry.channel)

async def close_and_remove(self, key: str) -> None:
"""Remove the channel for the given key.

Args:
key: The key to identify the channel.

Raises:
KeyError: If the channel does not exist.
"""
entry = self._channels.pop(key, None)
if entry is None:
raise KeyError(f"No channel for key {key!r} exists.")
await entry.channel.close()


@dataclasses.dataclass(frozen=True)
class _Entry:
"""An entry in a channel registry."""

message_type: type
"""The type of the message that is sent through the channel in this entry."""

# We use object instead of Any to minimize the chances of hindering type checking.
# If for some reason the channel is not casted to the proper underlaying type, when
# using object at least accessing any member that's not part of the object base
# class will yield a type error, while if we used Any, it would not and the issue
# would be much harder to find.
channel: Broadcast[object]
"""The channel in this entry."""
23 changes: 1 addition & 22 deletions src/frequenz/sdk/microgrid/_data_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from frequenz.client.common.microgrid.components import ComponentId
from frequenz.client.microgrid.component import Battery, EvCharger, SolarInverter

from .._internal._channels import ChannelRegistry
from ..actor._actor import Actor
from ..timeseries import ResamplerConfig
from ..timeseries._voltage_streamer import VoltageStreamer
Expand Down Expand Up @@ -96,29 +95,22 @@ def __init__(
"""
self._resampler_config: ResamplerConfig = resampler_config

self._channel_registry: ChannelRegistry = ChannelRegistry(
name="Data Pipeline Registry"
)

self._data_sourcing_actor: _ActorInfo | None = None
self._resampling_actor: _ActorInfo | None = None

self._battery_power_wrapper = PowerWrapper(
self._channel_registry,
api_power_request_timeout=api_power_request_timeout,
power_manager_algorithm=battery_power_manager_algorithm,
default_power=DefaultPower.ZERO,
component_class=Battery,
)
self._ev_power_wrapper = PowerWrapper(
self._channel_registry,
api_power_request_timeout=api_power_request_timeout,
power_manager_algorithm=PowerManagerAlgorithm.MATRYOSHKA,
default_power=DefaultPower.MAX,
component_class=EvCharger,
)
self._pv_power_wrapper = PowerWrapper(
self._channel_registry,
api_power_request_timeout=api_power_request_timeout,
power_manager_algorithm=PowerManagerAlgorithm.MATRYOSHKA,
default_power=DefaultPower.MIN,
Expand Down Expand Up @@ -158,18 +150,14 @@ def frequency(self) -> GridFrequency:
if self._frequency_instance is None:
self._frequency_instance = GridFrequency(
self._data_sourcing_request_sender(),
self._channel_registry,
)

return self._frequency_instance

def voltage_per_phase(self) -> VoltageStreamer:
"""Return the per-phase voltage measuring point."""
if not self._voltage_instance:
self._voltage_instance = VoltageStreamer(
self._resampling_request_sender(),
self._channel_registry,
)
self._voltage_instance = VoltageStreamer(self._resampling_request_sender())

return self._voltage_instance

Expand All @@ -179,7 +167,6 @@ def logical_meter(self) -> LogicalMeter:

if self._logical_meter is None:
self._logical_meter = LogicalMeter(
channel_registry=self._channel_registry,
resampler_subscription_sender=self._resampling_request_sender(),
)
return self._logical_meter
Expand All @@ -190,7 +177,6 @@ def consumer(self) -> Consumer:

if self._consumer is None:
self._consumer = Consumer(
channel_registry=self._channel_registry,
resampler_subscription_sender=self._resampling_request_sender(),
)
return self._consumer
Expand All @@ -201,7 +187,6 @@ def producer(self) -> Producer:

if self._producer is None:
self._producer = Producer(
channel_registry=self._channel_registry,
resampler_subscription_sender=self._resampling_request_sender(),
)
return self._producer
Expand All @@ -213,7 +198,6 @@ def grid(self) -> Grid:

if self._grid is None:
initialize_grid(
channel_registry=self._channel_registry,
resampler_subscription_sender=self._resampling_request_sender(),
)
self._grid = get_grid()
Expand Down Expand Up @@ -273,7 +257,6 @@ def new_ev_charger_pool(
if ref_store_key not in self._ev_charger_pool_reference_stores:
self._ev_charger_pool_reference_stores[ref_store_key] = (
EVChargerPoolReferenceStore(
channel_registry=self._channel_registry,
resampler_subscription_sender=self._resampling_request_sender(),
status_receiver=self._ev_power_wrapper.status_channel.new_receiver(
limit=1
Expand Down Expand Up @@ -346,7 +329,6 @@ def new_pv_pool(

if ref_store_key not in self._pv_pool_reference_stores:
self._pv_pool_reference_stores[ref_store_key] = PVPoolReferenceStore(
channel_registry=self._channel_registry,
resampler_subscription_sender=self._resampling_request_sender(),
status_receiver=(
self._pv_power_wrapper.status_channel.new_receiver(limit=1)
Expand Down Expand Up @@ -422,7 +404,6 @@ def new_battery_pool(
if ref_store_key not in self._battery_pool_reference_stores:
self._battery_pool_reference_stores[ref_store_key] = (
BatteryPoolReferenceStore(
channel_registry=self._channel_registry,
resampler_subscription_sender=self._resampling_request_sender(),
batteries_status_receiver=(
self._battery_power_wrapper.status_channel.new_receiver(limit=1)
Expand Down Expand Up @@ -461,7 +442,6 @@ def _data_sourcing_request_sender(self) -> Sender[ComponentMetricRequest]:
)
actor = DataSourcingActor(
request_receiver=channel.new_receiver(limit=_REQUEST_RECV_BUFFER_SIZE),
registry=self._channel_registry,
)
self._data_sourcing_actor = _ActorInfo(actor, channel)
self._data_sourcing_actor.actor.start()
Expand All @@ -482,7 +462,6 @@ def _resampling_request_sender(self) -> Sender[ComponentMetricRequest]:
name="Data Pipeline: Component Metric Resampling Actor Request Channel"
)
actor = ComponentMetricsResamplingActor(
channel_registry=self._channel_registry,
data_sourcing_request_sender=self._data_sourcing_request_sender(),
resampling_request_receiver=channel.new_receiver(
limit=_REQUEST_RECV_BUFFER_SIZE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,18 @@
from dataclasses import dataclass
from datetime import datetime

from frequenz.channels._broadcast import BroadcastReceiver
from frequenz.channels._oneshot import OneshotSender
Comment on lines +9 to +10
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once more, I will stop it here as I guess you get the point, but this should be fixed in all files.

Suggested change
from frequenz.channels._broadcast import BroadcastReceiver
from frequenz.channels._oneshot import OneshotSender
from frequenz.channels import BroadcastReceiver
from frequenz.channels import OneshotSender

from frequenz.client.common.microgrid.components import ComponentId
from frequenz.client.microgrid.metrics import Metric
from frequenz.quantities import Quantity

from frequenz.sdk.microgrid._old_component_data import TransitionalMetric

__all__ = ["ComponentMetricRequest", "Metric"]

from frequenz.sdk.timeseries import Sample


@dataclass
class ComponentMetricRequest:
Expand All @@ -30,10 +35,8 @@ class ComponentMetricRequest:
`namespace`, `component_id`, and `metric` will use the same channel, preventing
unnecessary duplication of data streams.

The requester and provider must use the same channel name so that they can
independently retrieve the same channel from the `ChannelRegistry`. This is
achieved by using the `get_channel_name` method to generate the name on both sides
based on parameters set by the requesters.
The `get_channel_name` method can be used to obtain a name that uniquely identifies
the metric data that will be sent through the telemetry stream.
"""

namespace: str
Expand All @@ -51,6 +54,9 @@ class ComponentMetricRequest:
If None, only live data is streamed.
"""

telem_stream_sender: OneshotSender[BroadcastReceiver[Sample[Quantity]]]
"""Sender of a oneshot channel used to send the data stream back to the requester."""

def get_channel_name(self) -> str:
"""Construct the channel name based on the request parameters.

Expand Down
Loading
Loading