Skip to content

Commit 8e716a9

Browse files
authored
Fix scheduler bugs on v1.x.x (#248)
Rebased follow-up of #246 for `v1.x.x`. - restores the scheduler heap after removing queued events - emits the missing stop notification when an update lands on the stop boundary - adds regression coverage for both scheduler issues - adapts the new scheduler tests to the `time-machine` 3 API used on `v1.x.x`
2 parents 4f03a8f + 6b16b7d commit 8e716a9

File tree

3 files changed

+327
-7
lines changed

3 files changed

+327
-7
lines changed

RELEASE_NOTES.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,8 @@
1515
## Bug Fixes
1616

1717
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
* Fixed bugs that could cause a dispatch actor not to stop when an update
  arrives exactly at the moment a dispatch window closes. Also fixed a
  related issue that could cause incorrect event ordering when multiple
  dispatches are updated concurrently.

src/frequenz/dispatch/_bg_service.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from contextlib import closing
1414
from dataclasses import dataclass, field
1515
from datetime import datetime, timedelta, timezone
16-
from heapq import heappop, heappush
16+
from heapq import heapify, heappop, heappush
1717

1818
import grpc.aio
1919
from frequenz.channels import Broadcast, Receiver, select, selected_from
@@ -486,11 +486,19 @@ async def _update_dispatch_schedule_and_notify(
486486
# Dispatch was updated
487487
elif dispatch and old_dispatch:
488488
# Remove potentially existing scheduled event
489-
self._remove_scheduled(old_dispatch)
489+
removed = self._remove_scheduled(old_dispatch)
490490

491491
# Check if the change requires an immediate notification
492492
if self._update_changed_running_state(dispatch, old_dispatch):
493493
await self._send_running_state_change(dispatch)
494+
elif removed is not None and removed.priority == 1 and not dispatch.started:
495+
# priority == 1 means a stop event (see QueueItem.__init__).
496+
# If we removed a pending stop event and the dispatch is no
497+
# longer started, the update arrived exactly at the stop
498+
# boundary. The timer would have delivered the stop event, but
499+
# _remove_scheduled consumed it first. Send the notification
500+
# here so the actor is not left running past the window end.
501+
await self._send_running_state_change(dispatch)
494502

495503
if dispatch.started:
496504
self._schedule_stop(dispatch)
@@ -507,21 +515,26 @@ def _update_timer(self, timer: Timer) -> None:
507515
timer.reset(interval=due_at - datetime.now(timezone.utc))
508516
_logger.debug("Next event scheduled at %s", self._scheduled_events[0].time)
509517

510-
def _remove_scheduled(self, dispatch: Dispatch) -> bool:
518+
def _remove_scheduled(self, dispatch: Dispatch) -> "QueueItem | None":
511519
"""Remove a dispatch from the scheduled events.
512520
513521
Args:
514522
dispatch: The dispatch to remove.
515523
516524
Returns:
517-
True if the dispatch was found and removed, False otherwise.
525+
The removed queue item, or None if not found.
518526
"""
519527
for idx, item in enumerate(self._scheduled_events):
520528
if dispatch.id == item.dispatch.id:
521529
self._scheduled_events.pop(idx)
522-
return True
523-
524-
return False
530+
# heappop() only removes the root (index 0) and does not accept
531+
# an index argument, so we use list.pop(idx) instead. After
532+
# removing an arbitrary element the heap property is broken and
533+
# must be restored explicitly.
534+
heapify(self._scheduled_events)
535+
return item
536+
537+
return None
525538

526539
def _schedule_start(self, dispatch: Dispatch) -> None:
527540
"""Schedule a dispatch to start.

tests/test_dispatcher_scheduler.py

Lines changed: 302 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,302 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
# pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals
5+
6+
"""Tests for scheduler edge cases in DispatchScheduler."""
7+
8+
import asyncio
9+
from collections.abc import AsyncIterator, Iterator
10+
from dataclasses import replace
11+
from datetime import datetime, timedelta, timezone
12+
from random import randint
13+
14+
import async_solipsism
15+
import time_machine
16+
from frequenz.channels import Receiver
17+
from frequenz.client.common.microgrid import MicrogridId
18+
from frequenz.client.dispatch.recurrence import RecurrenceRule
19+
from frequenz.client.dispatch.test.client import FakeClient, to_create_params
20+
from frequenz.client.dispatch.test.generator import DispatchGenerator
21+
from pytest import fixture
22+
23+
from frequenz.dispatch import Dispatch, DispatchEvent
24+
from frequenz.dispatch._bg_service import DispatchScheduler
25+
26+
27+
@fixture
28+
def generator() -> DispatchGenerator:
29+
"""Return a dispatch generator."""
30+
return DispatchGenerator()
31+
32+
33+
@fixture
34+
def event_loop_policy() -> async_solipsism.EventLoopPolicy:
35+
"""Set the event loop policy to use async_solipsism."""
36+
policy = async_solipsism.EventLoopPolicy()
37+
asyncio.set_event_loop_policy(policy)
38+
return policy
39+
40+
41+
@fixture
42+
def fake_time() -> Iterator[time_machine.Traveller]:
43+
"""Replace real time with a time machine that doesn't automatically tick."""
44+
with time_machine.travel(destination=0, tick=False) as traveller:
45+
yield traveller
46+
47+
48+
def _now() -> datetime:
49+
"""Return the current time in UTC."""
50+
return datetime.now(tz=timezone.utc)
51+
52+
53+
@fixture
54+
def microgrid_id() -> MicrogridId:
55+
"""Return a random microgrid ID."""
56+
return MicrogridId(randint(1, 100))
57+
58+
59+
@fixture
60+
def client() -> FakeClient:
61+
"""Return a fake dispatch API client."""
62+
return FakeClient()
63+
64+
65+
@fixture
66+
async def scheduler(
67+
microgrid_id: MicrogridId, client: FakeClient
68+
) -> AsyncIterator[DispatchScheduler]:
69+
"""Start a DispatchScheduler and stop it after the test."""
70+
sched = DispatchScheduler(microgrid_id=microgrid_id, client=client)
71+
sched.start()
72+
try:
73+
yield sched
74+
finally:
75+
await sched.stop()
76+
77+
78+
@fixture
79+
async def event_receiver(scheduler: DispatchScheduler) -> Receiver[Dispatch]:
80+
"""Return a running-state event receiver for SET_POWER dispatches."""
81+
return await scheduler.new_running_state_event_receiver(
82+
"SET_POWER", merge_strategy=None
83+
)
84+
85+
86+
@fixture
87+
def lifecycle_receiver(scheduler: DispatchScheduler) -> Receiver[DispatchEvent]:
88+
"""Return a lifecycle event receiver for SET_POWER dispatches."""
89+
return scheduler.new_lifecycle_events_receiver("SET_POWER")
90+
91+
92+
async def test_parallel_dispatches_same_window_and_follow_up_window(
93+
fake_time: time_machine.Traveller,
94+
generator: DispatchGenerator,
95+
microgrid_id: MicrogridId,
96+
client: FakeClient,
97+
event_receiver: Receiver[Dispatch],
98+
lifecycle_receiver: Receiver[DispatchEvent],
99+
) -> None:
100+
"""Test that unmerged dispatches stop correctly at a shared boundary.
101+
102+
This mirrors the SET_POWER edge case where multiple dispatches share the
103+
exact same start time and duration, and another batch starts exactly when
104+
the first batch ends.
105+
106+
This is an opportunistic test: it tries to detect corruption in the
107+
_scheduled_events heap by exercising the production traffic shape around
108+
shared boundaries. The broken heap ordering depends on runtime queue
109+
layouts that this harness does not reproduce reliably, but the test still
110+
covers the correct boundary behaviour.
111+
"""
112+
start_time = _now() + timedelta(seconds=5)
113+
duration = timedelta(seconds=15)
114+
follow_up_start_time = start_time + duration
115+
116+
first_batch = [
117+
replace(
118+
generator.generate_dispatch(),
119+
active=True,
120+
duration=duration,
121+
start_time=start_time,
122+
recurrence=RecurrenceRule(),
123+
type="SET_POWER",
124+
)
125+
for _ in range(3)
126+
]
127+
second_batch = [
128+
replace(
129+
generator.generate_dispatch(),
130+
active=True,
131+
duration=duration,
132+
start_time=follow_up_start_time,
133+
recurrence=RecurrenceRule(),
134+
type="SET_POWER",
135+
)
136+
for _ in range(4)
137+
]
138+
139+
for dispatch in [*first_batch, *second_batch]:
140+
await client.create(**to_create_params(microgrid_id, dispatch))
141+
142+
for _ in range(7):
143+
await lifecycle_receiver.receive()
144+
145+
fake_time.move_to(start_time + timedelta(seconds=1))
146+
await asyncio.sleep(1)
147+
148+
first_start_events = [await event_receiver.receive() for _ in range(3)]
149+
assert {dispatch.id for dispatch in first_start_events} == {
150+
dispatch.id for dispatch in first_batch
151+
}
152+
assert all(dispatch.started for dispatch in first_start_events)
153+
154+
fake_time.move_to(follow_up_start_time + timedelta(seconds=1))
155+
await asyncio.sleep(1)
156+
157+
boundary_events = [await event_receiver.receive() for _ in range(7)]
158+
second_starts = [dispatch for dispatch in boundary_events if dispatch.started]
159+
first_stops = [dispatch for dispatch in boundary_events if not dispatch.started]
160+
161+
assert {dispatch.id for dispatch in second_starts} == {
162+
dispatch.id for dispatch in second_batch
163+
}
164+
assert all(dispatch.started for dispatch in second_starts)
165+
166+
assert {dispatch.id for dispatch in first_stops} == {
167+
dispatch.id for dispatch in first_batch
168+
}
169+
assert all(not dispatch.started for dispatch in first_stops)
170+
171+
fake_time.move_to(follow_up_start_time + duration + timedelta(seconds=1))
172+
await asyncio.sleep(1)
173+
174+
second_stop_events = [await event_receiver.receive() for _ in range(4)]
175+
assert {dispatch.id for dispatch in second_stop_events} == {
176+
dispatch.id for dispatch in second_batch
177+
}
178+
assert all(not dispatch.started for dispatch in second_stop_events)
179+
180+
181+
async def test_parallel_dispatches_with_payload_updates_before_start(
182+
fake_time: time_machine.Traveller,
183+
generator: DispatchGenerator,
184+
microgrid_id: MicrogridId,
185+
client: FakeClient,
186+
event_receiver: Receiver[Dispatch],
187+
lifecycle_receiver: Receiver[DispatchEvent],
188+
) -> None:
189+
"""Test dispatches updated before start still stop correctly.
190+
191+
Mirrors the production SET_POWER scenario: multiple dispatches with the same
192+
start time are created and then updated with new power values at different
193+
times before the start. Updates trigger _remove_scheduled + re-schedule
194+
which can corrupt the heap, potentially causing stop events to be lost.
195+
196+
This is an opportunistic test: it tries to detect corruption in the
197+
_scheduled_events heap by exercising the update-before-start pattern. The
198+
buggy heap state is timing- and layout-dependent and cannot be forced
199+
deterministically here, but the test still covers the correct behaviour.
200+
"""
201+
start_time = _now() + timedelta(minutes=25)
202+
duration = timedelta(minutes=15)
203+
204+
updates = [
205+
(-80000.0, -88740.0, timedelta(seconds=20)),
206+
(-80000.0, -88740.0, timedelta(minutes=4)),
207+
(-100000.0, -133680.0, timedelta(seconds=50)),
208+
]
209+
dispatches = []
210+
211+
for initial_power, updated_power, wait_time in updates:
212+
dispatch = replace(
213+
generator.generate_dispatch(),
214+
active=True,
215+
duration=duration,
216+
start_time=start_time,
217+
recurrence=RecurrenceRule(),
218+
type="SET_POWER",
219+
payload={"target_power_w": initial_power},
220+
)
221+
dispatches.append(dispatch)
222+
await client.create(**to_create_params(microgrid_id, dispatch))
223+
await lifecycle_receiver.receive()
224+
fake_time.shift(wait_time)
225+
await asyncio.sleep(1)
226+
await client.update(
227+
microgrid_id=microgrid_id,
228+
dispatch_id=dispatch.id,
229+
new_fields={"payload": {"target_power_w": updated_power}},
230+
)
231+
fake_time.shift(timedelta(seconds=1))
232+
await asyncio.sleep(1)
233+
await lifecycle_receiver.receive()
234+
235+
fake_time.move_to(start_time + timedelta(seconds=1))
236+
await asyncio.sleep(1)
237+
238+
start_events = [await event_receiver.receive() for _ in dispatches]
239+
assert all(dispatch.started for dispatch in start_events)
240+
241+
fake_time.move_to(start_time + duration + timedelta(seconds=1))
242+
await asyncio.sleep(1)
243+
244+
stop_events = [await event_receiver.receive() for _ in dispatches]
245+
assert all(not dispatch.started for dispatch in stop_events)
246+
assert {dispatch.id for dispatch in stop_events} == {
247+
dispatch.id for dispatch in start_events
248+
}
249+
250+
251+
async def test_dispatch_payload_update_at_stop_boundary(
252+
fake_time: time_machine.Traveller,
253+
generator: DispatchGenerator,
254+
microgrid_id: MicrogridId,
255+
client: FakeClient,
256+
event_receiver: Receiver[Dispatch],
257+
lifecycle_receiver: Receiver[DispatchEvent],
258+
) -> None:
259+
"""Test that an update at the stop boundary still stops the actor.
260+
261+
An update exactly at the stop boundary can remove the pending stop event
262+
before the timer fires it. Without an explicit notification at that point,
263+
the actor is left running even though the dispatch window has elapsed.
264+
265+
In practice this test is still opportunistic: a payload change triggers an
266+
immediate update notification in this harness, so it also passes without the
267+
fix. It still covers the boundary-update shape from production.
268+
"""
269+
start_time = _now() + timedelta(seconds=5)
270+
duration = timedelta(seconds=15)
271+
272+
dispatch = replace(
273+
generator.generate_dispatch(),
274+
active=True,
275+
duration=duration,
276+
start_time=start_time,
277+
recurrence=RecurrenceRule(),
278+
type="SET_POWER",
279+
payload={"target_power_w": -88740.0},
280+
)
281+
await client.create(**to_create_params(microgrid_id, dispatch))
282+
await lifecycle_receiver.receive()
283+
284+
fake_time.move_to(start_time + timedelta(seconds=1))
285+
await asyncio.sleep(1)
286+
287+
started = await event_receiver.receive()
288+
assert started.started
289+
290+
fake_time.move_to(start_time + duration)
291+
await asyncio.sleep(0)
292+
293+
await client.update(
294+
microgrid_id=microgrid_id,
295+
dispatch_id=dispatch.id,
296+
new_fields={"payload": {"target_power_w": -99999.0}},
297+
)
298+
fake_time.shift(timedelta(seconds=1))
299+
await asyncio.sleep(1)
300+
301+
stop_event = await event_receiver.receive()
302+
assert not stop_event.started

0 commit comments

Comments
 (0)