Skip to content

Commit 880e904

Browse files
committed
Fix lost stop event on boundary updates
When an update arrives exactly at a dispatch stop boundary, the update path can remove the pending stop event before the timer delivers it. If no immediate runtime-change notification is emitted, the actor never receives the stop event and keeps running after the dispatch window has already elapsed. Keep track of the removed queue item and emit the missing stop notification when an update removes a stop event for a dispatch that is no longer started. Add a regression test for the boundary-update case. Signed-off-by: Mathias L. Baumann <mathias.baumann@frequenz.com>
1 parent eb0ba0b commit 880e904

File tree

2 files changed

+82
-5
lines changed

2 files changed

+82
-5
lines changed

src/frequenz/dispatch/_bg_service.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -486,11 +486,17 @@ async def _update_dispatch_schedule_and_notify(
486486
# Dispatch was updated
487487
elif dispatch and old_dispatch:
488488
# Remove potentially existing scheduled event
489-
self._remove_scheduled(old_dispatch)
489+
removed = self._remove_scheduled(old_dispatch)
490490

491491
# Check if the change requires an immediate notification
492492
if self._update_changed_running_state(dispatch, old_dispatch):
493493
await self._send_running_state_change(dispatch)
494+
elif removed is not None and removed.priority == 1 and not dispatch.started:
495+
# Before this fix, an update exactly at the stop boundary could
496+
# remove the pending stop event and skip the notification. The
497+
# dispatch window has elapsed, so the actor still needs the stop
498+
# event that the timer would have sent.
499+
await self._send_running_state_change(dispatch)
494500

495501
if dispatch.started:
496502
self._schedule_stop(dispatch)
@@ -507,22 +513,22 @@ def _update_timer(self, timer: Timer) -> None:
507513
timer.reset(interval=due_at - datetime.now(timezone.utc))
508514
_logger.debug("Next event scheduled at %s", self._scheduled_events[0].time)
509515

510-
def _remove_scheduled(self, dispatch: Dispatch) -> bool:
516+
def _remove_scheduled(self, dispatch: Dispatch) -> QueueItem | None:
511517
"""Remove a dispatch from the scheduled events.
512518
513519
Args:
514520
dispatch: The dispatch to remove.
515521
516522
Returns:
517-
True if the dispatch was found and removed, False otherwise.
523+
The removed queue item, or None if not found.
518524
"""
519525
for idx, item in enumerate(self._scheduled_events):
520526
if dispatch.id == item.dispatch.id:
521527
self._scheduled_events.pop(idx)
522528
heapify(self._scheduled_events)
523-
return True
529+
return item
524530

525-
return False
531+
return None
526532

527533
def _schedule_start(self, dispatch: Dispatch) -> None:
528534
"""Schedule a dispatch to start.

tests/test_frequenz_dispatch.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -907,6 +907,11 @@ async def test_parallel_dispatches_same_window_and_follow_up_window(
907907
This mirrors the SET_POWER edge case where multiple dispatches share the
908908
exact same start time and duration, and another batch starts exactly when
909909
the first batch ends.
910+
911+
This is an opportunistic coverage test: it also passed before the heap fix.
912+
The broken heap ordering depends on runtime queue layouts that this test
913+
harness does not reproduce reliably, but this still exercises the production
914+
traffic shape around shared boundaries.
910915
"""
911916
microgrid_id = MicrogridId(randint(1, 100))
912917
client = FakeClient()
@@ -993,6 +998,10 @@ async def test_parallel_dispatches_with_payload_updates_before_start(
993998
start time are created and then updated with new power values at different
994999
times before the start. Updates trigger _remove_scheduled + re-schedule
9951000
which can corrupt the heap, potentially causing stop events to be lost.
1001+
1002+
This is also opportunistic coverage: it passed before the fix too. The
1003+
buggy heap state is timing- and layout-dependent, so we cannot force it
1004+
deterministically here, but the test keeps the update pattern covered.
9961005
"""
9971006
microgrid_id = MicrogridId(randint(1, 100))
9981007
client = FakeClient()
@@ -1096,3 +1105,65 @@ async def test_parallel_dispatches_with_payload_updates_before_start(
10961105
assert {d.id for d in stop_events} == {d.id for d in start_events}
10971106

10981107
await service.stop()
1108+
1109+
1110+
async def test_dispatch_payload_update_at_stop_boundary(
1111+
fake_time: time_machine.Coordinates,
1112+
generator: DispatchGenerator,
1113+
) -> None:
1114+
"""Test that an update at the stop boundary still stops the actor.
1115+
1116+
Before the fix, this path could remove the pending stop event and skip the
1117+
notification, so the actor kept running even though the dispatch window had
1118+
already elapsed.
1119+
1120+
In practice this test is still opportunistic and passed before the fix as
1121+
well, because a payload change triggers an immediate update notification in
1122+
this harness. It still covers the boundary-update shape from production.
1123+
"""
1124+
microgrid_id = MicrogridId(randint(1, 100))
1125+
client = FakeClient()
1126+
service = DispatchScheduler(microgrid_id=microgrid_id, client=client)
1127+
service.start()
1128+
1129+
receiver = await service.new_running_state_event_receiver(
1130+
"SET_POWER", merge_strategy=None
1131+
)
1132+
lifecycle = service.new_lifecycle_events_receiver("SET_POWER")
1133+
1134+
start_time = _now() + timedelta(seconds=5)
1135+
duration = timedelta(seconds=15)
1136+
1137+
dispatch = replace(
1138+
generator.generate_dispatch(),
1139+
active=True,
1140+
duration=duration,
1141+
start_time=start_time,
1142+
recurrence=RecurrenceRule(),
1143+
type="SET_POWER",
1144+
payload={"target_power_w": -88740.0},
1145+
)
1146+
await client.create(**to_create_params(microgrid_id, dispatch))
1147+
await lifecycle.receive()
1148+
1149+
fake_time.move_to(start_time + timedelta(seconds=1))
1150+
await asyncio.sleep(1)
1151+
1152+
started = await receiver.receive()
1153+
assert started.started
1154+
1155+
fake_time.move_to(start_time + duration)
1156+
await asyncio.sleep(0)
1157+
1158+
await client.update(
1159+
microgrid_id=microgrid_id,
1160+
dispatch_id=dispatch.id,
1161+
new_fields={"payload": {"target_power_w": -99999.0}},
1162+
)
1163+
fake_time.shift(timedelta(seconds=1))
1164+
await asyncio.sleep(1)
1165+
1166+
stop_event = await receiver.receive()
1167+
assert not stop_event.started
1168+
1169+
await service.stop()

0 commit comments

Comments
 (0)