Reset jobs on run start/stop from filewriter topic #795
Merged
SimonHeybrock merged 5 commits into main on Mar 18, 2026
Subscribe all services to the {instrument}_filewriter Kafka topic
and reset eligible jobs when a run starts or stops. This ensures
accumulators don't carry stale data across run boundaries.
Key changes:
- Add RunStart/RunStop domain types in core/message.py
- Add RunControlAdapter to deserialize pl72/6s4t FlatBuffer messages
- Add filewriter_topic to StreamMapping (derived from instrument name)
- Add on_run_start/on_run_stop to JobManager with per-job opt-in
- Add reset_on_run_transition flag to WorkflowSpec (default True)
- Timeseries workflows opt out (reset_on_run_transition=False)
- All four services (detector, monitor, reduction, timeseries) subscribe
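The shape of these changes can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: the field names on RunStart follow the usage visible in the review (run_name, start_time, stop_time), while the fields of RunStop, the filewriter_topic helper, the trimmed-down WorkflowSpec, and the instrument name "dream" are assumptions made for the example.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class RunStart:
    """Domain type for a run start; times are in nanoseconds."""

    run_name: str
    start_time: int
    stop_time: int | None = None  # optional planned stop time


@dataclass(frozen=True)
class RunStop:
    """Domain type for a run stop (fields assumed for this sketch)."""

    run_name: str
    stop_time: int


def filewriter_topic(instrument: str) -> str:
    """Derive the run control topic name from the instrument name."""
    return f"{instrument}_filewriter"


@dataclass
class WorkflowSpec:
    """Hypothetical, trimmed-down spec carrying only the opt-in flag."""

    name: str
    reset_on_run_transition: bool = True


def names_to_reset(specs: list[WorkflowSpec]) -> list[str]:
    """Return the workflows that take part in run-transition resets."""
    return [spec.name for spec in specs if spec.reset_on_run_transition]


specs = [
    WorkflowSpec(name="detector_view"),
    WorkflowSpec(name="timeseries", reset_on_run_transition=False),
]
```

With these definitions, names_to_reset(specs) keeps only the workflow that left the flag at its default, which mirrors how timeseries workflows opt out.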
Force-pushed from 51745fd to 489b654.
Run start/stop messages carry future timestamps, but resets were firing immediately when the message was processed. Now reset times are scheduled and only fire when the data stream reaches the transition point. Also propagates stop_time from pl72 (run start) messages and converts all run control timestamps from ms (wire format) to ns (domain convention) at the adapter boundary.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
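The unit conversion at the adapter boundary might look something like the sketch below. It is only an illustration of the ms-to-ns convention described in this commit: the function names, the NS_PER_MS constant, and the simplified RunStart are hypothetical, and FlatBuffer decoding is left out entirely. How an unset stop time is encoded on the wire is not covered here; the sketch simply takes None to mean "not set".

```python
from __future__ import annotations

from dataclasses import dataclass

NS_PER_MS = 1_000_000


@dataclass(frozen=True)
class RunStart:
    """Simplified stand-in for the domain type; times in nanoseconds."""

    run_name: str
    start_time: int
    stop_time: int | None = None


def ms_to_ns(t_ms: int) -> int:
    """Convert a wire-format millisecond timestamp to nanoseconds."""
    return t_ms * NS_PER_MS


def run_start_from_wire(
    run_name: str, start_time_ms: int, stop_time_ms: int | None
) -> RunStart:
    """Build the domain object from already-decoded wire values.

    Converting here keeps every consumer of RunStart in nanoseconds.
    """
    stop_ns = None if stop_time_ms is None else ms_to_ns(stop_time_ms)
    return RunStart(
        run_name=run_name,
        start_time=ms_to_ns(start_time_ms),
        stop_time=stop_ns,
    )
```

Doing the conversion once, at the boundary, means the scheduling logic never has to know which unit the wire format used.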
The existing test_past_reset_time_fires_on_next_data only tested a reset time that hadn't been reached yet. The new test covers the case where data has already advanced past T=5000 and a RunStart arrives with T=3000, verifying the reset fires on the next data push.
YooSunYoung (Member) approved these changes on Mar 18, 2026 and left a comment:
I have some minor comments/questions but looks fine in general!
Comment on lines +327 to +328:
    if info.stop_time is not None:
        self._schedule_reset(info.stop_time)
Member
Does NICOS not send a run stop message if there is a stop_time?
If it does, I think this part is unnecessary.
Member
Author
I don't know, better to keep it rather than making assumptions that might not be true. No harm to this, is there?
Comment on lines +338 to +350:
    def _fire_pending_resets(self, end_time: int) -> None:
        """Fire pending resets whose scheduled time has been reached by data."""
        if not self._pending_reset_times:
            return
        triggered = 0
        for t in self._pending_reset_times:
            if t <= end_time:
                triggered += 1
            else:
                break
        if triggered:
            self._pending_reset_times = self._pending_reset_times[triggered:]
            self._reset_eligible_jobs()
Member
Can we also write this with bisect_right?
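For illustration, a bisect_right variant could look roughly like this. It is a standalone sketch rather than the code that was eventually committed: the PendingResets class and its reset_count counter are hypothetical stand-ins for JobManager and its _reset_eligible_jobs call. Because the pending list is kept sorted, bisect_right returns the number of entries that are less than or equal to end_time, which is the same count the manual loop computes.

```python
from bisect import bisect_right, insort


class PendingResets:
    """Hypothetical stand-in for the reset scheduling part of JobManager."""

    def __init__(self) -> None:
        self._pending_reset_times = []  # sorted list of timestamps (ns)
        self.reset_count = 0  # stands in for calls to _reset_eligible_jobs

    def schedule_reset(self, time: int) -> None:
        """Insert a reset time while keeping the list sorted."""
        insort(self._pending_reset_times, time)

    def fire_pending_resets(self, end_time: int) -> None:
        """Fire pending resets whose scheduled time has been reached by data."""
        # Number of scheduled times t with t <= end_time.
        triggered = bisect_right(self._pending_reset_times, end_time)
        if triggered:
            del self._pending_reset_times[:triggered]
            # Several due resets collapse into a single reset.
            self.reset_count += 1


resets = PendingResets()
resets.schedule_reset(3000)
resets.schedule_reset(100)
resets.fire_pending_resets(end_time=50)    # nothing due yet
resets.fire_pending_resets(end_time=100)   # fires the reset scheduled at 100
resets.schedule_reset(200)                 # arrives after data passed 200? not yet
resets.fire_pending_resets(end_time=5000)  # 200 and 3000 collapse into one reset
```

A reset time that is already in the past when it is scheduled is handled the same way: it sits at the front of the sorted list and fires on the next call.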
Comment on lines +185 to +186:
    manager.on_run_start(RunStart(run_name='run_1', start_time=100))
    # No jobs scheduled/active, but push data past the reset time
Member
[MINOR]
Can we add another assertion to make sure it has the reset time before the data was pushed?
Suggested change:
    - manager.on_run_start(RunStart(run_name='run_1', start_time=100))
    - # No jobs scheduled/active, but push data past the reset time
    + manager.on_run_start(RunStart(run_name='run_1', start_time=100))
    + assert manager._pending_reset_times == [100]
    + # No jobs scheduled/active, but push data past the reset time
Use bisect_right instead of manual loop in _fire_pending_resets. Add assertion for pending reset state before data push in test.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
- Subscribe all services to the {instrument}_filewriter Kafka topic for run start (pl72) and run stop (6s4t) messages
- Propagate stop_time from pl72 run-start messages, scheduling a second reset at stop time when set
- reset_on_run_transition flag (default True) allows timeseries workflows to opt out. We may need to expose this to the user as a setting in the future, either globally or as a flag when starting a workflow. The mechanism in JobManager should make this simple to change: only the source of the flag, currently set as reset_on_run_transition=workflow_spec.reset_on_run_transition, would need to change.
Fixes #791.
Motivation
When a new instrument run starts or stops, data reduction workflows should start fresh. Run control messages may carry future timestamps (e.g., "run starts in 1 minute"), so resetting immediately on receipt would discard data that still belongs to the current run. Resets must fire when the data stream actually reaches the transition point.
Design
- on_run_start/on_run_stop schedule reset times via bisect.insort into a sorted pending list on JobManager. Resets fire in _advance_to_time when end_time of incoming data reaches the scheduled time. Multiple pending resets within the same data batch collapse into a single _reset_eligible_jobs call.
- RunStart/RunStop dataclasses in core/message.py, decoupled from streaming_data_types FlatBuffer types.
- Run control timestamps are converted from ms (wire format) to ns in RunControlAdapter. All downstream code uses nanoseconds.
- reset_on_run_transition flag on WorkflowSpec, threaded through to Job. Timeseries workflows set False.
Test plan
We do not have fakes that create run start/stop - and if we did we would need to ensure they match what NICOS is producing. It is probably more productive to deploy this and try it in the wild.
🤖 Generated with Claude Code