Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion examples/app-core/src/application/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@ impl Application for WalletApp {
Ok(())
}

fn execute_direct_input(&mut self, _payload: &[u8]) -> Result<(), AppError> {
fn execute_direct_input(
&mut self,
_input: &sequencer_core::l2_tx::DirectInput,
) -> Result<(), AppError> {
self.executed_input_count = self.executed_input_count.saturating_add(1);
Ok(())
}
Expand Down
23 changes: 18 additions & 5 deletions examples/canonical-app/src/scheduler/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use alloy_sol_types::Eip712Domain;
use alloy_sol_types::SolStruct;
use sequencer_core::application::Application;
use sequencer_core::batch::{Batch, Frame, WireUserOp};
use sequencer_core::l2_tx::DirectInput;
use std::collections::VecDeque;

pub const SEQUENCER_ADDRESS: Address = address!("0x1111111111111111111111111111111111111111");
Expand Down Expand Up @@ -51,6 +52,7 @@ pub struct Scheduler<A: Application> {

#[derive(Debug, Clone, PartialEq, Eq)]
struct QueuedDirectInput {
sender: Address,
payload: Vec<u8>,
inclusion_block: u64,
}
Expand All @@ -76,6 +78,7 @@ impl<A: Application> Scheduler<A> {

if input.sender != self.config.sequencer_address {
self.direct_q.push_back(QueuedDirectInput {
sender: input.sender,
payload: input.payload,
inclusion_block: input.inclusion_block,
});
Expand Down Expand Up @@ -179,7 +182,12 @@ impl<A: Application> Scheduler<A> {
break;
}
let queued = self.direct_q.pop_front().expect("queue front must exist");
if let Err(err) = self.app.execute_direct_input(queued.payload.as_slice()) {
let input = DirectInput {
sender: queued.sender,
block_number: queued.inclusion_block,
payload: queued.payload,
};
if let Err(err) = self.app.execute_direct_input(&input) {
eprintln!("scheduler failed to execute drained direct input: {err}");
}
}
Expand All @@ -192,7 +200,12 @@ impl<A: Application> Scheduler<A> {
self.config.max_wait_blocks,
current_block,
) {
let status = self.app.execute_direct_input(front.payload.as_slice());
let input = DirectInput {
sender: front.sender,
block_number: front.inclusion_block,
payload: front.payload.clone(),
};
let status = self.app.execute_direct_input(&input);
if let Err(err) = status {
eprintln!("scheduler failed to execute overdue direct input: {err}");
}
Expand Down Expand Up @@ -340,9 +353,9 @@ mod tests {

fn execute_direct_input(
&mut self,
payload: &[u8],
input: &DirectInput,
) -> Result<(), sequencer_core::application::AppError> {
let marker = payload.first().copied().unwrap_or(0);
let marker = input.payload.first().copied().unwrap_or(0);
self.executed.push(RecordedTx::Direct(marker));
Ok(())
}
Expand Down Expand Up @@ -419,7 +432,7 @@ mod tests {
}

#[test]
fn batch_drains_safe_directs_before_executing_user_ops() {
fn batch_drains_safe_inputs_before_executing_user_ops() {
let mut scheduler = Scheduler::new(
RecordingApp::default(),
SchedulerConfig {
Expand Down
3 changes: 2 additions & 1 deletion sequencer-core/src/application/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

use crate::l2_tx::DirectInput;
use crate::l2_tx::ValidUserOp;
use crate::user_op::UserOp;
use alloy_primitives::{Address, U256};
Expand Down Expand Up @@ -92,7 +93,7 @@ pub trait Application: Send {
Ok(ExecutionOutcome::Included)
}

fn execute_direct_input(&mut self, _payload: &[u8]) -> Result<(), AppError> {
fn execute_direct_input(&mut self, _input: &DirectInput) -> Result<(), AppError> {
Ok(())
}

Expand Down
4 changes: 4 additions & 0 deletions sequencer-core/src/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub enum BroadcastTxMessage {
},
DirectInput {
offset: u64,
sender: String,
block_number: u64,
payload: String,
},
}
Expand All @@ -37,6 +39,8 @@ impl BroadcastTxMessage {
},
SequencedL2Tx::Direct(direct) => Self::DirectInput {
offset,
sender: direct.sender.to_string(),
block_number: direct.block_number,
payload: alloy_primitives::hex::encode_prefixed(direct.payload.as_slice()),
},
}
Expand Down
2 changes: 2 additions & 0 deletions sequencer-core/src/l2_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use alloy_primitives::Address;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DirectInput {
pub sender: Address,
pub block_number: u64,
pub payload: Vec<u8>,
}

Expand Down
1 change: 1 addition & 0 deletions sequencer/src/api/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ mod tests {
crate::l2_tx_feed::L2TxFeedConfig {
idle_poll_interval: std::time::Duration::from_millis(2),
page_size: 64,
batch_submitter_address: None,
},
);

Expand Down
223 changes: 223 additions & 0 deletions sequencer/src/batch_submitter/batch_poster.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

use alloy::providers::Provider;
use async_trait::async_trait;
use cartesi_rollups_contracts::input_box::InputBox;
use sequencer_core::batch::Batch;
use thiserror::Error;

use crate::input_reader::logs::{decode_evm_advance_input, get_input_added_events};

pub type TxHash = alloy_primitives::B256;

#[derive(Debug, Clone)]
pub struct BatchPosterConfig {
pub l1_submit_address: alloy_primitives::Address,
pub app_address: alloy_primitives::Address,
pub batch_submitter_address: alloy_primitives::Address,
pub start_block: u64,
pub confirmation_depth: u64,
}

#[derive(Debug, Error)]
pub enum BatchPosterError {
#[error("provider/transport: {0}")]
Provider(String),
}

#[async_trait]
pub trait BatchPoster: Send + Sync {
async fn submit_batch(&self, payload: Vec<u8>) -> Result<TxHash, BatchPosterError>;

async fn observed_submitted_batch_nonces(
&self,
from_block: u64,
) -> Result<Vec<u64>, BatchPosterError>;
}

#[derive(Clone)]
pub struct EthereumBatchPoster<P: Provider + Send + Sync + Clone + 'static> {
provider: P,
config: BatchPosterConfig,
}

impl<P> EthereumBatchPoster<P>
where
P: Provider + Send + Sync + Clone + 'static,
{
pub fn new(provider: P, config: BatchPosterConfig) -> Self {
Self { provider, config }
}
}

#[async_trait]
impl<P> BatchPoster for EthereumBatchPoster<P>
where
P: Provider + Send + Sync + Clone + 'static,
{
async fn submit_batch(&self, payload: Vec<u8>) -> Result<TxHash, BatchPosterError> {
let input_box = InputBox::new(self.config.l1_submit_address, &self.provider);
let pending = input_box
.addInput(self.config.app_address, payload.into())
.send()
.await
.map_err(|err| BatchPosterError::Provider(err.to_string()))?;
let tx_hash = *pending.tx_hash();

pending
.with_required_confirmations(self.config.confirmation_depth.saturating_add(1))
.watch()
.await
.map_err(|err| BatchPosterError::Provider(err.to_string()))?;

Ok(tx_hash)
}

async fn observed_submitted_batch_nonces(
&self,
from_block: u64,
) -> Result<Vec<u64>, BatchPosterError> {
let latest = self
.provider
.get_block_number()
.await
.map_err(|err| BatchPosterError::Provider(err.to_string()))?;
let end_block = latest.saturating_sub(self.config.confirmation_depth);
let start_block = from_block.max(self.config.start_block);
if start_block > end_block {
return Ok(Vec::new());
}

let events = get_input_added_events(
&self.provider,
self.config.app_address,
&self.config.l1_submit_address,
start_block,
end_block,
&[],
)
.await
.map_err(|errs| {
BatchPosterError::Provider(
errs.into_iter()
.next()
.map(|e| e.to_string())
.unwrap_or_default(),
)
})?;

let mut observed_nonces = Vec::new();
for (event, _log) in events {
let evm_advance = decode_evm_advance_input(event.input.as_ref())
.map_err(BatchPosterError::Provider)?;
if evm_advance.msgSender != self.config.batch_submitter_address {
continue;
}
let batch: Batch = ssz::Decode::from_ssz_bytes(evm_advance.payload.as_ref())
.map_err(|err| BatchPosterError::Provider(format!("{err:?}")))?;
observed_nonces.push(batch.nonce);
}

Ok(observed_nonces)
}
}

#[cfg(test)]
pub(crate) mod mock {
use super::{Batch, BatchPoster, BatchPosterError, TxHash};
use async_trait::async_trait;
use std::sync::Mutex;

#[derive(Debug)]
pub struct MockBatchPoster {
pub submissions: Mutex<Vec<(u64, usize)>>,
pub fail_submit: Mutex<bool>,
pub observed_submitted_nonces: Mutex<Vec<u64>>,
pub observed_submitted_error: Mutex<Option<String>>,
pub last_from_block: Mutex<Option<u64>>,
}

impl MockBatchPoster {
pub fn new() -> Self {
Self {
submissions: Mutex::new(Vec::new()),
fail_submit: Mutex::new(false),
observed_submitted_nonces: Mutex::new(Vec::new()),
observed_submitted_error: Mutex::new(None),
last_from_block: Mutex::new(None),
}
}

pub fn submissions(&self) -> Vec<(u64, usize)> {
self.submissions.lock().expect("lock").clone()
}

pub fn set_observed_submitted_nonces(&self, value: Vec<u64>) {
*self.observed_submitted_nonces.lock().expect("lock") = value;
}

pub fn set_observed_submitted_error(&self, value: Option<&str>) {
*self.observed_submitted_error.lock().expect("lock") = value.map(str::to_string);
}

pub fn last_from_block(&self) -> Option<u64> {
*self.last_from_block.lock().expect("lock")
}
}

#[async_trait]
impl BatchPoster for MockBatchPoster {
async fn submit_batch(&self, payload: Vec<u8>) -> Result<TxHash, BatchPosterError> {
if *self.fail_submit.lock().expect("lock") {
return Err(BatchPosterError::Provider("mock submit fail".into()));
}
let batch_index = ssz::Decode::from_ssz_bytes(payload.as_ref())
.map(|b: Batch| b.nonce)
.unwrap_or(0);
self.submissions
.lock()
.expect("lock")
.push((batch_index, payload.len()));
Ok(TxHash::ZERO)
}

async fn observed_submitted_batch_nonces(
&self,
from_block: u64,
) -> Result<Vec<u64>, BatchPosterError> {
*self.last_from_block.lock().expect("lock") = Some(from_block);
if let Some(err) = self.observed_submitted_error.lock().expect("lock").clone() {
return Err(BatchPosterError::Provider(err));
}
let configured = self.observed_submitted_nonces.lock().expect("lock").clone();
if !configured.is_empty() {
return Ok(configured);
}
Ok(self
.submissions
.lock()
.expect("lock")
.iter()
.map(|(idx, _)| *idx)
.collect())
}
}
}

#[cfg(test)]
mod tests {
use super::{BatchPoster, mock::MockBatchPoster};

#[tokio::test]
async fn mock_poster_tracks_requested_suffix_start_block() {
let poster = MockBatchPoster::new();
let observed = poster
.observed_submitted_batch_nonces(42)
.await
.expect("observe submitted batches");

assert!(observed.is_empty());
assert_eq!(poster.last_from_block(), Some(42));
}
}
2 changes: 0 additions & 2 deletions sequencer/src/batch_submitter/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ use std::time::Duration;
pub struct BatchSubmitterConfig {
/// How often the submitter polls for new work when idle.
pub idle_poll_interval_ms: u64,
/// Maximum number of batches to submit in a single loop iteration.
pub max_batches_per_loop: usize,
}

impl BatchSubmitterConfig {
Expand Down
6 changes: 5 additions & 1 deletion sequencer/src/batch_submitter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
//! strictly increasing and invalidates otherwise, so duplicates are deduplicated at the
//! scheduler level. See `worker` for the wake → read S → compare → submit → sleep loop.

mod batch_poster;
mod config;
mod worker;

pub use batch_poster::{
BatchPoster, BatchPosterConfig, BatchPosterError, EthereumBatchPoster, TxHash,
};
pub use config::BatchSubmitterConfig;
pub use worker::BatchSubmitter;
pub use worker::{BatchSubmitter, BatchSubmitterError, TickOutcome};
Loading
Loading