diff --git a/Cargo.toml b/Cargo.toml index e98c21ab4..fc3463d02 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,6 +90,7 @@ proptest = "1.0.0" regex = "1.5.6" criterion = { version = "0.7.0", features = ["async_tokio"] } ldk-node-062 = { package = "ldk-node", version = "=0.6.2" } +ldk-node-070 = { package = "ldk-node", version = "=0.7.0" } [target.'cfg(not(no_download))'.dev-dependencies] electrsd = { version = "0.36.1", default-features = false, features = ["legacy", "esplora_a33e97e1", "corepc-node_27_2"] } diff --git a/src/io/sqlite_store/migrations.rs b/src/io/sqlite_store/migrations.rs index ea809be08..9527765c0 100644 --- a/src/io/sqlite_store/migrations.rs +++ b/src/io/sqlite_store/migrations.rs @@ -9,66 +9,128 @@ use lightning::io; use rusqlite::Connection; pub(super) fn migrate_schema( - connection: &mut Connection, kv_table_name: &str, from_version: u16, to_version: u16, + connection: &mut Connection, kv_table_name: &str, mut from_version: u16, to_version: u16, ) -> io::Result<()> { assert!(from_version < to_version); - if from_version == 1 && to_version == 2 { - let tx = connection.transaction().map_err(|e| { - let msg = format!( - "Failed to migrate table {} from user_version {} to {}: {}", - kv_table_name, from_version, to_version, e - ); - io::Error::new(io::ErrorKind::Other, msg) - })?; - - // Rename 'namespace' column to 'primary_namespace' - let sql = format!( - "ALTER TABLE {} - RENAME COLUMN namespace TO primary_namespace;", - kv_table_name - ); - - tx.execute(&sql, []).map_err(|e| { - let msg = format!( - "Failed to migrate table {} from user_version {} to {}: {}", - kv_table_name, from_version, to_version, e - ); - io::Error::new(io::ErrorKind::Other, msg) - })?; - - // Add new 'secondary_namespace' column - let sql = format!( - "ALTER TABLE {} - ADD secondary_namespace TEXT DEFAULT \"\" NOT NULL;", - kv_table_name - ); - - tx.execute(&sql, []).map_err(|e| { - let msg = format!( - "Failed to migrate table {} from user_version {} to {}: {}", - 
kv_table_name, from_version, to_version, e - ); + if from_version == 1 && to_version >= 2 { + migrate_v1_to_v2(connection, kv_table_name)?; + from_version = 2; + } + if from_version == 2 && to_version >= 3 { + migrate_v2_to_v3(connection, kv_table_name)?; + } + + Ok(()) +} + +fn migrate_v1_to_v2(connection: &mut Connection, kv_table_name: &str) -> io::Result<()> { + let tx = connection.transaction().map_err(|e| { + let msg = format!("Failed to migrate table {} from v1 to v2: {}", kv_table_name, e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + // Rename 'namespace' column to 'primary_namespace' + let sql = format!( + "ALTER TABLE {} + RENAME COLUMN namespace TO primary_namespace;", + kv_table_name + ); + + tx.execute(&sql, []).map_err(|e| { + let msg = format!("Failed to migrate table {} from v1 to v2: {}", kv_table_name, e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + // Add new 'secondary_namespace' column + let sql = format!( + "ALTER TABLE {} + ADD secondary_namespace TEXT DEFAULT \"\" NOT NULL;", + kv_table_name + ); + + tx.execute(&sql, []).map_err(|e| { + let msg = format!("Failed to migrate table {} from v1 to v2: {}", kv_table_name, e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + // Update user_version + tx.pragma(Some(rusqlite::DatabaseName::Main), "user_version", 2u16, |_| Ok(())).map_err( + |e| { + let msg = format!("Failed to upgrade user_version from 1 to 2: {}", e); io::Error::new(io::ErrorKind::Other, msg) - })?; - - // Update user_version - tx.pragma(Some(rusqlite::DatabaseName::Main), "user_version", to_version, |_| Ok(())) - .map_err(|e| { - let msg = format!( - "Failed to upgrade user_version from {} to {}: {}", - from_version, to_version, e - ); - io::Error::new(io::ErrorKind::Other, msg) - })?; + }, + )?; - tx.commit().map_err(|e| { - let msg = format!( - "Failed to migrate table {} from user_version {} to {}: {}", - kv_table_name, from_version, to_version, e - ); + tx.commit().map_err(|e| { + let msg = 
format!("Failed to migrate table {} from v1 to v2: {}", kv_table_name, e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + Ok(()) +} + +fn migrate_v2_to_v3(connection: &mut Connection, kv_table_name: &str) -> io::Result<()> { + let tx = connection.transaction().map_err(|e| { + let msg = format!("Failed to migrate table {} from v2 to v3: {}", kv_table_name, e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let old_table = format!("{}_v2_old", kv_table_name); + let map_err = |e: rusqlite::Error| -> io::Error { + let msg = format!("Failed to migrate table {} from v2 to v3: {}", kv_table_name, e); + io::Error::new(io::ErrorKind::Other, msg) + }; + + // Recreate the table to ensure the correct PRIMARY KEY regardless of migration history. + // Tables migrated from v1 have PK (primary_namespace, key) only — missing + // secondary_namespace. Recreating normalizes the schema for all databases. + let rename_sql = format!("ALTER TABLE {} RENAME TO {}", kv_table_name, old_table); + tx.execute(&rename_sql, []).map_err(map_err)?; + + let create_table_sql = format!( + "CREATE TABLE {} ( + primary_namespace TEXT NOT NULL, + secondary_namespace TEXT DEFAULT \"\" NOT NULL, + key TEXT NOT NULL CHECK (key <> ''), + value BLOB, + created_at INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY (primary_namespace, secondary_namespace, key) + )", + kv_table_name + ); + tx.execute(&create_table_sql, []).map_err(map_err)?; + + // Copy data and backfill created_at from ROWID for relative ordering + let set_created_at_sql = format!( + "INSERT INTO {} (primary_namespace, secondary_namespace, key, value, created_at) + SELECT primary_namespace, secondary_namespace, key, value, ROWID FROM {}", + kv_table_name, old_table + ); + tx.execute(&set_created_at_sql, []).map_err(map_err)?; + + let drop_old_sql = format!("DROP TABLE {}", old_table); + tx.execute(&drop_old_sql, []).map_err(map_err)?; + + // Create composite index for paginated listing + let sql = format!( + "CREATE INDEX 
idx_{}_paginated ON {} (primary_namespace, secondary_namespace, created_at DESC, key ASC)", + kv_table_name, kv_table_name + ); + tx.execute(&sql, []).map_err(map_err)?; + + // Update user_version + tx.pragma(Some(rusqlite::DatabaseName::Main), "user_version", 3u16, |_| Ok(())).map_err( + |e| { + let msg = format!("Failed to upgrade user_version from 2 to 3: {}", e); io::Error::new(io::ErrorKind::Other, msg) - })?; - } + }, + )?; + + tx.commit().map_err(|e| { + let msg = format!("Failed to migrate table {} from v2 to v3: {}", kv_table_name, e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + Ok(()) } @@ -76,7 +138,7 @@ pub(super) fn migrate_schema( mod tests { use std::fs; - use lightning::util::persist::KVStoreSync; + use lightning::util::persist::{KVStoreSync, PaginatedKVStoreSync}; use rusqlite::{named_params, Connection}; use crate::io::sqlite_store::SqliteStore; @@ -128,7 +190,7 @@ mod tests { let sql = format!( "INSERT OR REPLACE INTO {} (namespace, key, value) VALUES (:namespace, :key, :value);", kv_table_name - ); + ); let mut stmt = connection.prepare_cached(&sql).unwrap(); stmt.execute(named_params! { @@ -166,4 +228,91 @@ mod tests { // Check we can continue to use the store just fine. 
do_read_write_remove_list_persist(&store); } + + #[test] + fn rwrl_post_schema_2_migration() { + let old_schema_version = 2u16; + + let mut temp_path = random_storage_path(); + temp_path.push("rwrl_post_schema_2_migration"); + + let db_file_name = "test_db".to_string(); + let kv_table_name = "test_table".to_string(); + + let test_ns = "testspace"; + let test_sub = "testsub"; + + { + // Create a v2 database manually + fs::create_dir_all(temp_path.clone()).unwrap(); + let mut db_file_path = temp_path.clone(); + db_file_path.push(db_file_name.clone()); + + let connection = Connection::open(db_file_path.clone()).unwrap(); + + connection + .pragma( + Some(rusqlite::DatabaseName::Main), + "user_version", + old_schema_version, + |_| Ok(()), + ) + .unwrap(); + + let sql = format!( + "CREATE TABLE IF NOT EXISTS {} ( + primary_namespace TEXT NOT NULL, + secondary_namespace TEXT DEFAULT \"\" NOT NULL, + key TEXT NOT NULL CHECK (key <> ''), + value BLOB, PRIMARY KEY ( primary_namespace, secondary_namespace, key ) + );", + kv_table_name + ); + connection.execute(&sql, []).unwrap(); + + // Insert 3 rows in a known order + for i in 0..3 { + let key = format!("key_{}", i); + let sql = format!( + "INSERT INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:ns, :sub, :key, :value);", + kv_table_name + ); + let mut stmt = connection.prepare_cached(&sql).unwrap(); + stmt.execute(named_params! 
{ + ":ns": test_ns, + ":sub": test_sub, + ":key": key, + ":value": vec![i as u8; 8], + }) + .unwrap(); + } + } + + // Open with new code, triggering v2→v3 migration + let store = SqliteStore::new(temp_path, Some(db_file_name), Some(kv_table_name)).unwrap(); + + // Verify data survived + for i in 0..3 { + let key = format!("key_{}", i); + let data = store.read(test_ns, test_sub, &key).unwrap(); + assert_eq!(data, vec![i as u8; 8]); + } + + // Verify paginated listing works and returns entries in ROWID-backfilled order (newest first) + let response = + PaginatedKVStoreSync::list_paginated(&store, test_ns, test_sub, None).unwrap(); + assert_eq!(response.keys.len(), 3); + // ROWIDs were 1, 2, 3 so created_at was backfilled as 1, 2, 3 + // Newest first: key_2, key_1, key_0 + assert_eq!(response.keys, vec!["key_2", "key_1", "key_0"]); + + // Verify we can write new entries and they get proper ordering + KVStoreSync::write(&store, test_ns, test_sub, "key_new", vec![99u8; 8]).unwrap(); + let response = + PaginatedKVStoreSync::list_paginated(&store, test_ns, test_sub, None).unwrap(); + assert_eq!(response.keys[0], "key_new"); + + // Check we can continue to use the store just fine. 
+ do_read_write_remove_list_persist(&store); + } } diff --git a/src/io/sqlite_store/mod.rs b/src/io/sqlite_store/mod.rs index e4091b24e..0f57fd263 100644 --- a/src/io/sqlite_store/mod.rs +++ b/src/io/sqlite_store/mod.rs @@ -14,7 +14,9 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use lightning::io; -use lightning::util::persist::{KVStore, KVStoreSync}; +use lightning::util::persist::{ + KVStore, KVStoreSync, PageToken, PaginatedKVStore, PaginatedKVStoreSync, PaginatedListResponse, +}; use lightning_types::string::PrintableString; use rusqlite::{named_params, Connection}; @@ -34,7 +36,10 @@ pub const DEFAULT_SQLITE_DB_FILE_NAME: &str = "ldk_data.sqlite"; pub const DEFAULT_KV_TABLE_NAME: &str = "ldk_data"; // The current SQLite `user_version`, which we can use if we'd ever need to do a schema migration. -const SCHEMA_USER_VERSION: u16 = 2; +const SCHEMA_USER_VERSION: u16 = 3; + +// The number of entries returned per page in paginated list operations. +const PAGE_SIZE: usize = 50; /// A [`KVStoreSync`] implementation that writes to and reads from an [SQLite] database. /// @@ -58,7 +63,10 @@ impl SqliteStore { data_dir: PathBuf, db_file_name: Option, kv_table_name: Option, ) -> io::Result { let inner = Arc::new(SqliteStoreInner::new(data_dir, db_file_name, kv_table_name)?); - let next_write_version = AtomicU64::new(1); + + // Initialize next_write_version from the DB to ensure monotonicity across restarts. + let initial_version = inner.get_max_created_at()? 
+ 1; + let next_write_version = AtomicU64::new(initial_version); Ok(Self { inner, next_write_version }) } @@ -222,6 +230,33 @@ impl KVStoreSync for SqliteStore { } } +impl PaginatedKVStoreSync for SqliteStore { + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option, + ) -> io::Result { + self.inner.list_paginated_internal(primary_namespace, secondary_namespace, page_token) + } +} + +impl PaginatedKVStore for SqliteStore { + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option, + ) -> impl Future> + 'static + Send { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let inner = Arc::clone(&self.inner); + let fut = tokio::task::spawn_blocking(move || { + inner.list_paginated_internal(&primary_namespace, &secondary_namespace, page_token) + }); + async move { + fut.await.unwrap_or_else(|e| { + let msg = format!("Failed to IO operation due join error: {}", e); + Err(io::Error::new(io::ErrorKind::Other, msg)) + }) + } + } +} + struct SqliteStoreInner { connection: Arc>, data_dir: PathBuf, @@ -289,7 +324,9 @@ impl SqliteStoreInner { primary_namespace TEXT NOT NULL, secondary_namespace TEXT DEFAULT \"\" NOT NULL, key TEXT NOT NULL CHECK (key <> ''), - value BLOB, PRIMARY KEY ( primary_namespace, secondary_namespace, key ) + value BLOB, + created_at INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY ( primary_namespace, secondary_namespace, key ) );", kv_table_name ); @@ -299,11 +336,32 @@ impl SqliteStoreInner { io::Error::new(io::ErrorKind::Other, msg) })?; + // Create composite index for paginated listing (IF NOT EXISTS for idempotency) + let sql = format!( + "CREATE INDEX IF NOT EXISTS idx_{}_paginated ON {} (primary_namespace, secondary_namespace, created_at DESC, key ASC)", + kv_table_name, kv_table_name + ); + + connection.execute(&sql, []).map_err(|e| { + let msg = format!("Failed to create index on table {}: {}", 
kv_table_name, e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + let connection = Arc::new(Mutex::new(connection)); let write_version_locks = Mutex::new(HashMap::new()); Ok(Self { connection, data_dir, kv_table_name, write_version_locks }) } + fn get_max_created_at(&self) -> io::Result { + let locked_conn = self.connection.lock().unwrap(); + let sql = format!("SELECT COALESCE(MAX(created_at), 0) FROM {}", self.kv_table_name); + let max_val: i64 = locked_conn.query_row(&sql, [], |row| row.get(0)).map_err(|e| { + let msg = format!("Failed to query max created_at: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + Ok(max_val as u64) + } + fn get_inner_lock_ref(&self, locking_key: String) -> Arc> { let mut outer_lock = self.write_version_locks.lock().unwrap(); Arc::clone(&outer_lock.entry(locking_key).or_default()) @@ -367,7 +425,9 @@ impl SqliteStoreInner { let locked_conn = self.connection.lock().unwrap(); let sql = format!( - "INSERT OR REPLACE INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:primary_namespace, :secondary_namespace, :key, :value);", + "INSERT INTO {} (primary_namespace, secondary_namespace, key, value, created_at) \ + VALUES (:primary_namespace, :secondary_namespace, :key, :value, :created_at) \ + ON CONFLICT(primary_namespace, secondary_namespace, key) DO UPDATE SET value = excluded.value;", self.kv_table_name ); @@ -381,6 +441,7 @@ impl SqliteStoreInner { ":secondary_namespace": secondary_namespace, ":key": key, ":value": buf, + ":created_at": version as i64, }) .map(|_| ()) .map_err(|e| { @@ -472,6 +533,125 @@ impl SqliteStoreInner { Ok(keys) } + fn list_paginated_internal( + &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option, + ) -> io::Result { + check_namespace_key_validity( + primary_namespace, + secondary_namespace, + None, + "list_paginated", + )?; + + let locked_conn = self.connection.lock().unwrap(); + + let (keys, last_created_at) = match page_token { + Some(ref token) => 
{ + let token_str = token.as_str(); + let (created_at_str, key) = token_str.split_once(':').ok_or_else(|| { + let msg = format!("Invalid page token: {}", token_str); + io::Error::new(io::ErrorKind::InvalidInput, msg) + })?; + let created_at: i64 = created_at_str.parse().map_err(|_| { + let msg = format!("Invalid page token created_at: {}", created_at_str); + io::Error::new(io::ErrorKind::InvalidInput, msg) + })?; + let sql = format!( + "SELECT key, created_at FROM {} \ + WHERE primary_namespace=:primary_namespace \ + AND secondary_namespace=:secondary_namespace \ + AND (created_at < :token_created_at \ + OR (created_at = :token_created_at AND key > :token_key)) \ + ORDER BY created_at DESC, key ASC \ + LIMIT :limit", + self.kv_table_name + ); + let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| { + let msg = format!("Failed to prepare statement: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let rows = stmt + .query_map( + named_params! { + ":primary_namespace": primary_namespace, + ":secondary_namespace": secondary_namespace, + ":token_created_at": created_at, + ":token_key": key, + ":limit": PAGE_SIZE as i64, + }, + |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)), + ) + .map_err(|e| { + let msg = format!("Failed to retrieve queried rows: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let mut keys = Vec::new(); + let mut last_ca = 0i64; + for row in rows { + let (k, ca) = row.map_err(|e| { + let msg = format!("Failed to retrieve queried rows: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + last_ca = ca; + keys.push(k); + } + (keys, last_ca) + }, + None => { + let sql = format!( + "SELECT key, created_at FROM {} \ + WHERE primary_namespace=:primary_namespace \ + AND secondary_namespace=:secondary_namespace \ + ORDER BY created_at DESC, key ASC \ + LIMIT :limit", + self.kv_table_name + ); + let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| { + let msg = format!("Failed to prepare 
statement: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let rows = stmt + .query_map( + named_params! { + ":primary_namespace": primary_namespace, + ":secondary_namespace": secondary_namespace, + ":limit": PAGE_SIZE as i64, + }, + |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)), + ) + .map_err(|e| { + let msg = format!("Failed to retrieve queried rows: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let mut keys = Vec::new(); + let mut last_ca = 0i64; + for row in rows { + let (k, ca) = row.map_err(|e| { + let msg = format!("Failed to retrieve queried rows: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + last_ca = ca; + keys.push(k); + } + (keys, last_ca) + }, + }; + + let next_page_token = if keys.len() == PAGE_SIZE { + // There may be more results; provide a token for the next page. + let last_entry_key = keys.last().unwrap(); + Some(PageToken::new(format!("{:020}:{}", last_created_at, last_entry_key))) + } else { + None + }; + + Ok(PaginatedListResponse { keys, next_page_token }) + } + fn execute_locked_write Result<(), lightning::io::Error>>( &self, inner_lock_ref: Arc>, locking_key: String, version: u64, callback: F, ) -> Result<(), lightning::io::Error> { @@ -560,6 +740,367 @@ mod tests { .unwrap(); do_test_store(&store_0, &store_1) } + + #[test] + fn test_sqlite_store_paginated_listing() { + let mut temp_path = random_storage_path(); + temp_path.push("test_sqlite_store_paginated_listing"); + let store = SqliteStore::new( + temp_path, + Some("test_db".to_string()), + Some("test_table".to_string()), + ) + .unwrap(); + + let primary_namespace = "test_ns"; + let secondary_namespace = "test_sub"; + let num_entries = 225; + + // Write entries in order + for i in 0..num_entries { + let key = format!("key_{:04}", i); + let data = vec![i as u8; 32]; + KVStoreSync::write(&store, primary_namespace, secondary_namespace, &key, data).unwrap(); + } + + // Paginate through all entries and collect them + let mut 
all_keys = Vec::new(); + let mut page_token = None; + let mut page_count = 0; + + loop { + let response = PaginatedKVStoreSync::list_paginated( + &store, + primary_namespace, + secondary_namespace, + page_token, + ) + .unwrap(); + + all_keys.extend(response.keys.clone()); + page_count += 1; + + match response.next_page_token { + Some(token) => page_token = Some(token), + None => break, + } + } + + // Verify we got exactly the right number of entries + assert_eq!(all_keys.len(), num_entries); + + // Verify correct number of pages (225 entries at 50 per page = 5 pages) + assert_eq!(page_count, 5); + + // Verify no duplicates + let mut unique_keys = all_keys.clone(); + unique_keys.sort(); + unique_keys.dedup(); + assert_eq!(unique_keys.len(), num_entries); + + // Verify ordering: newest first (highest created_at first). + // Since we wrote key_0000 first and key_0249 last, key_0249 should appear first + // in the paginated results. + assert_eq!(all_keys[0], format!("key_{:04}", num_entries - 1)); + assert_eq!(all_keys[num_entries - 1], "key_0000"); + } + + #[test] + fn test_sqlite_store_paginated_update_preserves_order() { + let mut temp_path = random_storage_path(); + temp_path.push("test_sqlite_store_paginated_update"); + let store = SqliteStore::new( + temp_path, + Some("test_db".to_string()), + Some("test_table".to_string()), + ) + .unwrap(); + + let primary_namespace = "test_ns"; + let secondary_namespace = "test_sub"; + + // Write 3 entries + KVStoreSync::write(&store, primary_namespace, secondary_namespace, "first", vec![1u8; 8]) + .unwrap(); + KVStoreSync::write(&store, primary_namespace, secondary_namespace, "second", vec![2u8; 8]) + .unwrap(); + KVStoreSync::write(&store, primary_namespace, secondary_namespace, "third", vec![3u8; 8]) + .unwrap(); + + // Update the first entry + KVStoreSync::write(&store, primary_namespace, secondary_namespace, "first", vec![99u8; 8]) + .unwrap(); + + // Paginated listing should still show "first" with its original creation 
order + let response = PaginatedKVStoreSync::list_paginated( + &store, + primary_namespace, + secondary_namespace, + None, + ) + .unwrap(); + + // Newest first: third, second, first + assert_eq!(response.keys, vec!["third", "second", "first"]); + + // Verify the updated value was persisted + let data = + KVStoreSync::read(&store, primary_namespace, secondary_namespace, "first").unwrap(); + assert_eq!(data, vec![99u8; 8]); + } + + #[test] + fn test_sqlite_store_paginated_empty_namespace() { + let mut temp_path = random_storage_path(); + temp_path.push("test_sqlite_store_paginated_empty"); + let store = SqliteStore::new( + temp_path, + Some("test_db".to_string()), + Some("test_table".to_string()), + ) + .unwrap(); + + // Paginating an empty or unknown namespace returns an empty result with no token. + let response = + PaginatedKVStoreSync::list_paginated(&store, "nonexistent", "ns", None).unwrap(); + assert!(response.keys.is_empty()); + assert!(response.next_page_token.is_none()); + } + + #[test] + fn test_sqlite_store_paginated_namespace_isolation() { + let mut temp_path = random_storage_path(); + temp_path.push("test_sqlite_store_paginated_isolation"); + let store = SqliteStore::new( + temp_path, + Some("test_db".to_string()), + Some("test_table".to_string()), + ) + .unwrap(); + + // Write entries across different namespaces. + KVStoreSync::write(&store, "ns_a", "sub", "key_1", vec![1u8; 8]).unwrap(); + KVStoreSync::write(&store, "ns_a", "sub", "key_2", vec![2u8; 8]).unwrap(); + KVStoreSync::write(&store, "ns_b", "sub", "key_3", vec![3u8; 8]).unwrap(); + KVStoreSync::write(&store, "ns_a", "other", "key_4", vec![4u8; 8]).unwrap(); + + // ns_a/sub should only contain key_1 and key_2 (newest first). + let response = PaginatedKVStoreSync::list_paginated(&store, "ns_a", "sub", None).unwrap(); + assert_eq!(response.keys, vec!["key_2", "key_1"]); + assert!(response.next_page_token.is_none()); + + // ns_b/sub should only contain key_3. 
+ let response = PaginatedKVStoreSync::list_paginated(&store, "ns_b", "sub", None).unwrap(); + assert_eq!(response.keys, vec!["key_3"]); + + // ns_a/other should only contain key_4. + let response = PaginatedKVStoreSync::list_paginated(&store, "ns_a", "other", None).unwrap(); + assert_eq!(response.keys, vec!["key_4"]); + } + + #[test] + fn test_sqlite_store_paginated_removal() { + let mut temp_path = random_storage_path(); + temp_path.push("test_sqlite_store_paginated_removal"); + let store = SqliteStore::new( + temp_path, + Some("test_db".to_string()), + Some("test_table".to_string()), + ) + .unwrap(); + + let ns = "test_ns"; + let sub = "test_sub"; + + // Write entries and then remove some. + KVStoreSync::write(&store, ns, sub, "a", vec![1u8; 8]).unwrap(); + KVStoreSync::write(&store, ns, sub, "b", vec![2u8; 8]).unwrap(); + KVStoreSync::write(&store, ns, sub, "c", vec![3u8; 8]).unwrap(); + + KVStoreSync::remove(&store, ns, sub, "b", false).unwrap(); + + let response = PaginatedKVStoreSync::list_paginated(&store, ns, sub, None).unwrap(); + assert_eq!(response.keys, vec!["c", "a"]); + assert!(response.next_page_token.is_none()); + } + + #[test] + fn test_sqlite_store_paginated_exact_page_boundary() { + let mut temp_path = random_storage_path(); + temp_path.push("test_sqlite_store_paginated_boundary"); + let store = SqliteStore::new( + temp_path, + Some("test_db".to_string()), + Some("test_table".to_string()), + ) + .unwrap(); + + let ns = "test_ns"; + let sub = "test_sub"; + + // Write exactly PAGE_SIZE entries (50). + for i in 0..PAGE_SIZE { + let key = format!("key_{:04}", i); + KVStoreSync::write(&store, ns, sub, &key, vec![i as u8; 8]).unwrap(); + } + + // First page should have all 50 entries but still set a token + // (since result count == PAGE_SIZE, there *might* be more). 
+ let response = PaginatedKVStoreSync::list_paginated(&store, ns, sub, None).unwrap(); + assert_eq!(response.keys.len(), PAGE_SIZE); + assert!(response.next_page_token.is_some()); + + // Second page should be empty with no token. + let response = + PaginatedKVStoreSync::list_paginated(&store, ns, sub, response.next_page_token) + .unwrap(); + assert!(response.keys.is_empty()); + assert!(response.next_page_token.is_none()); + } + + #[test] + fn test_sqlite_store_paginated_fewer_than_page_size() { + let mut temp_path = random_storage_path(); + temp_path.push("test_sqlite_store_paginated_few"); + let store = SqliteStore::new( + temp_path, + Some("test_db".to_string()), + Some("test_table".to_string()), + ) + .unwrap(); + + let ns = "test_ns"; + let sub = "test_sub"; + + // Write fewer entries than PAGE_SIZE. + for i in 0..5 { + let key = format!("key_{}", i); + KVStoreSync::write(&store, ns, sub, &key, vec![i as u8; 8]).unwrap(); + } + + let response = PaginatedKVStoreSync::list_paginated(&store, ns, sub, None).unwrap(); + assert_eq!(response.keys.len(), 5); + // Fewer than PAGE_SIZE means no next page. + assert!(response.next_page_token.is_none()); + // Newest first. + assert_eq!(response.keys, vec!["key_4", "key_3", "key_2", "key_1", "key_0"]); + } + + #[test] + fn test_sqlite_store_paginated_same_created_at_tiebreak() { + // Entries backfilled during migration all get created_at from ROWID, which + // gives them unique values. But we can simulate a tie-break scenario by + // directly checking that the ordering within a page is deterministic + // (created_at DESC, key ASC). + let mut temp_path = random_storage_path(); + temp_path.push("test_sqlite_store_paginated_tiebreak"); + let store = SqliteStore::new( + temp_path, + Some("test_db".to_string()), + Some("test_table".to_string()), + ) + .unwrap(); + + let ns = "test_ns"; + let sub = "test_sub"; + + // Manually insert rows with the same created_at to test tie-breaking. 
+ { + let locked_conn = store.inner.connection.lock().unwrap(); + for key in &["cherry", "apple", "banana"] { + let sql = format!( + "INSERT INTO {} (primary_namespace, secondary_namespace, key, value, created_at) \ + VALUES (:ns, :sub, :key, :value, :created_at)", + store.inner.kv_table_name + ); + locked_conn + .execute( + &sql, + named_params! { + ":ns": ns, + ":sub": sub, + ":key": key, + ":value": vec![0u8; 4], + ":created_at": 42i64, + }, + ) + .unwrap(); + } + } + + let response = PaginatedKVStoreSync::list_paginated(&store, ns, sub, None).unwrap(); + // Same created_at, so tie-break by key ASC: apple, banana, cherry. + assert_eq!(response.keys, vec!["apple", "banana", "cherry"]); + } + + #[test] + fn test_sqlite_store_write_version_persists_across_restart() { + let mut temp_path = random_storage_path(); + temp_path.push("test_sqlite_store_write_version_restart"); + + let primary_namespace = "test_ns"; + let secondary_namespace = "test_sub"; + + // Write some entries with the first store instance + { + let store = SqliteStore::new( + temp_path.clone(), + Some("test_db".to_string()), + Some("test_table".to_string()), + ) + .unwrap(); + + KVStoreSync::write( + &store, + primary_namespace, + secondary_namespace, + "key_a", + vec![1u8; 8], + ) + .unwrap(); + KVStoreSync::write( + &store, + primary_namespace, + secondary_namespace, + "key_b", + vec![2u8; 8], + ) + .unwrap(); + + // Don't drop/cleanup since we want to reopen + std::mem::forget(store); + } + + // Open a new store instance on the same database and write more + { + let store = SqliteStore::new( + temp_path, + Some("test_db".to_string()), + Some("test_table".to_string()), + ) + .unwrap(); + + KVStoreSync::write( + &store, + primary_namespace, + secondary_namespace, + "key_c", + vec![3u8; 8], + ) + .unwrap(); + + // Paginated listing should show newest first: key_c, key_b, key_a + let response = PaginatedKVStoreSync::list_paginated( + &store, + primary_namespace, + secondary_namespace, + None, + ) + 
.unwrap(); + + assert_eq!(response.keys, vec!["key_c", "key_b", "key_a"]); + } + } } #[cfg(ldk_bench)] diff --git a/src/io/test_utils.rs b/src/io/test_utils.rs index 88078b316..4172ce035 100644 --- a/src/io/test_utils.rs +++ b/src/io/test_utils.rs @@ -9,6 +9,7 @@ use std::collections::{hash_map, HashMap}; use std::future::Future; use std::panic::RefUnwindSafe; use std::path::PathBuf; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Mutex; use lightning::events::ClosureReason; @@ -20,7 +21,8 @@ use lightning::ln::functional_test_utils::{ TestChanMonCfg, }; use lightning::util::persist::{ - KVStore, KVStoreSync, MonitorUpdatingPersister, KVSTORE_NAMESPACE_KEY_MAX_LEN, + KVStore, KVStoreSync, MonitorUpdatingPersister, PageToken, PaginatedKVStore, + PaginatedKVStoreSync, PaginatedListResponse, KVSTORE_NAMESPACE_KEY_MAX_LEN, }; use lightning::util::test_utils; use rand::distr::Alphanumeric; @@ -37,14 +39,20 @@ type TestMonitorUpdatePersister<'a, K> = MonitorUpdatingPersister< const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5; +const IN_MEMORY_PAGE_SIZE: usize = 50; + pub struct InMemoryStore { persisted_bytes: Mutex>>>, + creation_counter: AtomicU64, + creation_times: Mutex>>, } impl InMemoryStore { pub fn new() -> Self { let persisted_bytes = Mutex::new(HashMap::new()); - Self { persisted_bytes } + let creation_counter = AtomicU64::new(1); + let creation_times = Mutex::new(HashMap::new()); + Self { persisted_bytes, creation_counter, creation_times } } fn read_internal( @@ -71,8 +79,16 @@ impl InMemoryStore { let mut persisted_lock = self.persisted_bytes.lock().unwrap(); let prefixed = format!("{primary_namespace}/{secondary_namespace}"); - let outer_e = persisted_lock.entry(prefixed).or_insert(HashMap::new()); + let outer_e = persisted_lock.entry(prefixed.clone()).or_insert(HashMap::new()); outer_e.insert(key.to_string(), buf); + + // Only assign creation time on first write (not on update) + let mut ct_lock = self.creation_times.lock().unwrap(); + let ct_ns = 
ct_lock.entry(prefixed).or_insert(HashMap::new()); + ct_ns + .entry(key.to_string()) + .or_insert_with(|| self.creation_counter.fetch_add(1, Ordering::Relaxed)); + Ok(()) } @@ -86,6 +102,12 @@ impl InMemoryStore { outer_ref.remove(&key.to_string()); } + // Remove creation time entry + let mut ct_lock = self.creation_times.lock().unwrap(); + if let Some(ct_ns) = ct_lock.get_mut(&prefixed) { + ct_ns.remove(key); + } + Ok(()) } @@ -153,6 +175,78 @@ impl KVStoreSync for InMemoryStore { } } +impl InMemoryStore { + fn list_paginated_internal( + &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option, + ) -> io::Result { + let ct_lock = self.creation_times.lock().unwrap(); + let prefixed = format!("{primary_namespace}/{secondary_namespace}"); + + let ct_ns = match ct_lock.get(&prefixed) { + Some(m) => m, + None => { + return Ok(PaginatedListResponse { keys: Vec::new(), next_page_token: None }); + }, + }; + + // Build list of (key, created_at) and sort by created_at DESC, key ASC + let mut entries: Vec<(&String, &u64)> = ct_ns.iter().collect(); + entries.sort_by(|a, b| b.1.cmp(a.1).then_with(|| a.0.cmp(b.0))); + + // Apply page token filter + let start_idx = if let Some(ref token) = page_token { + let token_str = token.as_str(); + let (created_at_str, key) = token_str + .split_once(':') + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Invalid page token"))?; + let created_at: u64 = created_at_str.parse().map_err(|_| { + io::Error::new(io::ErrorKind::InvalidInput, "Invalid page token created_at") + })?; + + // Find first entry after the token position + entries + .iter() + .position(|(k, ca)| **ca < created_at || (**ca == created_at && k.as_str() > key)) + .unwrap_or(entries.len()) + } else { + 0 + }; + + let page: Vec = entries[start_idx..] 
+ .iter() + .take(IN_MEMORY_PAGE_SIZE) + .map(|(k, _)| (*k).clone()) + .collect(); + + let next_page_token = if page.len() == IN_MEMORY_PAGE_SIZE { + let last_key = page.last().unwrap(); + let last_ca = ct_ns.get(last_key).unwrap(); + Some(PageToken::new(format!("{:020}:{}", last_ca, last_key))) + } else { + None + }; + + Ok(PaginatedListResponse { keys: page, next_page_token }) + } +} + +impl PaginatedKVStoreSync for InMemoryStore { + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>, + ) -> io::Result<PaginatedListResponse> { + self.list_paginated_internal(primary_namespace, secondary_namespace, page_token) + } +} + +impl PaginatedKVStore for InMemoryStore { + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>, + ) -> impl Future<Output = io::Result<PaginatedListResponse>> + 'static + Send { + let res = self.list_paginated_internal(primary_namespace, secondary_namespace, page_token); + async move { res } + } +} + unsafe impl Sync for InMemoryStore {} unsafe impl Send for InMemoryStore {} diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 61c9c8281..987c1d944 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -26,6 +26,8 @@ use common::{ setup_builder, setup_node, setup_two_nodes, wait_for_tx, TestChainSource, TestStoreType, TestSyncStore, }; +use electrsd::corepc_node::Node as BitcoinD; +use electrsd::ElectrsD; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::entropy::NodeEntropy; use ldk_node::liquidity::LSPS2ServiceConfig; @@ -2406,41 +2408,101 @@ async fn payment_persistence_after_restart() { restarted_node_a.stop().unwrap(); } -#[tokio::test(flavor = "multi_thread", worker_threads = 1)] -async fn persistence_backwards_compatibility() { +enum OldLdkVersion { + V0_6_2, + V0_7_0, +} + +async fn build_0_6_2_node( + bitcoind: &BitcoinD, electrsd: &ElectrsD, storage_path: String, esplora_url: String, + seed_bytes: [u8; 64], +) -> (u64,
bitcoin::secp256k1::PublicKey) { + let mut builder_old = ldk_node_062::Builder::new(); + builder_old.set_network(bitcoin::Network::Regtest); + builder_old.set_storage_dir_path(storage_path); + builder_old.set_entropy_seed_bytes(seed_bytes); + builder_old.set_chain_source_esplora(esplora_url, None); + let node_old = builder_old.build().unwrap(); + + node_old.start().unwrap(); + let addr_old = node_old.onchain_payment().new_address().unwrap(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_old], + Amount::from_sat(100_000), + ) + .await; + node_old.sync_wallets().unwrap(); + + let balance = node_old.list_balances().spendable_onchain_balance_sats; + assert!(balance > 0); + let node_id = node_old.node_id(); + + node_old.stop().unwrap(); + + (balance, node_id) +} + +async fn build_0_7_0_node( + bitcoind: &BitcoinD, electrsd: &ElectrsD, storage_path: String, esplora_url: String, + seed_bytes: [u8; 64], +) -> (u64, bitcoin::secp256k1::PublicKey) { + let mut builder_old = ldk_node_070::Builder::new(); + builder_old.set_network(bitcoin::Network::Regtest); + builder_old.set_storage_dir_path(storage_path); + builder_old.set_entropy_seed_bytes(seed_bytes); + builder_old.set_chain_source_esplora(esplora_url, None); + let node_old = builder_old.build().unwrap(); + + node_old.start().unwrap(); + let addr_old = node_old.onchain_payment().new_address().unwrap(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_old], + Amount::from_sat(100_000), + ) + .await; + node_old.sync_wallets().unwrap(); + + let balance = node_old.list_balances().spendable_onchain_balance_sats; + assert!(balance > 0); + let node_id = node_old.node_id(); + + node_old.stop().unwrap(); + + (balance, node_id) +} + +async fn do_persistence_backwards_compatibility(version: OldLdkVersion) { let (bitcoind, electrsd) = common::setup_bitcoind_and_electrsd(); let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); let 
storage_path = common::random_storage_path().to_str().unwrap().to_owned(); let seed_bytes = [42u8; 64]; - // Setup a v0.6.2 `Node` - let (old_balance, old_node_id) = { - let mut builder_old = ldk_node_062::Builder::new(); - builder_old.set_network(bitcoin::Network::Regtest); - builder_old.set_storage_dir_path(storage_path.clone()); - builder_old.set_entropy_seed_bytes(seed_bytes); - builder_old.set_chain_source_esplora(esplora_url.clone(), None); - let node_old = builder_old.build().unwrap(); - - node_old.start().unwrap(); - let addr_old = node_old.onchain_payment().new_address().unwrap(); - common::premine_and_distribute_funds( - &bitcoind.client, - &electrsd.client, - vec![addr_old], - bitcoin::Amount::from_sat(100_000), - ) - .await; - node_old.sync_wallets().unwrap(); - - let balance = node_old.list_balances().spendable_onchain_balance_sats; - assert!(balance > 0); - let node_id = node_old.node_id(); - - node_old.stop().unwrap(); - - (balance, node_id) + let (old_balance, old_node_id) = match version { + OldLdkVersion::V0_6_2 => { + build_0_6_2_node( + &bitcoind, + &electrsd, + storage_path.clone(), + esplora_url.clone(), + seed_bytes, + ) + .await + }, + OldLdkVersion::V0_7_0 => { + build_0_7_0_node( + &bitcoind, + &electrsd, + storage_path.clone(), + esplora_url.clone(), + seed_bytes, + ) + .await + }, }; // Now ensure we can still reinit from the same backend. @@ -2470,6 +2532,12 @@ async fn persistence_backwards_compatibility() { node_new.stop().unwrap(); } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn persistence_backwards_compatibility() { + do_persistence_backwards_compatibility(OldLdkVersion::V0_6_2).await; + do_persistence_backwards_compatibility(OldLdkVersion::V0_7_0).await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn onchain_fee_bump_rbf() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();