Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ essential-node-api = { path = "../node-api" }
essential-sign = { workspace = true }
essential-state-asm = { workspace = true }
reqwest = { workspace = true }
rusqlite = { workspace = true, features = ["hooks"] }
secp256k1 = { workspace = true }
tempfile = { workspace = true }
tracing-subscriber = { workspace = true }
Expand Down
6 changes: 6 additions & 0 deletions crates/node/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ impl ConnectionPool {
Ok(Self(new_conn_pool(conf)?))
}

#[cfg(any(feature = "test-utils", test))]
/// Create a new connection pool from the inner pool for testing purposes.
pub fn new_from_inner(inner: AsyncConnectionPool) -> Self {
Self(inner)
}

/// Acquire a temporary database [`ConnectionHandle`] from the inner pool.
///
/// In the case that all connections are busy, waits for the first available
Expand Down
26 changes: 14 additions & 12 deletions crates/node/src/state_derivation/tests.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
use super::*;
use crate::test_utils::{
self, assert_multiple_block_mutations, assert_state_progress_is_none,
assert_state_progress_is_some, test_conn_pool,
assert_state_progress_is_some, test_conn_pool_with_hook,
};
use essential_node_db::{insert_block, insert_contract};
use essential_types::{contract::Contract, Block};
use rusqlite::Connection;
use std::time::Duration;

