Skip to content

Commit f29ee59

Browse files
authored
Close receiver when stopping LatestValueCache (#495)
2 parents 9797c28 + d8de23e commit f29ee59

3 files changed

Lines changed: 14 additions & 4 deletions

File tree

RELEASE_NOTES.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66

77
## Upgrading
88

9-
<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
9+
- `LatestValueCache` now closes the receiver when it is stopped.
10+
- Fetching values from stopped `LatestValueCache` instances is now disallowed.
1011

1112
## New Features
1213

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ dev-mkdocs = [
5959
"mkdocstrings[python] == 0.30.1",
6060
"mkdocstrings-python == 1.18.2",
6161
"pymdownx-superfence-filter-lines == 0.1.0",
62+
"griffe==1.15.0",
6263
]
6364
dev-mypy = [
6465
# For checking the noxfile, docs/ script, and tests

src/frequenz/channels/_latest_value_cache.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ class LatestValueCache(typing.Generic[T_co]):
5353
5454
It provides a way to look up the latest value in a stream without any delay,
5555
as long as there has been one value received.
56+
57+
Takes ownership of the receiver. When the cache is stopped, the receiver
58+
will be closed.
5659
"""
5760

5861
def __init__(
@@ -66,12 +69,13 @@ def __init__(
6669
provided, a unique identifier will be generated from the object's
6770
[`id()`][id]. It is used mostly for debugging purposes.
6871
"""
69-
self._receiver = receiver
72+
self._receiver: Receiver[T_co] = receiver
7073
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
7174
self._latest_value: T_co | _Sentinel = _Sentinel()
72-
self._task = asyncio.create_task(
75+
self._task: asyncio.Task[None] = asyncio.create_task(
7376
self._run(), name=f"LatestValueCache«{self._unique_id}»"
7477
)
78+
self._stopped: bool = False
7579

7680
@property
7781
def unique_id(self) -> str:
@@ -93,6 +97,8 @@ def get(self) -> T_co:
9397
"""
9498
if isinstance(self._latest_value, _Sentinel):
9599
raise ValueError("No value has been received yet.")
100+
if self._stopped:
101+
raise ValueError("Cache has been stopped.")
96102
return self._latest_value
97103

98104
def has_value(self) -> bool:
@@ -108,7 +114,9 @@ async def _run(self) -> None:
108114
self._latest_value = value
109115

110116
async def stop(self) -> None:
111-
"""Stop the cache."""
117+
"""Stop the cache and close the owned receiver."""
118+
self._receiver.close()
119+
self._stopped = True
112120
if not self._task.done():
113121
self._task.cancel()
114122
try:

0 commit comments

Comments
 (0)