-
Notifications
You must be signed in to change notification settings - Fork 21
Remove channel registry #1371
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: v1.x.x
Are you sure you want to change the base?
Remove channel registry #1371
Changes from all commits
b8a37c1
cab6b8d
8e6f2ac
dfef151
0a5fbeb
215cfe7
d8ec935
ea7a169
f3e6b47
d575130
b03e0b1
eda22e9
d2384ea
25d31b4
af6d3a7
4c4f70d
342d826
4ea3f5d
0c733df
3e65895
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -16,15 +16,17 @@ | |||||
| from time import perf_counter | ||||||
| from typing import Any | ||||||
|
|
||||||
| from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError | ||||||
| from frequenz.channels import Broadcast, OneshotChannel, Receiver, ReceiverStoppedError | ||||||
| from frequenz.channels._broadcast import BroadcastReceiver | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Always use public imports. If the symbols are not exported publicly, we need to fix it in the channels library. |
||||||
| from frequenz.client.microgrid.metrics import Metric | ||||||
| from frequenz.quantities import Quantity | ||||||
|
|
||||||
| from frequenz.sdk import microgrid | ||||||
| from frequenz.sdk._internal._channels import ChannelRegistry | ||||||
| from frequenz.sdk.microgrid._data_sourcing import ( | ||||||
| ComponentMetricRequest, | ||||||
| DataSourcingActor, | ||||||
| ) | ||||||
| from frequenz.sdk.timeseries import Sample | ||||||
|
|
||||||
| try: | ||||||
| from tests.timeseries.mock_microgrid import MockMicrogrid | ||||||
|
|
@@ -80,7 +82,6 @@ async def benchmark_data_sourcing( # pylint: disable=too-many-locals | |||||
| name="DataSourcingActor Request Channel" | ||||||
| ) | ||||||
|
|
||||||
| channel_registry = ChannelRegistry(name="Microgrid Channel Registry") | ||||||
| request_receiver = request_channel.new_receiver( | ||||||
| name="datasourcing-benchmark", | ||||||
| limit=(num_ev_chargers * len(COMPONENT_METRIC_IDS)), | ||||||
|
|
@@ -105,18 +106,21 @@ async def consume(channel: Receiver[Any]) -> None: | |||||
|
|
||||||
| for evc_id in mock_grid.evc_ids: | ||||||
| for component_metric_id in COMPONENT_METRIC_IDS: | ||||||
| telem_stream_sender, telem_stream_receiver = OneshotChannel[ | ||||||
| BroadcastReceiver[Sample[Quantity]] | ||||||
| ]() | ||||||
| request = ComponentMetricRequest( | ||||||
| "current_phase_requests", evc_id, component_metric_id, None | ||||||
| namespace="current_phase_requests", | ||||||
| component_id=evc_id, | ||||||
| metric=component_metric_id, | ||||||
| start_time=None, | ||||||
| telem_stream_sender=telem_stream_sender, | ||||||
| ) | ||||||
|
|
||||||
| recv_channel = channel_registry.get_or_create( | ||||||
| ComponentMetricRequest, request.get_channel_name() | ||||||
| ).new_receiver() | ||||||
|
|
||||||
| await request_sender.send(request) | ||||||
| consume_tasks.append(asyncio.create_task(consume(recv_channel))) | ||||||
| stream_receiver = await telem_stream_receiver.receive() | ||||||
| consume_tasks.append(asyncio.create_task(consume(stream_receiver))) | ||||||
|
|
||||||
| async with DataSourcingActor(request_receiver, channel_registry): | ||||||
| async with DataSourcingActor(request_receiver): | ||||||
| await asyncio.gather(*consume_tasks) | ||||||
|
|
||||||
| time_taken = perf_counter() - start_time | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -6,13 +6,18 @@ | |||||||||
| from dataclasses import dataclass | ||||||||||
| from datetime import datetime | ||||||||||
|
|
||||||||||
| from frequenz.channels._broadcast import BroadcastReceiver | ||||||||||
| from frequenz.channels._oneshot import OneshotSender | ||||||||||
|
Comment on lines
+9
to
+10
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Once more, I will stop it here as I guess you get the point, but this should be fixed in all files.
Suggested change
|
||||||||||
| from frequenz.client.common.microgrid.components import ComponentId | ||||||||||
| from frequenz.client.microgrid.metrics import Metric | ||||||||||
| from frequenz.quantities import Quantity | ||||||||||
|
|
||||||||||
| from frequenz.sdk.microgrid._old_component_data import TransitionalMetric | ||||||||||
|
|
||||||||||
| __all__ = ["ComponentMetricRequest", "Metric"] | ||||||||||
|
|
||||||||||
| from frequenz.sdk.timeseries import Sample | ||||||||||
|
|
||||||||||
|
|
||||||||||
| @dataclass | ||||||||||
| class ComponentMetricRequest: | ||||||||||
|
|
@@ -30,10 +35,8 @@ class ComponentMetricRequest: | |||||||||
| `namespace`, `component_id`, and `metric` will use the same channel, preventing | ||||||||||
| unnecessary duplication of data streams. | ||||||||||
|
|
||||||||||
| The requester and provider must use the same channel name so that they can | ||||||||||
| independently retrieve the same channel from the `ChannelRegistry`. This is | ||||||||||
| achieved by using the `get_channel_name` method to generate the name on both sides | ||||||||||
| based on parameters set by the requesters. | ||||||||||
| The `get_channel_name` method can be used to obtain a name that uniquely identifies | ||||||||||
| the metric data that will be sent through the telemetry stream. | ||||||||||
| """ | ||||||||||
|
|
||||||||||
| namespace: str | ||||||||||
|
|
@@ -51,6 +54,9 @@ class ComponentMetricRequest: | |||||||||
| If None, only live data is streamed. | ||||||||||
| """ | ||||||||||
|
|
||||||||||
| telem_stream_sender: OneshotSender[BroadcastReceiver[Sample[Quantity]]] | ||||||||||
| """Sender of a oneshot channel used to send the data stream back to the requester.""" | ||||||||||
|
|
||||||||||
| def get_channel_name(self) -> str: | ||||||||||
| """Construct the channel name based on the request parameters. | ||||||||||
|
|
||||||||||
|
|
||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious, did you run the benchmark before and after the changes to see if there are any significant performance changes?