diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 0d52607..87a8181 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -32,6 +32,7 @@ essential-node-api = { path = "../node-api" } essential-sign = { workspace = true } essential-state-asm = { workspace = true } reqwest = { workspace = true } +rusqlite = { workspace = true, features = ["hooks"] } secp256k1 = { workspace = true } tempfile = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/crates/node/src/db.rs b/crates/node/src/db.rs index 59fc85f..8057f7b 100644 --- a/crates/node/src/db.rs +++ b/crates/node/src/db.rs @@ -76,6 +76,12 @@ impl ConnectionPool { Ok(Self(new_conn_pool(conf)?)) } + #[cfg(any(feature = "test-utils", test))] + /// Create a new connection pool from the inner pool for testing purposes. + pub fn new_from_inner(inner: AsyncConnectionPool) -> Self { + Self(inner) + } + /// Acquire a temporary database [`ConnectionHandle`] from the inner pool. /// /// In the case that all connections are busy, waits for the first available diff --git a/crates/node/src/state_derivation/tests.rs b/crates/node/src/state_derivation/tests.rs index 4701af5..acd126e 100644 --- a/crates/node/src/state_derivation/tests.rs +++ b/crates/node/src/state_derivation/tests.rs @@ -1,23 +1,26 @@ use super::*; use crate::test_utils::{ self, assert_multiple_block_mutations, assert_state_progress_is_none, - assert_state_progress_is_some, test_conn_pool, + assert_state_progress_is_some, test_conn_pool_with_hook, }; use essential_node_db::{insert_block, insert_contract}; use essential_types::{contract::Contract, Block}; use rusqlite::Connection; -use std::time::Duration; // Insert a block to the database and send a notification to the stream -fn insert_block_and_send_notification( +async fn insert_block_and_send_notification( conn: &mut Connection, block: &Block, state_tx: &tokio::sync::watch::Sender<()>, + rx: &mut tokio::sync::mpsc::Receiver<()>, ) { let tx = conn.transaction().unwrap(); insert_block(&tx, block).unwrap(); tx.commit().unwrap(); state_tx.send(()).unwrap(); + for _ in 0..2 { + rx.recv().await.unwrap(); + } } fn insert_contracts_to_db(conn: &mut Connection, contracts: Vec) { @@ -33,7 +36,7 @@ async fn can_derive_state() { #[cfg(feature = "tracing")] let _ = tracing_subscriber::fmt::try_init(); - let conn_pool = test_conn_pool(); + let (conn_pool, mut rx) = test_conn_pool_with_hook(); let mut conn = conn_pool.acquire().await.unwrap(); let test_blocks_count = 4; @@ -50,28 +53,27 @@ async fn can_derive_state() { // Initially, the state progress is none assert_state_progress_is_none(&conn); + while rx.try_recv().is_ok() {} + // Process block 0 - insert_block_and_send_notification(&mut conn, &blocks[0], &state_tx); - tokio::time::sleep(Duration::from_millis(100)).await; + insert_block_and_send_notification(&mut conn, &blocks[0], &state_tx, &mut rx).await; + // Assert state progress is block 0 assert_state_progress_is_some(&conn, &hashes[0]); // Assert mutations in block 0 are in database assert_multiple_block_mutations(&conn, &[&blocks[0]]); // Process block 1 - insert_block_and_send_notification(&mut conn, &blocks[1], &state_tx); - tokio::time::sleep(Duration::from_millis(100)).await; + insert_block_and_send_notification(&mut conn, &blocks[1], &state_tx, &mut rx).await; // Process block 2 - insert_block_and_send_notification(&mut conn, &blocks[2], &state_tx); - tokio::time::sleep(Duration::from_millis(100)).await; + insert_block_and_send_notification(&mut conn, &blocks[2], &state_tx, &mut rx).await; // Assert state progress is block 2 assert_state_progress_is_some(&conn, &hashes[2]); // Assert mutations in block 1 and 2 are in database assert_multiple_block_mutations(&conn, &[&blocks[1], &blocks[2]]); // Process block 3 - insert_block_and_send_notification(&mut conn, &blocks[3], &state_tx); - tokio::time::sleep(Duration::from_millis(100)).await; + insert_block_and_send_notification(&mut conn, &blocks[3], &state_tx, &mut rx).await; // Assert state progress is block 3 assert_state_progress_is_some(&conn, &hashes[3]); // Assert mutations in block 3 are in database diff --git a/crates/node/src/test_utils.rs b/crates/node/src/test_utils.rs index 84fb0b3..8ef5cb7 100644 --- a/crates/node/src/test_utils.rs +++ b/crates/node/src/test_utils.rs @@ -1,6 +1,6 @@ #![allow(dead_code)] -use crate::db::{Config, ConnectionPool, Source}; +use crate::db::{with_tx, Config, ConnectionPool, Source}; use essential_node_db::{get_state_progress, get_validation_progress, query_state}; use essential_types::{ contract::Contract, @@ -9,6 +9,7 @@ use essential_types::{ Block, ConstraintBytecode, ContentAddress, PredicateAddress, StateReadBytecode, Word, }; use rusqlite::Connection; +use rusqlite_pool::tokio::AsyncConnectionPool; use std::time::Duration; pub fn test_conn_pool() -> ConnectionPool { @@ -23,6 +24,44 @@ pub fn test_db_conf() -> Config { } } +pub fn test_conn_pool_with_hook() -> (ConnectionPool, tokio::sync::mpsc::Receiver<()>) { + let (tx, rx) = tokio::sync::mpsc::channel(100); + let conn_pool = test_mem_conn_pool_modify(4, |conn| { + let tx = tx.clone(); + conn.commit_hook(Some(move || { + tx.try_send(()).unwrap(); + false + })); + conn + }); + (conn_pool, rx) +} + +fn test_mem_conn_pool_modify( + conn_limit: usize, + modify: impl Fn(Connection) -> Connection, +) -> ConnectionPool { + let uuid = uuid::Uuid::new_v4(); + let conn_str = format!("file:/{uuid}"); + let conn = AsyncConnectionPool::new(conn_limit, || { + let conn = rusqlite::Connection::open_with_flags_and_vfs( + conn_str.clone(), + Default::default(), + "memdb", + )?; + let conn = modify(conn); + Ok(conn) + }) + .unwrap(); + let db = ConnectionPool::new_from_inner(conn); + + let mut conn = db.try_acquire().unwrap(); + conn.pragma_update(None, "foreign_keys", true).unwrap(); + with_tx(&mut conn, |tx| essential_node_db::create_tables(tx)).unwrap(); + + db +} + pub fn test_blocks(n: Word) -> (Vec, Vec) { let (blocks, contracts) = (0..n) .map(|i| test_block(i, Duration::from_secs(i as _))) diff --git a/crates/node/src/validation/tests.rs b/crates/node/src/validation/tests.rs index 3a315c2..631333d 100644 --- a/crates/node/src/validation/tests.rs +++ b/crates/node/src/validation/tests.rs @@ -1,7 +1,7 @@ use super::*; use crate::test_utils::{ assert_validation_progress_is_none, assert_validation_progress_is_some, test_blocks, - test_conn_pool, test_invalid_block, + test_conn_pool_with_hook, test_invalid_block, }; use essential_node_db::{insert_block, insert_contract}; use essential_types::{contract::Contract, Block, Word}; @@ -9,15 +9,19 @@ use rusqlite::Connection; use std::time::Duration; // Insert a block to the database and send a notification to the stream -fn insert_block_and_send_notification( +async fn insert_block_and_send_notification( conn: &mut Connection, block: &Block, state_rx: &tokio::sync::watch::Sender<()>, + rx: &mut tokio::sync::mpsc::Receiver<()>, ) { let tx = conn.transaction().unwrap(); insert_block(&tx, block).unwrap(); tx.commit().unwrap(); state_rx.send(()).unwrap(); + for _ in 0..2 { + rx.recv().await.unwrap(); + } } fn insert_contracts_to_db(conn: &mut Connection, contracts: Vec) { @@ -33,7 +37,7 @@ async fn can_validate() { #[cfg(feature = "tracing")] let _ = tracing_subscriber::fmt::try_init(); - let conn_pool = test_conn_pool(); + let (conn_pool, mut rx) = test_conn_pool_with_hook(); let mut conn = conn_pool.acquire().await.unwrap(); const NUM_TEST_BLOCKS: Word = 4; @@ -50,24 +54,22 @@ async fn can_validate() { // Initially, the validation progress is none assert_validation_progress_is_none(&conn); + while rx.try_recv().is_ok() {} + // Process block 0 - insert_block_and_send_notification(&mut conn, &blocks[0], &block_tx); - tokio::time::sleep(Duration::from_millis(100)).await; + insert_block_and_send_notification(&mut conn, &blocks[0], &block_tx, &mut rx).await; // Assert validation progress is block 0 assert_validation_progress_is_some(&conn, &hashes[0]); // Process block 1 - insert_block_and_send_notification(&mut conn, &blocks[1], &block_tx); - tokio::time::sleep(Duration::from_millis(100)).await; + insert_block_and_send_notification(&mut conn, &blocks[1], &block_tx, &mut rx).await; // Process block 2 - insert_block_and_send_notification(&mut conn, &blocks[2], &block_tx); - tokio::time::sleep(Duration::from_millis(100)).await; + insert_block_and_send_notification(&mut conn, &blocks[2], &block_tx, &mut rx).await; // Assert validation progress is block 2 assert_validation_progress_is_some(&conn, &hashes[2]); // Process block 3 - insert_block_and_send_notification(&mut conn, &blocks[3], &block_tx); - tokio::time::sleep(Duration::from_millis(100)).await; + insert_block_and_send_notification(&mut conn, &blocks[3], &block_tx, &mut rx).await; // Assert validation progress is block 3 assert_validation_progress_is_some(&conn, &hashes[3]); @@ -76,7 +78,7 @@ async fn can_validate() { #[tokio::test] async fn test_invalid_block_validation() { - let conn_pool = test_conn_pool(); + let (conn_pool, mut rx) = test_conn_pool_with_hook(); let mut conn = conn_pool.acquire().await.unwrap(); let (block, contract) = test_invalid_block(0, Duration::from_secs(0)); @@ -89,9 +91,10 @@ async fn test_invalid_block_validation() { // Initially, the validation progress is none assert_validation_progress_is_none(&conn); + while rx.try_recv().is_ok() {} + // Process invalid block - insert_block_and_send_notification(&mut conn, &block, &block_tx); - tokio::time::sleep(Duration::from_millis(100)).await; + insert_block_and_send_notification(&mut conn, &block, &block_tx, &mut rx).await; // Assert validation progress is still none assert_validation_progress_is_none(&conn); // Assert block is in failed blocks table @@ -111,7 +114,7 @@ async fn can_process_valid_and_invalid_blocks() { #[cfg(feature = "tracing")] let _ = tracing_subscriber::fmt::try_init(); - let conn_pool = test_conn_pool(); + let (conn_pool, mut rx) = test_conn_pool_with_hook(); let mut conn = conn_pool.acquire().await.unwrap(); // Two valid blocks with number 0 and 1 @@ -131,15 +134,15 @@ async fn can_process_valid_and_invalid_blocks() { // Initially, the validation progress is none assert_validation_progress_is_none(&conn); + while rx.try_recv().is_ok() {} + // Process block 0 - insert_block_and_send_notification(&mut conn, &blocks[0], &block_tx); - tokio::time::sleep(Duration::from_millis(100)).await; + insert_block_and_send_notification(&mut conn, &blocks[0], &block_tx, &mut rx).await; // Assert validation progress is block 0 assert_validation_progress_is_some(&conn, &hashes[0]); // Process invalid block - insert_block_and_send_notification(&mut conn, &invalid_block, &block_tx); - tokio::time::sleep(Duration::from_millis(100)).await; + insert_block_and_send_notification(&mut conn, &invalid_block, &block_tx, &mut rx).await; // Assert validation progress is still block 0 assert_validation_progress_is_some(&conn, &hashes[0]); // Assert block is in failed blocks table @@ -152,8 +155,7 @@ async fn can_process_valid_and_invalid_blocks() { ); // Process block 1 - insert_block_and_send_notification(&mut conn, &blocks[1], &block_tx); - tokio::time::sleep(Duration::from_millis(100)).await; + insert_block_and_send_notification(&mut conn, &blocks[1], &block_tx, &mut rx).await; // Assert validation progress is block 1 assert_validation_progress_is_some(&conn, &hashes[1]); diff --git a/crates/node/tests/run.rs b/crates/node/tests/run.rs index b458bd6..e51097e 100644 --- a/crates/node/tests/run.rs +++ b/crates/node/tests/run.rs @@ -7,7 +7,7 @@ use essential_node::{ test_utils::{ assert_multiple_block_mutations, assert_state_progress_is_none, assert_state_progress_is_some, assert_validation_progress_is_none, - assert_validation_progress_is_some, test_blocks, test_db_conf, + assert_validation_progress_is_some, test_blocks, test_conn_pool_with_hook, }, BlockTx, RunConfig, }; @@ -22,13 +22,25 @@ struct NodeServer { conn_pool: ConnectionPool, } +async fn insert_block_and_notify( + conn_pool: &ConnectionPool, + block: Arc, + source_block_tx: &BlockTx, + rx: &mut tokio::sync::mpsc::Receiver<()>, +) { + conn_pool.insert_block(block).await.unwrap(); + source_block_tx.notify(); + for _ in 0..3 { + rx.recv().await.unwrap(); + } +} + #[tokio::test] async fn test_run() { let (node_server, source_block_tx) = test_node().await; // Setup node - let conf = test_db_conf(); - let db = node::db(&conf).unwrap(); + let (conn_pool, mut rx) = test_conn_pool_with_hook(); // Run node let block_tx = BlockTx::new(); @@ -37,7 +49,8 @@ async fn test_run() { run_state_derivation: true, run_validation: true, }; - let _handle = node::run(db.clone(), run_conf, block_tx).unwrap(); + + let _handle = node::run(conn_pool.clone(), run_conf, block_tx).unwrap(); // Create test blocks let test_blocks_count = 4; @@ -45,59 +58,60 @@ async fn test_run() { // Insert contracts to database for contract in &test_contracts { - db.insert_contract(Arc::new(contract.clone()), 0) + conn_pool + .insert_contract(Arc::new(contract.clone()), 0) .await .unwrap(); } - let conn = db.acquire().await.unwrap(); + let conn = conn_pool.acquire().await.unwrap(); // Initially, the state progress and validation progress are none assert_state_progress_is_none(&conn); assert_validation_progress_is_none(&conn); + while rx.try_recv().is_ok() {} + // Insert block 0 to database and send notification - node_server - .conn_pool - .insert_block(test_blocks[0].clone().into()) - .await - .unwrap(); - source_block_tx.notify(); - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + insert_block_and_notify( + &node_server.conn_pool, + test_blocks[0].clone().into(), + &source_block_tx, + &mut rx, + ) + .await; // Check block, state and state progress - let conn = db.acquire().await.unwrap(); assert_submit_solutions_effects(&conn, vec![test_blocks[0].clone()]); // Insert block 1 and 2 to database and send notification - node_server - .conn_pool - .insert_block(test_blocks[1].clone().into()) - .await - .unwrap(); - source_block_tx.notify(); - node_server - .conn_pool - .insert_block(test_blocks[2].clone().into()) - .await - .unwrap(); - source_block_tx.notify(); - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + insert_block_and_notify( + &node_server.conn_pool, + test_blocks[1].clone().into(), + &source_block_tx, + &mut rx, + ) + .await; + insert_block_and_notify( + &node_server.conn_pool, + test_blocks[2].clone().into(), + &source_block_tx, + &mut rx, + ) + .await; // Check block, state and state progress - let conn = db.acquire().await.unwrap(); assert_submit_solutions_effects(&conn, vec![test_blocks[1].clone(), test_blocks[2].clone()]); // Insert block 3 to database and send notification - node_server - .conn_pool - .insert_block(test_blocks[3].clone().into()) - .await - .unwrap(); - source_block_tx.notify(); - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + insert_block_and_notify( + &node_server.conn_pool, + test_blocks[3].clone().into(), + &source_block_tx, + &mut rx, + ) + .await; // Check block, state and state progress - let conn = db.acquire().await.unwrap(); assert_submit_solutions_effects(&conn, vec![test_blocks[3].clone()]); }