feat(engine): buffered stream anchored on CannonStateNotifications + defer trie root computation on dedicated thread#449
Conversation
CannonStateNotifications + defer trie root computation on dedicated thread
| /// Placeholder for future task handle variants. Currently unused — the | ||
| /// hook updates P2P state directly via the flushed cursor. | ||
| #[derive(Clone, Debug)] | ||
| pub enum TrieTaskHandle {} |
| /// A flashblock drained from the buffer, ready for the coordinator. | ||
| pub struct PendingFlashblockEvent { | ||
| /// Resolve when the flashblock is in the in-memory tree. | ||
| pub tx: oneshot::Sender<()>, |
…/github.com/worldcoin/world-chain into osiris/better-task-management-and-messaging
…/github.com/worldcoin/world-chain into osiris/better-task-management-and-messaging
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 3 potential issues.
Bugbot Autofix is kicking off a free cloud agent to fix these issues. This run is complimentary, but you can enable autofix for all future PRs in the Cursor dashboard.
| Poll::Ready(Some(event)) => { | ||
| this.state.step(event); | ||
| Poll::Ready(this.state.output.pop_front()) | ||
| } |
There was a problem hiding this comment.
BufferedStream returns Ready(None) prematurely, terminating stream
High Severity
When the inner stream yields an event that step() buffers without producing output (e.g., a Pending base flashblock arriving before its canon tip, a stale base flashblock, or a delta with no active epoch), output.pop_front() returns None. Wrapping that in Poll::Ready(None) signals end-of-stream to all downstream consumers. The coordinator's while let Some(event) = stream.next().await loop exits, silently killing the critical "flashblocks executor" task. The fix is to loop back and re-poll the inner stream (or return Poll::Pending) when step() produces no output.
| tags: | ||
| - v* | ||
|
|
||
| pull_request: |
There was a problem hiding this comment.
Sync workflow accidentally triggers on all pull requests
Medium Severity
Adding bare pull_request: to sync.yml triggers the full sync test (60-minute timeout, builds and runs a chain sync) on every pull request. This workflow was previously scoped to workflow_dispatch, weekly schedule, and tag pushes only. This appears to be accidentally committed and will waste CI resources on every PR.
| } | ||
| } | ||
| // Broadcast to local subscribers — ordering handled by WorldChainEventsStream | ||
| self.flashblock_tx.send(payload.clone()).ok(); |
There was a problem hiding this comment.
Duplicate flashblocks now broadcast to all P2P peers
Medium Severity
The old publish() checked whether a flashblock index had already been seen before forwarding to peers and the local broadcast channel. The new code unconditionally calls send_flashblock_to_send_set and flashblock_tx.send for every invocation, even if the same flashblock index was already published. This removes deduplication, causing duplicate flashblocks to be broadcast over the P2P network and processed by downstream consumers unnecessarily.
|
Is this still ok to review? As a general note I suggest you to split the work in several PRs so that each of them has a unique scope. This way it's better to review, to bench and to revert if anything goes wrong. |
|
got it, I'd rather review this once other ones are closed then because it's harder to review this PR as there are many things changing at once. I've already reviewed the other 2 PRs |
|
Yes, makes sense to me! |
a30d12f to
0c5c918
Compare
|
can we close this @0xOsiris ? |
I think we should add this functionality |
yes but this PR contains more things at once and the buffered stream is also already fixed in the repo so it's not necessary anymore |
Sure, but these changes do actually implement deferred trie computation so it's a good reference. And I'd like to keep the branch |


Wip - Process a single shared Event Stream anchoring the flashblocks stream on the canonical tip, and defer trie computation for parallel processing.
Note
High Risk
High risk because it changes how flashblocks are buffered/gated on canonical tip and how pending execution/trie data is produced and scheduled, which can affect correctness and liveness under reorgs or high load.
Overview
Adds a new canon-aware
WorldChainEventsStreamthat merges P2P flashblocks withCanonStateSubscriptionsnotifications, buffering and only emittingPendingflashblocks once their epoch parent matches the canonical tip, while updating P2P state to reject stale epochs.Refactors
FlashblocksExecutionCoordinatorto consume this event stream, cancel in-flight processing on new events, serialize pending-block writes via a semaphore, and buildExecutedBlocks withDeferredTrieData(tracking ancestor handles) while deferring trie sorting to a background rayon task; introduces new coordinator metrics and tracing spans.Updates E2E tests to validate event-stream invariants and replaces prior block-production/validation composition with a new
EngineDriverharness plus Eth RPC helper macros. Also updates workspace deps (reth-engine-tree,pin-project,op-alloy-provider, metrics crates) and tweaks CI to auto-runjust fmtand push formatting commits, and enablessync.ymlonpull_request.Written by Cursor Bugbot for commit b07117f. This will update automatically on new commits. Configure here.