Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Primary objective in this phase: make sequencer behavior, safety checks, and per
- Storage model is append-oriented; avoid mutable status flags for open/closed entities.
- Open batch/frame are derived by “latest row” convention.
- A frame’s leading direct-input prefix is derivable from `sequenced_l2_txs` plus `frames.safe_block`.
- `direct_inputs` contains only L1 app direct input **bodies**. InputBox payload first byte: **0x00** = direct input (tag stripped, body stored and executed), **0x01** = batch submission (for scheduler, not stored), **others** = discarded (invalid/garbage). The input reader only accepts 0x00-tagged payloads and stores `payload[1..]`.
- Safe cursor/head values should be derived from persisted facts when possible, not duplicated as mutable fields.
- Replay/catch-up must use persisted ordering plus persisted frame fee (`frames.fee`) to mirror inclusion semantics.
- Included user-op identity is constrained by `UNIQUE(sender, nonce)`.
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions benchmarks/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ impl ManagedSequencer {
.open(log_path.as_path())?;
let stderr_log = stdout_log.try_clone()?;

let batch_submitter_key = default_private_keys().first().cloned().unwrap_or_else(|| {
"0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80".to_string()
});
let mut child = Command::new(config.sequencer_bin.as_str())
.arg("--http-addr")
.arg(http_addr)
Expand All @@ -81,6 +84,8 @@ impl ManagedSequencer {
.arg(domain.chain_id.to_string())
.arg("--domain-verifying-contract")
.arg(domain.verifying_contract.to_string())
.arg("--batch-submitter-private-key")
.arg(&batch_submitter_key)
.env("RUST_LOG", DEFAULT_SEQUENCER_RUST_LOG)
.stdout(Stdio::from(stdout_log))
.stderr(Stdio::from(stderr_log))
Expand Down
5 changes: 4 additions & 1 deletion examples/app-core/src/application/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@ impl Application for WalletApp {
Ok(())
}

fn execute_direct_input(&mut self, _payload: &[u8]) -> Result<(), AppError> {
fn execute_direct_input(
&mut self,
_input: &sequencer_core::l2_tx::DirectInput,
) -> Result<(), AppError> {
self.executed_input_count = self.executed_input_count.saturating_add(1);
Ok(())
}
Expand Down
45 changes: 39 additions & 6 deletions examples/canonical-app/src/scheduler/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use alloy_sol_types::Eip712Domain;
use alloy_sol_types::SolStruct;
use sequencer_core::application::Application;
use sequencer_core::batch::{Batch, Frame, WireUserOp};
use sequencer_core::l2_tx::DirectInput;
use std::collections::VecDeque;

pub const SEQUENCER_ADDRESS: Address = address!("0x1111111111111111111111111111111111111111");
Expand Down Expand Up @@ -46,10 +47,12 @@ pub struct Scheduler<A: Application> {
app: A,
config: SchedulerConfig,
direct_q: VecDeque<QueuedDirectInput>,
next_expected_batch_nonce: u64,
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct QueuedDirectInput {
sender: Address,
payload: Vec<u8>,
inclusion_block: u64,
}
Expand All @@ -60,6 +63,7 @@ impl<A: Application> Scheduler<A> {
app,
config,
direct_q: VecDeque::new(),
next_expected_batch_nonce: 0,
}
}

Expand All @@ -74,6 +78,7 @@ impl<A: Application> Scheduler<A> {

if input.sender != self.config.sequencer_address {
self.direct_q.push_back(QueuedDirectInput {
sender: input.sender,
payload: input.payload,
inclusion_block: input.inclusion_block,
});
Expand All @@ -93,7 +98,12 @@ impl<A: Application> Scheduler<A> {
return ProcessOutcome::BatchInvalid;
};

if batch.nonce != self.next_expected_batch_nonce {
return ProcessOutcome::BatchInvalid;
}

let Some((frame_head, frame_tail)) = batch.frames.split_first() else {
self.next_expected_batch_nonce = batch.nonce + 1;
return ProcessOutcome::BatchExecuted;
};

Expand All @@ -115,6 +125,7 @@ impl<A: Application> Scheduler<A> {
self.execute_frame_user_ops(domain, frame);
}

self.next_expected_batch_nonce = batch.nonce + 1;
ProcessOutcome::BatchExecuted
}

Expand Down Expand Up @@ -171,7 +182,12 @@ impl<A: Application> Scheduler<A> {
break;
}
let queued = self.direct_q.pop_front().expect("queue front must exist");
if let Err(err) = self.app.execute_direct_input(queued.payload.as_slice()) {
let input = DirectInput {
sender: queued.sender,
block_number: queued.inclusion_block,
payload: queued.payload,
};
if let Err(err) = self.app.execute_direct_input(&input) {
eprintln!("scheduler failed to execute drained direct input: {err}");
}
}
Expand All @@ -184,7 +200,12 @@ impl<A: Application> Scheduler<A> {
self.config.max_wait_blocks,
current_block,
) {
let status = self.app.execute_direct_input(front.payload.as_slice());
let input = DirectInput {
sender: front.sender,
block_number: front.inclusion_block,
payload: front.payload.clone(),
};
let status = self.app.execute_direct_input(&input);
if let Err(err) = status {
eprintln!("scheduler failed to execute overdue direct input: {err}");
}
Expand Down Expand Up @@ -332,9 +353,9 @@ mod tests {

fn execute_direct_input(
&mut self,
payload: &[u8],
input: &DirectInput,
) -> Result<(), sequencer_core::application::AppError> {
let marker = payload.first().copied().unwrap_or(0);
let marker = input.payload.first().copied().unwrap_or(0);
self.executed.push(RecordedTx::Direct(marker));
Ok(())
}
Expand Down Expand Up @@ -411,7 +432,7 @@ mod tests {
}

#[test]
fn batch_drains_safe_directs_before_executing_user_ops() {
fn batch_drains_safe_inputs_before_executing_user_ops() {
let mut scheduler = Scheduler::new(
RecordingApp::default(),
SchedulerConfig {
Expand All @@ -428,6 +449,7 @@ mod tests {
let signing_key = SigningKey::from_bytes((&[1_u8; 32]).into()).expect("signing key");

let batch = Batch {
nonce: 0,
frames: vec![Frame {
user_ops: vec![sign_wire_user_op(
&test_domain(),
Expand Down Expand Up @@ -465,6 +487,7 @@ mod tests {
scheduler.process_input(direct_input(1, 1));
let signing_key = SigningKey::from_bytes((&[2_u8; 32]).into()).expect("signing key");
let batch = Batch {
nonce: 0,
frames: vec![Frame {
user_ops: vec![sign_wire_user_op(
&test_domain(),
Expand Down Expand Up @@ -498,6 +521,7 @@ mod tests {
scheduler.process_input(direct_input(1, 9));
let signing_key = SigningKey::from_bytes((&[3_u8; 32]).into()).expect("signing key");
let stale_batch = Batch {
nonce: 0,
frames: vec![Frame {
user_ops: vec![sign_wire_user_op(
&test_domain(),
Expand Down Expand Up @@ -529,6 +553,7 @@ mod tests {
let signing_key_a = SigningKey::from_bytes((&[4_u8; 32]).into()).expect("signing key a");
let signing_key_b = SigningKey::from_bytes((&[5_u8; 32]).into()).expect("signing key b");
let invalid = Batch {
nonce: 0,
frames: vec![
Frame {
user_ops: vec![sign_wire_user_op(
Expand Down Expand Up @@ -574,6 +599,7 @@ mod tests {

let signing_key = SigningKey::from_bytes((&[6_u8; 32]).into()).expect("signing key");
let invalid = Batch {
nonce: 0,
frames: vec![Frame {
user_ops: vec![sign_wire_user_op(
&test_domain(),
Expand Down Expand Up @@ -607,6 +633,7 @@ mod tests {
scheduler.process_input(direct_input(10, 1));
scheduler.process_input(direct_input(11, 2));
let batch = Batch {
nonce: 0,
frames: vec![Frame {
user_ops: vec![],
safe_block: 10,
Expand Down Expand Up @@ -679,6 +706,7 @@ mod tests {
);

let batch = Batch {
nonce: 0,
frames: vec![Frame {
user_ops: vec![WireUserOp {
nonce: 0,
Expand Down Expand Up @@ -717,6 +745,7 @@ mod tests {
let valid = sign_wire_user_op(&test_domain(), &signing_key, 0, 10, vec![4]);

let batch = Batch {
nonce: 0,
frames: vec![
Frame {
user_ops: vec![bad_nonce],
Expand Down Expand Up @@ -758,7 +787,10 @@ mod tests {
},
);

let batch = Batch { frames: vec![] };
let batch = Batch {
nonce: 0,
frames: vec![],
};

assert_eq!(
scheduler.process_input(batch_input(10, batch)),
Expand All @@ -784,6 +816,7 @@ mod tests {
address!("0x3333333333333333333333333333333333333333"),
);
let batch = Batch {
nonce: 0,
frames: vec![Frame {
user_ops: vec![sign_wire_user_op(
&batch_domain,
Expand Down
4 changes: 3 additions & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ check-all-targets:
test:
cargo test --workspace

# Run sequencer tests sequentially so partition static config (init) is not shared across parallel tests.
test-sequencer:
cargo test -p sequencer --lib
cargo test -p sequencer --lib -- --test-threads=1
cargo test -p sequencer --test e2e_sequencer -- --test-threads=1
cargo test -p sequencer --test ws_broadcaster -- --test-threads=1
cargo test -p sequencer --test batch_submitter_integration -- --test-threads=1

bench target="all":
just -f benchmarks/justfile {{target}}
Expand Down
3 changes: 2 additions & 1 deletion sequencer-core/src/application/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

use crate::l2_tx::DirectInput;
use crate::l2_tx::ValidUserOp;
use crate::user_op::UserOp;
use alloy_primitives::{Address, U256};
Expand Down Expand Up @@ -92,7 +93,7 @@ pub trait Application: Send {
Ok(ExecutionOutcome::Included)
}

fn execute_direct_input(&mut self, _payload: &[u8]) -> Result<(), AppError> {
fn execute_direct_input(&mut self, _input: &DirectInput) -> Result<(), AppError> {
Ok(())
}

Expand Down
30 changes: 30 additions & 0 deletions sequencer-core/src/batch.rs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this wire format is carrying more structure than it needs.

Right now we have two different ways to classify inputs:

  • Scheduler: metadata.msg_sender at the rollups layer
  • Batch Submitter: an explicit tag byte in the payload

So the scheduler already has the real protocol boundary available via msg_sender, and already treats the sequencer address differently from all other senders. So the tag feels redundant.

I’d simplify this to:

  • classify by sender only
  • move the batch nonce into Batch itself, so that it becomes Batch { nonce, frames }

That would make the payload just:

  • ssz(Batch { nonce, frames })

instead of the current:

  • tag || nonce || ssz(Batch { frames })

I think that would give us a much cleaner boundary:

  • one source of truth for “is this a batch?”
  • one protocol object
  • one decode path

It would also make the nonce part of the actual wire type, which feels more honest than keeping it as an out-of-band prefix.

Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,17 @@
use crate::user_op::UserOp;
use ssz_derive::{Decode, Encode};

/// Tag byte for InputBox payloads that are L1 app direct inputs (e.g. deposits).
/// L1/app must post such inputs as `0x00 || body`. Only these are stored (body only) and executed.
pub const INPUT_TAG_DIRECT_INPUT: u8 = 0x00;

/// Batch submissions are sent as raw `ssz(Batch)` with no tag; classification at L1 is by
/// attempting SSZ decode, and at the rollup by msg_sender.

#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
pub struct Batch {
/// Batch index (nonce) for deduplication and ordering at the scheduler.
pub nonce: u64,
pub frames: Vec<Frame>,
}

Expand Down Expand Up @@ -35,3 +44,24 @@ impl WireUserOp {
}
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchForSubmission {
pub batch_index: u64,
pub created_at_ms: u64,
pub batch: Batch,
}

impl BatchForSubmission {
/// Encode the batch for the scheduler as a single SSZ payload.
///
/// Payload is `ssz(Batch { nonce: batch_index, frames })`. The scheduler decodes this
/// and uses `batch.nonce` for deduplication; classification at the rollup is by msg_sender.
pub fn encode_for_scheduler(&self) -> Vec<u8> {
let batch = Batch {
nonce: self.batch_index,
frames: self.batch.frames.clone(),
};
ssz::Encode::as_ssz_bytes(&batch)
}
}
4 changes: 4 additions & 0 deletions sequencer-core/src/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub enum BroadcastTxMessage {
},
DirectInput {
offset: u64,
sender: String,
block_number: u64,
payload: String,
},
}
Expand All @@ -37,6 +39,8 @@ impl BroadcastTxMessage {
},
SequencedL2Tx::Direct(direct) => Self::DirectInput {
offset,
sender: direct.sender.to_string(),
block_number: direct.block_number,
payload: alloy_primitives::hex::encode_prefixed(direct.payload.as_slice()),
},
}
Expand Down
2 changes: 2 additions & 0 deletions sequencer-core/src/l2_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use alloy_primitives::Address;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DirectInput {
pub sender: Address,
pub block_number: u64,
pub payload: Vec<u8>,
}

Expand Down
4 changes: 3 additions & 1 deletion sequencer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ rusqlite = { version = "0.38.0", features = ["bundled"] }
rusqlite_migration = "2.3.0"
alloy-primitives = { version = "1.4.1", features = ["serde", "k256"] }
alloy-sol-types = "1.4.1"
alloy = { version = "1.0", features = ["contract", "network", "reqwest", "rpc-types", "sol-types", "node-bindings"] }
alloy = { version = "1.0", features = ["contract", "network", "reqwest", "rpc-types", "sol-types", "node-bindings", "signer-local", "signers"] }
alloy-network-primitives = "1.7"
thiserror = "1"
ssz = { package = "ethereum_ssz", version = "0.10" }
ssz_derive = { package = "ethereum_ssz_derive", version = "0.10" }
clap = { version = "4", features = ["derive", "env"] }
async-recursion = "1"
cartesi-rollups-contracts = "=2.2.0"
async-trait = "0.1"

[dev-dependencies]
futures-util = "0.3"
Expand Down
1 change: 1 addition & 0 deletions sequencer/src/api/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ mod tests {
crate::l2_tx_feed::L2TxFeedConfig {
idle_poll_interval: std::time::Duration::from_millis(2),
page_size: 64,
batch_submitter_address: None,
},
);

Expand Down
Loading
Loading