Reset jobs on run start/stop from filewriter topic #795

Merged: SimonHeybrock merged 5 commits into main from worktree-run-transition-reset, Mar 18, 2026

@SimonHeybrock SimonHeybrock commented Mar 13, 2026

Summary

  • Subscribe services to the {instrument}_filewriter Kafka topic for run start (pl72) and run stop (6s4t) messages
  • Defer job resets to when the data stream reaches the transition timestamp, rather than resetting immediately when the control message is processed
  • Propagate stop_time from pl72 run-start messages, scheduling a second reset at stop time when set
  • Convert run control timestamps from ms (FlatBuffer wire format) to ns (domain convention) at the adapter boundary
  • Per-workflow reset_on_run_transition flag (default True) allows timeseries workflows to opt out. We may need to expose this to the user as a setting in the future, either globally or as a flag when starting a workflow. The mechanism in JobManager should make this simple to change: only the source of the flag would need to change, which is currently set as reset_on_run_transition=workflow_spec.reset_on_run_transition.
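
The ms to ns conversion at the adapter boundary might look roughly like the following. This is a minimal sketch, not the code from this PR: the helper run_start_from_wire, its parameters, and the exact layout of RunStart are assumptions made for illustration; only the field names run_name, start_time, and stop_time appear in the discussion. Representing an unset stop time as None is likewise an assumption.

```python
from dataclasses import dataclass
from typing import Optional

NS_PER_MS = 1_000_000  # nanoseconds per millisecond


@dataclass(frozen=True)
class RunStart:
    """Hypothetical domain type; all times are in nanoseconds."""

    run_name: str
    start_time: int
    stop_time: Optional[int] = None


def run_start_from_wire(
    run_name: str, start_time_ms: int, stop_time_ms: Optional[int] = None
) -> RunStart:
    """Convert wire-format milliseconds to nanoseconds exactly once.

    Hypothetical helper: the real adapter reads these values from a
    deserialized pl72 message rather than taking them as arguments.
    """
    stop_time_ns = None if stop_time_ms is None else stop_time_ms * NS_PER_MS
    return RunStart(
        run_name=run_name,
        start_time=start_time_ms * NS_PER_MS,
        stop_time=stop_time_ns,
    )
```

Doing the conversion in one place means downstream code never has to ask which unit a timestamp is in.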

Fixes #791.

Motivation

When a new instrument run starts or stops, data reduction workflows should start fresh. Run control messages may carry future timestamps (e.g., "run starts in 1 minute"), so resetting immediately on receipt would discard data that still belongs to the current run. Resets must fire when the data stream actually reaches the transition point.

Design

  • Deferred resets: on_run_start/on_run_stop schedule reset times via bisect.insort into a sorted pending list on JobManager. Resets fire in _advance_to_time when end_time of incoming data reaches the scheduled time. Multiple pending resets within the same data batch collapse into a single _reset_eligible_jobs call.
  • Own domain types: RunStart/RunStop dataclasses in core/message.py, decoupled from streaming_data_types FlatBuffer types.
  • ms→ns conversion: Done once in RunControlAdapter. All downstream code uses nanoseconds.
  • Per-job opt-in: reset_on_run_transition flag on WorkflowSpec, threaded through to Job. Timeseries workflows set False.
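
The deferred-reset and opt-in design above can be sketched as follows. This is a toy model under stated assumptions, not the JobManager from this PR: ToyJob, ToyJobManager, schedule_reset, push_data, and the reset_calls counter are hypothetical names, and firing resets before accumulating the incoming batch is a simplification chosen for the example.

```python
import bisect
from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class ToyJob:
    """Hypothetical job that accumulates counts."""

    name: str
    reset_on_run_transition: bool = True
    accumulated: int = 0

    def reset(self) -> None:
        self.accumulated = 0


class ToyJobManager:
    """Hypothetical manager that defers resets until data reaches them."""

    def __init__(self, jobs: Iterable[ToyJob]) -> None:
        self._jobs = list(jobs)
        self._pending_reset_times: List[int] = []
        self.reset_calls = 0  # bookkeeping for the example only

    def schedule_reset(self, time_ns: int) -> None:
        # insort keeps the pending list sorted on every insertion.
        bisect.insort(self._pending_reset_times, time_ns)

    def push_data(self, end_time: int, counts: int) -> None:
        # Count the leading pending times that the data has reached.
        triggered = 0
        for t in self._pending_reset_times:
            if t <= end_time:
                triggered += 1
            else:
                break
        if triggered:
            # Several due resets collapse into a single reset pass.
            del self._pending_reset_times[:triggered]
            self._reset_eligible_jobs()
        for job in self._jobs:
            job.accumulated += counts

    def _reset_eligible_jobs(self) -> None:
        self.reset_calls += 1
        for job in self._jobs:
            if job.reset_on_run_transition:
                job.reset()
```

With one default job and one opted-out job, scheduling resets at 100 and 120 and then pushing data with end_time=150 triggers a single reset pass that clears only the default job.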

Test plan

We do not have fakes that create run start/stop messages, and if we did, we would need to ensure they match what NICOS produces. It is probably more productive to deploy this and try it in the wild.

  • Deploy and verify with real NICOS run start/stop messages

🤖 Generated with Claude Code

Subscribe all services to the {instrument}_filewriter Kafka topic
and reset eligible jobs when a run starts or stops. This ensures
accumulators don't carry stale data across run boundaries.

Key changes:
- Add RunStart/RunStop domain types in core/message.py
- Add RunControlAdapter to deserialize pl72/6s4t FlatBuffer messages
- Add filewriter_topic to StreamMapping (derived from instrument name)
- Add on_run_start/on_run_stop to JobManager with per-job opt-in
- Add reset_on_run_transition flag to WorkflowSpec (default True)
- Timeseries workflows opt out (reset_on_run_transition=False)
- All four services (detector, monitor, reduction, timeseries) subscribe
@SimonHeybrock SimonHeybrock force-pushed the worktree-run-transition-reset branch from 51745fd to 489b654 March 16, 2026 13:25
Run start/stop messages carry future timestamps, but resets were firing
immediately when the message was processed. Now reset times are scheduled
and only fire when the data stream reaches the transition point.

Also propagates stop_time from pl72 (run start) messages and converts
all run control timestamps from ms (wire format) to ns (domain convention)
at the adapter boundary.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@SimonHeybrock SimonHeybrock changed the title Reset jobs on run start/stop from filewriter topic Reset jobs on run transitions using data-driven timestamps Mar 16, 2026
@SimonHeybrock SimonHeybrock marked this pull request as ready for review March 17, 2026 05:37
@SimonHeybrock SimonHeybrock changed the title Reset jobs on run transitions using data-driven timestamps Reset jobs on run transitions Mar 17, 2026
@SimonHeybrock SimonHeybrock changed the title Reset jobs on run transitions Reset jobs on run start/stop from filewriter topic Mar 17, 2026
The existing test_past_reset_time_fires_on_next_data only tested a reset
time that hadn't been reached yet. The new test covers the case where data
has already advanced past T=5000 and a RunStart arrives with T=3000,
verifying the reset fires on the next data push.
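
The scenario described in this commit message can be illustrated with a small stand-in. This is not the test added in the PR: PendingResets and its methods are hypothetical, and only the timestamps T=5000 and T=3000 come from the text above.

```python
import bisect
from typing import List


class PendingResets:
    """Hypothetical stand-in for the reset bookkeeping on JobManager."""

    def __init__(self) -> None:
        self.times: List[int] = []
        self.fired = 0

    def schedule(self, t: int) -> None:
        bisect.insort(self.times, t)

    def advance(self, end_time: int) -> None:
        # Number of pending times at or before end_time.
        due = bisect.bisect_right(self.times, end_time)
        if due:
            del self.times[:due]
            self.fired += 1


def test_reset_time_already_passed_fires_on_next_data() -> None:
    pending = PendingResets()
    pending.advance(5000)      # data stream is already at T=5000
    pending.schedule(3000)     # RunStart with T=3000 arrives late
    assert pending.fired == 0  # scheduling alone does not fire anything
    pending.advance(5001)      # next data push
    assert pending.fired == 1
    assert pending.times == []
```

Because resets only fire when data is pushed, a reset time that is already in the past stays pending until the next batch arrives.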

@YooSunYoung YooSunYoung left a comment

I have some minor comments/questions but looks fine in general!

Comment on lines +327 to +328
if info.stop_time is not None:
    self._schedule_reset(info.stop_time)

Does NICOS not send run stop message if there is stop_time?
If they do, I think this part is unnecessary.

@SimonHeybrock SimonHeybrock Mar 18, 2026

I don't know, better to keep it rather than making assumptions that might not be true. No harm to this, is there?

Comment on lines +338 to +350
def _fire_pending_resets(self, end_time: int) -> None:
    """Fire pending resets whose scheduled time has been reached by data."""
    if not self._pending_reset_times:
        return
    triggered = 0
    for t in self._pending_reset_times:
        if t <= end_time:
            triggered += 1
        else:
            break
    if triggered:
        self._pending_reset_times = self._pending_reset_times[triggered:]
        self._reset_eligible_jobs()

Can we also write this with bisect_right?
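
For a sorted list, bisect_right returns the number of entries less than or equal to the probe value, which is exactly what the manual loop counts. The sketch below compares the two as free functions; split_with_loop and split_with_bisect are hypothetical names used for illustration, not code from this PR.

```python
from bisect import bisect_right
from typing import List, Tuple


def split_with_loop(pending: List[int], end_time: int) -> Tuple[int, List[int]]:
    """Count due entries by walking the sorted list, as in the reviewed diff."""
    triggered = 0
    for t in pending:
        if t <= end_time:
            triggered += 1
        else:
            break
    return triggered, pending[triggered:]


def split_with_bisect(pending: List[int], end_time: int) -> Tuple[int, List[int]]:
    """Same result via binary search; requires pending to be sorted."""
    triggered = bisect_right(pending, end_time)
    return triggered, pending[triggered:]
```

bisect_right rather than bisect_left is the matching choice here, since a reset scheduled exactly at end_time counts as reached under the `t <= end_time` comparison.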

Comment on lines +185 to +186
manager.on_run_start(RunStart(run_name='run_1', start_time=100))
# No jobs scheduled/active, but push data past the reset time

[MINOR]
Can we add another assertion to make sure it has the reset time before the data was pushed?

Suggested change
- manager.on_run_start(RunStart(run_name='run_1', start_time=100))
- # No jobs scheduled/active, but push data past the reset time
+ manager.on_run_start(RunStart(run_name='run_1', start_time=100))
+ assert manager._pending_reset_times == [100]
+ # No jobs scheduled/active, but push data past the reset time

SimonHeybrock and others added 2 commits March 18, 2026 10:27
Use bisect_right instead of manual loop in _fire_pending_resets.
Add assertion for pending reset state before data push in test.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@SimonHeybrock SimonHeybrock enabled auto-merge March 18, 2026 12:21
@SimonHeybrock SimonHeybrock merged commit b6e2674 into main Mar 18, 2026
4 checks passed
@SimonHeybrock SimonHeybrock deleted the worktree-run-transition-reset branch March 18, 2026 12:26

Successfully merging this pull request may close these issues.

NICOS should be able to restart the workflow and start a cumulative run from start.

2 participants