Skip to content

feat(engine): buffered stream anchored on CannonStateNotifications + defer trie root computation on dedicated thread#449

Open
0xOsiris wants to merge 47 commits into
mainfrom
osiris/better-task-management-and-messaging
Open

feat(engine): buffered stream anchored on CannonStateNotifications + defer trie root computation on dedicated thread#449
0xOsiris wants to merge 47 commits into
mainfrom
osiris/better-task-management-and-messaging

Conversation

@0xOsiris
Copy link
Copy Markdown
Contributor

@0xOsiris 0xOsiris commented Mar 13, 2026

Wip - Process a single shared Event Stream anchoring the flashblocks stream on the canonical tip, and defer trie computation for parallel processing.


Note

High Risk
High risk because it changes how flashblocks are buffered/gated on canonical tip and how pending execution/trie data is produced and scheduled, which can affect correctness and liveness under reorgs or high load.

Overview
Adds a new canon-aware WorldChainEventsStream that merges P2P flashblocks with CanonStateSubscriptions notifications, buffering and only emitting Pending flashblocks once their epoch parent matches the canonical tip, while updating P2P state to reject stale epochs.

Refactors FlashblocksExecutionCoordinator to consume this event stream, cancel in-flight processing on new events, serialize pending-block writes via a semaphore, and build ExecutedBlocks with DeferredTrieData (tracking ancestor handles) while deferring trie sorting to a background rayon task; introduces new coordinator metrics and tracing spans.

Updates E2E tests to validate event-stream invariants and replaces prior block-production/validation composition with a new EngineDriver harness plus Eth RPC helper macros. Also updates workspace deps (reth-engine-tree, pin-project, op-alloy-provider, metrics crates) and tweaks CI to auto-run just fmt and push formatting commits, and enables sync.yml on pull_request.

Written by Cursor Bugbot for commit b07117f. This will update automatically on new commits. Configure here.

@0xOsiris 0xOsiris changed the title Osiris/better task management and messaging feat(engine): buffered stream anchored on CannonStateNotifications + defer trie root computation on dedicated thread Mar 13, 2026
Comment thread crates/flashblocks/p2p/src/protocol/event.rs
Comment thread crates/flashblocks/builder/src/coordinator.rs
Comment thread crates/flashblocks/p2p/src/protocol/event.rs Outdated
/// Placeholder for future task handle variants. Currently unused — the
/// hook updates P2P state directly via the flushed cursor.
#[derive(Clone, Debug)]
pub enum TrieTaskHandle {}
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Finish

/// A flashblock drained from the buffer, ready for the coordinator.
pub struct PendingFlashblockEvent {
/// Resolve when the flashblock is in the in-memory tree.
pub tx: oneshot::Sender<()>,
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: finish

Comment thread crates/flashblocks/p2p/src/protocol/event.rs Outdated
Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 3 potential issues.

Fix All in Cursor

Bugbot Autofix is kicking off a free cloud agent to fix these issues. This run is complimentary, but you can enable autofix for all future PRs in the Cursor dashboard.

Poll::Ready(Some(event)) => {
this.state.step(event);
Poll::Ready(this.state.output.pop_front())
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BufferedStream returns Ready(None) prematurely, terminating stream

High Severity

When the inner stream yields an event that step() buffers without producing output (e.g., a Pending base flashblock arriving before its canon tip, a stale base flashblock, or a delta with no active epoch), output.pop_front() returns None. Wrapping that in Poll::Ready(None) signals end-of-stream to all downstream consumers. The coordinator's while let Some(event) = stream.next().await loop exits, silently killing the critical "flashblocks executor" task. The fix is to loop back and re-poll the inner stream (or return Poll::Pending) when step() produces no output.

Fix in Cursor Fix in Web

tags:
- v*

pull_request:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sync workflow accidentally triggers on all pull requests

Medium Severity

Adding bare pull_request: to sync.yml triggers the full sync test (60-minute timeout, builds and runs a chain sync) on every pull request. This workflow was previously scoped to workflow_dispatch, weekly schedule, and tag pushes only. This appears to be accidentally committed and will waste CI resources on every PR.

Fix in Cursor Fix in Web

}
}
// Broadcast to local subscribers — ordering handled by WorldChainEventsStream
self.flashblock_tx.send(payload.clone()).ok();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate flashblocks now broadcast to all P2P peers

Medium Severity

The old publish() checked whether a flashblock index had already been seen before forwarding to peers and the local broadcast channel. The new code unconditionally calls send_flashblock_to_send_set and flashblock_tx.send for every invocation, even if the same flashblock index was already published. This removes deduplication, causing duplicate flashblocks to be broadcast over the P2P network and processed by downstream consumers unnecessarily.

Fix in Cursor Fix in Web

@alessandromazza98
Copy link
Copy Markdown
Contributor

Is this still ok to review? As a general note I suggest you to split the work in several PRs so that each of them has a unique scope. This way it's better to review, to bench and to revert if anything goes wrong.

@0xOsiris
Copy link
Copy Markdown
Contributor Author

Is this still ok to review? As a general note I suggest you to split the work in several PRs so that each of them has a unique scope. This way it's better to review, to bench and to revert if anything goes wrong.

It's stacked on #452, and #447. So, those changes need to be reviewed first but yes

@alessandromazza98
Copy link
Copy Markdown
Contributor

got it, I'd rather review this once other ones are closed then because it's harder to review this PR as there are many things changing at once. I've already reviewed the other 2 PRs

Copy link
Copy Markdown
Contributor Author

Yes, makes sense to me!

@0xForerunner 0xForerunner force-pushed the fr/flashblocks-p2p-v2 branch from a30d12f to 0c5c918 Compare March 19, 2026 20:37
Base automatically changed from fr/flashblocks-p2p-v2 to main March 19, 2026 23:56
@alessandromazza98
Copy link
Copy Markdown
Contributor

can we close this @0xOsiris ?

@0xOsiris
Copy link
Copy Markdown
Contributor Author

can we close this @0xOsiris ?

I think we should add this functionality

@alessandromazza98
Copy link
Copy Markdown
Contributor

I think we should add this functionality

yes but this PR contains more things at once and the buffered stream is also already fixed in the repo so it's not necessary anymore

@0xOsiris
Copy link
Copy Markdown
Contributor Author

I think we should add this functionality

yes but this PR contains more things at once and the buffered stream is also already fixed in the repo so it's not necessary anymore

Sure, but these changes do actually implement deferred trie computation so it's a good reference. And I'd like to keep the branch

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants