From 77e9cc43996bf4c615edc2f037f2919e4a1d2af7 Mon Sep 17 00:00:00 2001 From: Tom Date: Mon, 9 Sep 2024 11:28:32 +1000 Subject: [PATCH] WIP --- Cargo.lock | 10 ++++++++++ Cargo.toml | 1 + crates/node/Cargo.toml | 17 ++++++++++++----- crates/node/src/db.rs | 6 ++++++ crates/node/src/state/tests.rs | 21 +++++++++++++++++---- crates/node/src/test_utils.rs | 20 ++++++++++++++++++++ 6 files changed, 66 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fedd5da..c01fa3b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -553,6 +553,7 @@ dependencies = [ "tokio-stream", "tracing", "tracing-subscriber", + "uuid", ] [[package]] @@ -2220,6 +2221,15 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" +dependencies = [ + "getrandom", +] + [[package]] name = "valuable" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 7cd9763..8bbdc4e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ tower = "0.4.13" # Update to 0.5 pending axum tower update tower-http = { version = "0.5.2", features = ["cors"] } tracing = "0.1.40" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +uuid = { version = "1.10.0", features = ["v4"] } essential-node = { path = "crates/node", version = "0.1.0" } essential-node-api = { path = "crates/node-api", version = "0.1.0" } diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index f57f46e..fe10509 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -1,11 +1,11 @@ [package] -name = "essential-node" -version = "0.1.0" +authors.workspace = true description = "Derive state from essential protocol" edition.workspace = true -authors.workspace = true homepage.workspace = true +name = "essential-node" repository.workspace = true +version = "0.1.0" [dependencies] essential-constraint-asm = { workspace = true, optional = true } @@ -21,17 +21,24 @@ rusqlite-pool = { workspace = true, features = ["tokio"] } thiserror = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } -tracing = { workspace = true, optional = true } +tracing = { workspace = true, optional = true } +uuid = { workspace = true, optional = true } [dev-dependencies] essential-constraint-asm = { workspace = true } essential-sign = { workspace = true } essential-state-asm = { workspace = true } reqwest = { workspace = true } +rusqlite = { workspace = true, features = ["hooks"] } secp256k1 = { workspace = true } tracing-subscriber = { workspace = true } +uuid.workspace = true [features] default = [] +test-utils = [ + "dep:essential-constraint-asm", + "dep:essential-state-asm", + "dep:uuid", +] tracing = ["dep:tracing", "essential-relayer/tracing"] -test-utils = ["dep:essential-constraint-asm", "dep:essential-state-asm"] diff --git a/crates/node/src/db.rs b/crates/node/src/db.rs index 9e9754a..a785870 100644 --- a/crates/node/src/db.rs +++ b/crates/node/src/db.rs @@ -71,6 +71,12 @@ impl ConnectionPool { Ok(Self(new_conn_pool(conf)?)) } + #[cfg(any(feature = "test-utils", test))] + /// Create a new connection pool from the inner pool for testing purposes. + pub fn new_from_inner(inner: AsyncConnectionPool) -> Self { + Self(inner) + } + /// Acquire a temporary database [`ConnectionHandle`] from the inner pool. /// /// In the case that all connections are busy, waits for the first available diff --git a/crates/node/src/state/tests.rs b/crates/node/src/state/tests.rs index ed7f0d7..1b19089 100644 --- a/crates/node/src/state/tests.rs +++ b/crates/node/src/state/tests.rs @@ -1,7 +1,7 @@ use super::*; use crate::test_utils::{ self, assert_multiple_block_mutations, assert_state_progress_is_none, - assert_state_progress_is_some, test_conn_pool, + assert_state_progress_is_some, test_conn_pool, test_mem_conn_pool_modify, }; use essential_node_db::{create_tables, insert_block, insert_contract}; use essential_types::{contract::Contract, Block}; @@ -34,7 +34,16 @@ async fn can_derive_state() { #[cfg(feature = "tracing")] let _ = tracing_subscriber::fmt::try_init(); - let conn_pool = test_conn_pool("can_derive_state"); + let (tx, mut rx) = tokio::sync::mpsc::channel(100); + + let conn_pool = test_mem_conn_pool_modify(4, |conn| { + let tx = tx.clone(); + conn.commit_hook(Some(move || { + tx.try_send(()).unwrap(); + false + })); + conn + }); let mut conn = conn_pool.acquire().await.unwrap(); let tx = conn.transaction().unwrap(); @@ -44,13 +53,14 @@ async fn can_derive_state() { let test_blocks_count = 4; let (test_blocks, contracts) = test_utils::test_blocks(test_blocks_count); insert_contracts_to_db(&mut conn, contracts); - tokio::time::sleep(Duration::from_millis(100)).await; let blocks = test_blocks; let hashes = blocks.iter().map(content_addr).collect::>(); let (state_tx, state_rx) = tokio::sync::watch::channel(()); + while rx.try_recv().is_ok() {} + let handle = derive_state_stream(conn_pool.clone(), state_rx, state_tx.clone()).unwrap(); // Initially, the state progress is none @@ -58,7 +68,10 @@ async fn can_derive_state() { // Process block 0 insert_block_and_send_notification(&mut conn, &blocks[0], &state_tx); - tokio::time::sleep(Duration::from_millis(100)).await; + for _ in 0..2 { + rx.recv().await.unwrap(); + } + // Assert state progress is block 0 assert_state_progress_is_some(&conn, &blocks[0], &hashes[0]); // Assert mutations in block 0 are in database diff --git a/crates/node/src/test_utils.rs b/crates/node/src/test_utils.rs index e7edaec..29ffaa8 100644 --- a/crates/node/src/test_utils.rs +++ b/crates/node/src/test_utils.rs @@ -9,6 +9,7 @@ use essential_types::{ Block, ConstraintBytecode, ContentAddress, PredicateAddress, StateReadBytecode, Word, }; use rusqlite::Connection; +use rusqlite_pool::tokio::AsyncConnectionPool; use std::{process::Stdio, time::Duration}; use tokio::{ io::{AsyncBufReadExt, BufReader}, @@ -29,6 +30,25 @@ pub fn test_db_conf(id: &str) -> crate::Config { conf } +pub fn test_mem_conn_pool_modify( + conn_limit: usize, + modify: impl Fn(Connection) -> Connection, +) -> ConnectionPool { + let uuid = uuid::Uuid::new_v4(); + let conn_str = format!("file:/{uuid}"); + let conn = AsyncConnectionPool::new(conn_limit, || { + let conn = rusqlite::Connection::open_with_flags_and_vfs( + conn_str.clone(), + Default::default(), + "memdb", + )?; + let conn = modify(conn); + Ok(conn) + }) + .unwrap(); + ConnectionPool::new_from_inner(conn) +} + pub fn test_blocks(n: u64) -> (Vec, Vec) { let (blocks, contracts) = (0..n) .map(|i| test_block(i, Duration::from_secs(i as _)))