Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ tower = "0.4.13" # Update to 0.5 pending axum tower update
tower-http = { version = "0.5.2", features = ["cors"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1.10.0", features = ["v4"] }

essential-node = { path = "crates/node", version = "0.1.0" }
essential-node-api = { path = "crates/node-api", version = "0.1.0" }
Expand Down
17 changes: 12 additions & 5 deletions crates/node/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
[package]
name = "essential-node"
version = "0.1.0"
authors.workspace = true
description = "Derive state from essential protocol"
edition.workspace = true
authors.workspace = true
homepage.workspace = true
name = "essential-node"
repository.workspace = true
version = "0.1.0"

[dependencies]
essential-constraint-asm = { workspace = true, optional = true }
Expand All @@ -21,17 +21,24 @@ rusqlite-pool = { workspace = true, features = ["tokio"] }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tracing = { workspace = true, optional = true }
tracing = { workspace = true, optional = true }
uuid = { workspace = true, optional = true }

[dev-dependencies]
essential-constraint-asm = { workspace = true }
essential-sign = { workspace = true }
essential-state-asm = { workspace = true }
reqwest = { workspace = true }
rusqlite = { workspace = true, features = ["hooks"] }
secp256k1 = { workspace = true }
tracing-subscriber = { workspace = true }
uuid.workspace = true

[features]
default = []
test-utils = [
"dep:essential-constraint-asm",
"dep:essential-state-asm",
"dep:uuid",
]
tracing = ["dep:tracing", "essential-relayer/tracing"]
test-utils = ["dep:essential-constraint-asm", "dep:essential-state-asm"]
6 changes: 6 additions & 0 deletions crates/node/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ impl ConnectionPool {
Ok(Self(new_conn_pool(conf)?))
}

#[cfg(any(feature = "test-utils", test))]
/// Create a new connection pool from the inner pool for testing purposes.
pub fn new_from_inner(inner: AsyncConnectionPool) -> Self {
Self(inner)
}

/// Acquire a temporary database [`ConnectionHandle`] from the inner pool.
///
/// In the case that all connections are busy, waits for the first available
Expand Down
21 changes: 17 additions & 4 deletions crates/node/src/state/tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::*;
use crate::test_utils::{
self, assert_multiple_block_mutations, assert_state_progress_is_none,
assert_state_progress_is_some, test_conn_pool,
assert_state_progress_is_some, test_conn_pool, test_mem_conn_pool_modify,
};
use essential_node_db::{create_tables, insert_block, insert_contract};
use essential_types::{contract::Contract, Block};
Expand Down Expand Up @@ -34,7 +34,16 @@ async fn can_derive_state() {
#[cfg(feature = "tracing")]
let _ = tracing_subscriber::fmt::try_init();

let conn_pool = test_conn_pool("can_derive_state");
let (tx, mut rx) = tokio::sync::mpsc::channel(100);

let conn_pool = test_mem_conn_pool_modify(4, |conn| {
let tx = tx.clone();
conn.commit_hook(Some(move || {
tx.try_send(()).unwrap();
false
}));
conn
});
let mut conn = conn_pool.acquire().await.unwrap();

let tx = conn.transaction().unwrap();
Expand All @@ -44,21 +53,25 @@ async fn can_derive_state() {
let test_blocks_count = 4;
let (test_blocks, contracts) = test_utils::test_blocks(test_blocks_count);
insert_contracts_to_db(&mut conn, contracts);
tokio::time::sleep(Duration::from_millis(100)).await;

let blocks = test_blocks;
let hashes = blocks.iter().map(content_addr).collect::<Vec<_>>();

let (state_tx, state_rx) = tokio::sync::watch::channel(());

while rx.try_recv().is_ok() {}

let handle = derive_state_stream(conn_pool.clone(), state_rx, state_tx.clone()).unwrap();

// Initially, the state progress is none
assert_state_progress_is_none(&conn);

// Process block 0
insert_block_and_send_notification(&mut conn, &blocks[0], &state_tx);
tokio::time::sleep(Duration::from_millis(100)).await;
for _ in 0..2 {
rx.recv().await.unwrap();
}

// Assert state progress is block 0
assert_state_progress_is_some(&conn, &blocks[0], &hashes[0]);
// Assert mutations in block 0 are in database
Expand Down
20 changes: 20 additions & 0 deletions crates/node/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use essential_types::{
Block, ConstraintBytecode, ContentAddress, PredicateAddress, StateReadBytecode, Word,
};
use rusqlite::Connection;
use rusqlite_pool::tokio::AsyncConnectionPool;
use std::{process::Stdio, time::Duration};
use tokio::{
io::{AsyncBufReadExt, BufReader},
Expand All @@ -29,6 +30,25 @@ pub fn test_db_conf(id: &str) -> crate::Config {
conf
}

pub fn test_mem_conn_pool_modify(
conn_limit: usize,
modify: impl Fn(Connection) -> Connection,
) -> ConnectionPool {
let uuid = uuid::Uuid::new_v4();
let conn_str = format!("file:/{uuid}");
let conn = AsyncConnectionPool::new(conn_limit, || {
let conn = rusqlite::Connection::open_with_flags_and_vfs(
conn_str.clone(),
Default::default(),
"memdb",
)?;
let conn = modify(conn);
Ok(conn)
})
.unwrap();
ConnectionPool::new_from_inner(conn)
}

pub fn test_blocks(n: u64) -> (Vec<Block>, Vec<Contract>) {
let (blocks, contracts) = (0..n)
.map(|i| test_block(i, Duration::from_secs(i as _)))
Expand Down