// Insert a block to the database and send a notification to the stream
fn insert_block_and_send_notification(
async fn insert_block_and_send_notification(
conn: &mut Connection,
block: &Block,
state_tx: &tokio::sync::watch::Sender<()>,
rx: &mut tokio::sync::mpsc::Receiver<()>,
) {
let tx = conn.transaction().unwrap();
insert_block(&tx, block).unwrap();
tx.commit().unwrap();
state_tx.send(()).unwrap();
for _ in 0..2 {
rx.recv().await.unwrap();
}
}

fn insert_contracts_to_db(conn: &mut Connection, contracts: Vec<Contract>) {
Expand All @@ -33,7 +36,7 @@ async fn can_derive_state() {
#[cfg(feature = "tracing")]
let _ = tracing_subscriber::fmt::try_init();

let conn_pool = test_conn_pool();
let (conn_pool, mut rx) = test_conn_pool_with_hook();
let mut conn = conn_pool.acquire().await.unwrap();

let test_blocks_count = 4;
Expand All @@ -50,28 +53,27 @@ async fn can_derive_state() {
// Initially, the state progress is none
assert_state_progress_is_none(&conn);

while rx.try_recv().is_ok() {}

// Process block 0
insert_block_and_send_notification(&mut conn, &blocks[0], &state_tx);
tokio::time::sleep(Duration::from_millis(100)).await;
insert_block_and_send_notification(&mut conn, &blocks[0], &state_tx, &mut rx).await;

// Assert state progress is block 0
assert_state_progress_is_some(&conn, &hashes[0]);
// Assert mutations in block 0 are in database
assert_multiple_block_mutations(&conn, &[&blocks[0]]);

// Process block 1
insert_block_and_send_notification(&mut conn, &blocks[1], &state_tx);
tokio::time::sleep(Duration::from_millis(100)).await;
insert_block_and_send_notification(&mut conn, &blocks[1], &state_tx, &mut rx).await;
// Process block 2
insert_block_and_send_notification(&mut conn, &blocks[2], &state_tx);
tokio::time::sleep(Duration::from_millis(100)).await;
insert_block_and_send_notification(&mut conn, &blocks[2], &state_tx, &mut rx).await;
// Assert state progress is block 2
assert_state_progress_is_some(&conn, &hashes[2]);
// Assert mutations in block 1 and 2 are in database
assert_multiple_block_mutations(&conn, &[&blocks[1], &blocks[2]]);

// Process block 3
insert_block_and_send_notification(&mut conn, &blocks[3], &state_tx);
tokio::time::sleep(Duration::from_millis(100)).await;
insert_block_and_send_notification(&mut conn, &blocks[3], &state_tx, &mut rx).await;
// Assert state progress is block 3
assert_state_progress_is_some(&conn, &hashes[3]);
// Assert mutations in block 3 are in database
Expand Down
41 changes: 40 additions & 1 deletion crates/node/src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(dead_code)]

use crate::db::{Config, ConnectionPool, Source};
use crate::db::{with_tx, Config, ConnectionPool, Source};
use essential_node_db::{get_state_progress, get_validation_progress, query_state};
use essential_types::{
contract::Contract,
Expand All @@ -9,6 +9,7 @@ use essential_types::{
Block, ConstraintBytecode, ContentAddress, PredicateAddress, StateReadBytecode, Word,
};
use rusqlite::Connection;
use rusqlite_pool::tokio::AsyncConnectionPool;
use std::time::Duration;

pub fn test_conn_pool() -> ConnectionPool {
Expand All @@ -23,6 +24,44 @@ pub fn test_db_conf() -> Config {
}
}

pub fn test_conn_pool_with_hook() -> (ConnectionPool, tokio::sync::mpsc::Receiver<()>) {
let (tx, rx) = tokio::sync::mpsc::channel(100);
let conn_pool = test_mem_conn_pool_modify(4, |conn| {
let tx = tx.clone();
conn.commit_hook(Some(move || {
tx.try_send(()).unwrap();
false
}));
conn
});
(conn_pool, rx)
}

fn test_mem_conn_pool_modify(
conn_limit: usize,
modify: impl Fn(Connection) -> Connection,
) -> ConnectionPool {
let uuid = uuid::Uuid::new_v4();
let conn_str = format!("file:/{uuid}");
let conn = AsyncConnectionPool::new(conn_limit, || {
let conn = rusqlite::Connection::open_with_flags_and_vfs(
conn_str.clone(),
Default::default(),
"memdb",
)?;
let conn = modify(conn);
Ok(conn)
})
.unwrap();
let db = ConnectionPool::new_from_inner(conn);

let mut conn = db.try_acquire().unwrap();
conn.pragma_update(None, "foreign_keys", true).unwrap();
with_tx(&mut conn, |tx| essential_node_db::create_tables(tx)).unwrap();

db
}

pub fn test_blocks(n: Word) -> (Vec<Block>, Vec<Contract>) {
let (blocks, contracts) = (0..n)
.map(|i| test_block(i, Duration::from_secs(i as _)))
Expand Down
44 changes: 23 additions & 21 deletions crates/node/src/validation/tests.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
use super::*;
use crate::test_utils::{
assert_validation_progress_is_none, assert_validation_progress_is_some, test_blocks,
test_conn_pool, test_invalid_block,
test_conn_pool_with_hook, test_invalid_block,
};
use essential_node_db::{insert_block, insert_contract};
use essential_types::{contract::Contract, Block, Word};
use rusqlite::Connection;
use std::time::Duration;

// Insert a block to the database and send a notification to the stream
fn insert_block_and_send_notification(
async fn insert_block_and_send_notification(
conn: &mut Connection,
block: &Block,
state_rx: &tokio::sync::watch::Sender<()>,
rx: &mut tokio::sync::mpsc::Receiver<()>,
) {
let tx = conn.transaction().unwrap();
insert_block(&tx, block).unwrap();
tx.commit().unwrap();
state_rx.send(()).unwrap();
for _ in 0..2 {
rx.recv().await.unwrap();
}
}

fn insert_contracts_to_db(conn: &mut Connection, contracts: Vec<Contract>) {
Expand All @@ -33,7 +37,7 @@ async fn can_validate() {
#[cfg(feature = "tracing")]
let _ = tracing_subscriber::fmt::try_init();

let conn_pool = test_conn_pool();
let (conn_pool, mut rx) = test_conn_pool_with_hook();
let mut conn = conn_pool.acquire().await.unwrap();

const NUM_TEST_BLOCKS: Word = 4;
Expand All @@ -50,24 +54,22 @@ async fn can_validate() {
// Initially, the validation progress is none
assert_validation_progress_is_none(&conn);

while rx.try_recv().is_ok() {}

// Process block 0
insert_block_and_send_notification(&mut conn, &blocks[0], &block_tx);
tokio::time::sleep(Duration::from_millis(100)).await;
insert_block_and_send_notification(&mut conn, &blocks[0], &block_tx, &mut rx).await;
// Assert validation progress is block 0
assert_validation_progress_is_some(&conn, &hashes[0]);

// Process block 1
insert_block_and_send_notification(&mut conn, &blocks[1], &block_tx);
tokio::time::sleep(Duration::from_millis(100)).await;
insert_block_and_send_notification(&mut conn, &blocks[1], &block_tx, &mut rx).await;
// Process block 2
insert_block_and_send_notification(&mut conn, &blocks[2], &block_tx);
tokio::time::sleep(Duration::from_millis(100)).await;
insert_block_and_send_notification(&mut conn, &blocks[2], &block_tx, &mut rx).await;
// Assert validation progress is block 2
assert_validation_progress_is_some(&conn, &hashes[2]);

// Process block 3
insert_block_and_send_notification(&mut conn, &blocks[3], &block_tx);
tokio::time::sleep(Duration::from_millis(100)).await;
insert_block_and_send_notification(&mut conn, &blocks[3], &block_tx, &mut rx).await;
// Assert validation progress is block 3
assert_validation_progress_is_some(&conn, &hashes[3]);

Expand All @@ -76,7 +78,7 @@ async fn can_validate() {

#[tokio::test]
async fn test_invalid_block_validation() {
let conn_pool = test_conn_pool();
let (conn_pool, mut rx) = test_conn_pool_with_hook();
let mut conn = conn_pool.acquire().await.unwrap();

let (block, contract) = test_invalid_block(0, Duration::from_secs(0));
Expand All @@ -89,9 +91,10 @@ async fn test_invalid_block_validation() {
// Initially, the validation progress is none
assert_validation_progress_is_none(&conn);

while rx.try_recv().is_ok() {}

// Process invalid block
insert_block_and_send_notification(&mut conn, &block, &block_tx);
tokio::time::sleep(Duration::from_millis(100)).await;
insert_block_and_send_notification(&mut conn, &block, &block_tx, &mut rx).await;
// Assert validation progress is still none
assert_validation_progress_is_none(&conn);
// Assert block is in failed blocks table
Expand All @@ -111,7 +114,7 @@ async fn can_process_valid_and_invalid_blocks() {
#[cfg(feature = "tracing")]
let _ = tracing_subscriber::fmt::try_init();

let conn_pool = test_conn_pool();
let (conn_pool, mut rx) = test_conn_pool_with_hook();
let mut conn = conn_pool.acquire().await.unwrap();

// Two valid blocks with number 0 and 1
Expand All @@ -131,15 +134,15 @@ async fn can_process_valid_and_invalid_blocks() {
// Initially, the validation progress is none
assert_validation_progress_is_none(&conn);

while rx.try_recv().is_ok() {}

// Process block 0
insert_block_and_send_notification(&mut conn, &blocks[0], &block_tx);
tokio::time::sleep(Duration::from_millis(100)).await;
insert_block_and_send_notification(&mut conn, &blocks[0], &block_tx, &mut rx).await;
// Assert validation progress is block 0
assert_validation_progress_is_some(&conn, &hashes[0]);

// Process invalid block
insert_block_and_send_notification(&mut conn, &invalid_block, &block_tx);
tokio::time::sleep(Duration::from_millis(100)).await;
insert_block_and_send_notification(&mut conn, &invalid_block, &block_tx, &mut rx).await;
// Assert validation progress is still block 0
assert_validation_progress_is_some(&conn, &hashes[0]);
// Assert block is in failed blocks table
Expand All @@ -152,8 +155,7 @@ async fn can_process_valid_and_invalid_blocks() {
);

// Process block 1
insert_block_and_send_notification(&mut conn, &blocks[1], &block_tx);
tokio::time::sleep(Duration::from_millis(100)).await;
insert_block_and_send_notification(&mut conn, &blocks[1], &block_tx, &mut rx).await;
// Assert validation progress is block 1
assert_validation_progress_is_some(&conn, &hashes[1]);

Expand Down
Loading