From b0b46fed7c9bbabaee749fe62695c92cf6566fed Mon Sep 17 00:00:00 2001 From: Enigbe Date: Wed, 18 Feb 2026 22:18:01 +0100 Subject: [PATCH 01/10] Implement tiered storage Introduces TierStore, a KVStore implementation that manages data across three storage layers: - Primary: Main data store for critical node data - Ephemeral: Secondary store for non-critical, easily-rebuildable data (e.g., network graph) with fast local access - Backup: Tertiary store for disaster recovery with async/lazy operations to avoid blocking primary store - Unit tests for TierStore core functionality --- src/io/mod.rs | 1 + src/io/test_utils.rs | 170 +++++- src/io/tier_store.rs | 980 ++++++++++++++++++++++++++++++++ tests/integration_tests_rust.rs | 16 +- 4 files changed, 1158 insertions(+), 9 deletions(-) create mode 100644 src/io/tier_store.rs diff --git a/src/io/mod.rs b/src/io/mod.rs index e080d39f7..bf6366c45 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -10,6 +10,7 @@ pub mod sqlite_store; #[cfg(test)] pub(crate) mod test_utils; +pub(crate) mod tier_store; pub(crate) mod utils; pub mod vss_store; diff --git a/src/io/test_utils.rs b/src/io/test_utils.rs index 9add2d6c1..f2b226a5f 100644 --- a/src/io/test_utils.rs +++ b/src/io/test_utils.rs @@ -9,7 +9,8 @@ use std::collections::{hash_map, HashMap}; use std::future::Future; use std::panic::RefUnwindSafe; use std::path::PathBuf; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; +use std::time::Duration; use lightning::events::ClosureReason; use lightning::ln::functional_test_utils::{ @@ -25,6 +26,8 @@ use lightning::{check_closed_broadcast, io}; use rand::distr::Alphanumeric; use rand::{rng, Rng}; +use crate::runtime::Runtime; + type TestMonitorUpdatePersister<'a, K> = MonitorUpdatingPersister< &'a K, &'a test_utils::TestLogger, @@ -352,3 +355,168 @@ pub(crate) fn do_test_store(store_0: &K, store_1: &K) { // Make sure everything is persisted as expected after close. check_persisted_data!(persister_0_max_pending_updates * 2 * EXPECTED_UPDATES_PER_PAYMENT + 1); } + +struct DelayedStoreInner { + storage: Mutex>>, + delay: Duration, +} + +impl DelayedStoreInner { + fn new(delay: Duration) -> Self { + Self { storage: Mutex::new(HashMap::new()), delay } + } + + fn make_key(pn: &str, sn: &str, key: &str) -> String { + format!("{}/{}/{}", pn, sn, key) + } + + async fn read_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result, io::Error> { + tokio::time::sleep(self.delay).await; + + let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key); + let storage = self.storage.lock().unwrap(); + storage + .get(&full_key) + .cloned() + .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "key not found")) + } + + async fn write_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> Result<(), io::Error> { + tokio::time::sleep(self.delay).await; + + let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key); + let mut storage = self.storage.lock().unwrap(); + storage.insert(full_key, buf); + Ok(()) + } + + async fn remove_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result<(), io::Error> { + tokio::time::sleep(self.delay).await; + + let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key); + let mut storage = self.storage.lock().unwrap(); + storage.remove(&full_key); + Ok(()) + } + + async fn list_internal( + &self, primary_namespace: String, secondary_namespace: String, + ) -> Result, io::Error> { + tokio::time::sleep(self.delay).await; + + let prefix = format!("{}/{}/", primary_namespace, secondary_namespace); + let storage = self.storage.lock().unwrap(); + Ok(storage + .keys() + .filter(|k| k.starts_with(&prefix)) + .map(|k| k.strip_prefix(&prefix).unwrap().to_string()) + .collect()) + } +} + +pub struct DelayedStore { + inner: Arc, + runtime: Arc, +} + +impl DelayedStore { + pub fn new(delay_ms: u64, runtime: Arc) -> Self { + Self { inner: Arc::new(DelayedStoreInner::new(Duration::from_millis(delay_ms))), runtime } + } +} + +impl KVStore for DelayedStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.read_internal(pn, sn, key).await } + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.write_internal(pn, sn, key, buf).await } + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.remove_internal(pn, sn, key).await } + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + + async move { inner.list_internal(pn, sn).await } + } +} + +impl KVStoreSync for DelayedStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, io::Error> { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + let key = key.to_string(); + + self.runtime.block_on(async move { inner.read_internal(pn, sn, key).await }) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Result<(), io::Error> { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + let key = key.to_string(); + + self.runtime.block_on(async move { inner.write_internal(pn, sn, key, buf).await }) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, + ) -> Result<(), io::Error> { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + let key = key.to_string(); + + self.runtime.block_on(async move { inner.remove_internal(pn, sn, key).await }) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, io::Error> { + let inner = Arc::clone(&self.inner); + let pn = primary_namespace.to_string(); + let sn = secondary_namespace.to_string(); + + self.runtime.block_on(async move { inner.list_internal(pn, sn).await }) + } +} diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs new file mode 100644 index 000000000..70904c36f --- /dev/null +++ b/src/io/tier_store.rs @@ -0,0 +1,980 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. +#![allow(dead_code)] // TODO: Temporal warning silencer. Will be removed in later commit. + +use crate::io::utils::check_namespace_key_validity; +use crate::logger::{LdkLogger, Logger}; +use crate::runtime::Runtime; +use crate::types::DynStore; + +use lightning::util::persist::{ + KVStore, KVStoreSync, NETWORK_GRAPH_PERSISTENCE_KEY, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, +}; +use lightning::{io, log_trace}; +use lightning::{log_debug, log_error, log_info, log_warn}; + +use tokio::sync::mpsc::{self, error::TrySendError}; + +use std::future::Future; +use std::sync::Arc; + +#[cfg(not(test))] +const BACKUP_QUEUE_CAPACITY: usize = 100; +#[cfg(test)] +const BACKUP_QUEUE_CAPACITY: usize = 5; + +/// A 3-tiered [`KVStoreSync`] implementation that manages data across +/// three distinct storage locations, i.e. primary (preferably remote) +/// store for all critical data, optional ephemeral (local) store for +/// non-critical and easily rebuildable data, and backup (preferably +/// local) to lazily backup the primary store for disaster recovery +/// scenarios. +pub(crate) struct TierStore { + inner: Arc, + runtime: Arc, + logger: Arc, +} + +impl TierStore { + pub fn new(primary_store: Arc, runtime: Arc, logger: Arc) -> Self { + let inner = Arc::new(TierStoreInner::new(primary_store, Arc::clone(&logger))); + + Self { inner, runtime, logger } + } + + /// Configures the local backup store for disaster recovery. + /// + /// This store serves as a local copy of the critical data for disaster + /// recovery scenarios. When configured, this method also spawns a background + /// task that asynchronously processes backup writes and removals to avoid + /// blocking primary store operations. + /// + /// The backup operates on a best-effort basis: + /// - Writes are queued asynchronously (non-blocking) + /// - No retry logic (We assume local store is unlikely to have transient failures). + /// - Failures are logged but don't propagate to all the way to caller. + pub fn set_backup_store(&mut self, backup: Arc) { + let (tx, rx) = mpsc::channel::(BACKUP_QUEUE_CAPACITY); + + let backup_clone = Arc::clone(&backup); + let logger = Arc::clone(&self.logger); + + self.runtime.spawn_background_task(Self::process_backup_operation( + rx, + backup_clone, + logger, + )); + + debug_assert_eq!(Arc::strong_count(&self.inner), 1); + + let inner = Arc::get_mut(&mut self.inner).expect( + "TierStore should not be shared during configuration. No other references should exist", + ); + + inner.backup_store = Some(backup); + inner.backup_sender = Some(tx); + } + + async fn process_backup_operation( + mut receiver: mpsc::Receiver, backup_store: Arc, logger: Arc, + ) { + while let Some(op) = receiver.recv().await { + match Self::apply_backup_operation(&op, &backup_store).await { + Ok(_) => { + log_trace!( + logger, + "Backup succeeded for key {}/{}/{}", + op.primary_namespace(), + op.secondary_namespace(), + op.key() + ); + }, + Err(e) => { + log_error!( + logger, + "Backup failed permanently for key {}/{}/{}: {}", + op.primary_namespace(), + op.secondary_namespace(), + op.key(), + e + ); + }, + } + } + } + + async fn apply_backup_operation(op: &BackupOp, store: &Arc) -> io::Result<()> { + match op { + BackupOp::Write { primary_namespace, secondary_namespace, key, data } => { + KVStore::write( + store.as_ref(), + primary_namespace, + secondary_namespace, + key, + data.clone(), + ) + .await + }, + BackupOp::Remove { primary_namespace, secondary_namespace, key, lazy } => { + KVStore::remove(store.as_ref(), primary_namespace, secondary_namespace, key, *lazy) + .await + }, + } + } + + /// Configures the local store for non-critical data storage. + pub fn set_ephemeral_store(&mut self, ephemeral: Arc) { + debug_assert_eq!(Arc::strong_count(&self.inner), 1); + + let inner = Arc::get_mut(&mut self.inner).expect( + "TierStore should not be shared during configuration. No other references should exist", + ); + + inner.ephemeral_store = Some(ephemeral); + } +} + +impl KVStore for TierStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.read_internal(primary_namespace, secondary_namespace, key).await } + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.write_internal(primary_namespace, secondary_namespace, key, buf).await } + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.remove_internal(primary_namespace, secondary_namespace, key, lazy).await } + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + + async move { inner.list_internal(primary_namespace, secondary_namespace).await } + } +} + +impl KVStoreSync for TierStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + self.runtime.block_on(self.inner.read_internal( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + )) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + self.runtime.block_on(self.inner.write_internal( + primary_namespace, + secondary_namespace, + key, + buf, + )) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + self.runtime.block_on(self.inner.remove_internal( + primary_namespace, + secondary_namespace, + key, + lazy, + )) + } + + fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { + self.runtime.block_on( + self.inner + .list_internal(primary_namespace.to_string(), secondary_namespace.to_string()), + ) + } +} + +pub struct TierStoreInner { + /// For remote data. + primary_store: Arc, + /// For local non-critical/ephemeral data. + ephemeral_store: Option>, + /// For redundancy (disaster recovery). + backup_store: Option>, + backup_sender: Option>, + logger: Arc, +} + +impl TierStoreInner { + /// Creates a tier store with the primary data store. + pub fn new(primary_store: Arc, logger: Arc) -> Self { + Self { + primary_store, + ephemeral_store: None, + backup_store: None, + backup_sender: None, + logger, + } + } + + /// Queues data for asynchronous backup/write to the configured backup store. + /// + /// We perform a non-blocking send to avoid impacting primary storage operations. + /// This is a no-op if backup store is not configured. + /// + /// ## Returns + /// - `Ok(())`: Backup was successfully queued or no backup is configured + /// - `Err(WouldBlock)`: Backup queue is full - data was not queued + /// - `Err(BrokenPipe)`: Backup queue is no longer available + fn enqueue_backup_write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + if let Some(backup_sender) = &self.backup_sender { + let backup_res = backup_sender.try_send(BackupOp::Write { + primary_namespace: primary_namespace.to_string(), + secondary_namespace: secondary_namespace.to_string(), + key: key.to_string(), + data: buf, + }); + if let Err(e) = backup_res { + match e { + // Assuming the channel is only full for a short time, should we explore + // retrying here to add some resiliency? + TrySendError::Full(op) => { + log_warn!( + self.logger, + "Backup queue is full. Cannot write data for key: {}/{}/{}", + op.primary_namespace(), + op.secondary_namespace(), + op.key() + ); + let e = io::Error::new( + io::ErrorKind::WouldBlock, + "Backup queue is currently full.", + ); + return Err(e); + }, + TrySendError::Closed(op) => { + log_error!( + self.logger, + "Backup queue is closed. Cannot write data for key: {}/{}/{}", + op.primary_namespace(), + op.secondary_namespace(), + op.key() + ); + let e = + io::Error::new(io::ErrorKind::BrokenPipe, "Backup queue is closed."); + return Err(e); + }, + } + } + } + Ok(()) + } + + /// Queues the removal of data from the configured backup store. + /// + /// We perform a non-blocking send to avoid impacting primary storage operations. + /// This is a no-op if backup store is not configured. + /// + /// # Returns + /// - `Ok(())`: Backup was successfully queued or no backup is configured + /// - `Err(WouldBlock)`: Backup queue is full - data was not queued + /// - `Err(BrokenPipe)`: Backup system is no longer available + fn enqueue_backup_remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + if let Some(backup_sender) = &self.backup_sender { + let removal_res = backup_sender.try_send(BackupOp::Remove { + primary_namespace: primary_namespace.to_string(), + secondary_namespace: secondary_namespace.to_string(), + key: key.to_string(), + lazy, + }); + if let Err(e) = removal_res { + match e { + TrySendError::Full(op) => { + log_warn!( + self.logger, + "Backup queue is full. Cannot remove data for key: {}/{}/{}", + op.primary_namespace(), + op.secondary_namespace(), + op.key() + ); + let e = io::Error::new( + io::ErrorKind::WouldBlock, + "Backup queue is currently full.", + ); + return Err(e); + }, + TrySendError::Closed(op) => { + log_error!( + self.logger, + "Backup queue is closed. Cannot remove data for key: {}/{}/{}", + op.primary_namespace(), + op.secondary_namespace(), + op.key() + ); + let e = + io::Error::new(io::ErrorKind::BrokenPipe, "Backup queue is closed."); + return Err(e); + }, + } + } + } + Ok(()) + } + + /// Reads from the primary data store. + async fn read_primary( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + match KVStore::read( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + ) + .await + { + Ok(data) => { + log_info!( + self.logger, + "Read succeeded for key: {}/{}/{}", + primary_namespace, + secondary_namespace, + key + ); + Ok(data) + }, + Err(e) => { + log_error!( + self.logger, + "Failed to read from primary store for key {}/{}/{}: {}.", + primary_namespace, + secondary_namespace, + key, + e + ); + Err(e) + }, + } + } + + /// Lists keys from the primary data store. + async fn list_primary( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> io::Result> { + match KVStore::list(self.primary_store.as_ref(), primary_namespace, secondary_namespace) + .await + { + Ok(keys) => { + log_info!( + self.logger, + "List succeeded for namespace: {}/{}", + primary_namespace, + secondary_namespace + ); + return Ok(keys); + }, + Err(e) => { + log_error!( + self.logger, + "Failed to list from primary store for namespace {}/{}: {}.", + primary_namespace, + secondary_namespace, + e + ); + Err(e) + }, + } + } + + async fn primary_write_then_schedule_backup( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + match KVStore::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ) + .await + { + Ok(()) => { + if let Err(e) = + self.enqueue_backup_write(primary_namespace, secondary_namespace, key, buf) + { + // We don't propagate backup errors here, opting to log only. + log_warn!( + self.logger, + "Failed to queue backup write for key: {}/{}/{}. Error: {}", + primary_namespace, + secondary_namespace, + key, + e + ) + } + + Ok(()) + }, + Err(e) => { + log_debug!( + self.logger, + "Skipping backup write due to primary write failure for key: {}/{}/{}.", + primary_namespace, + secondary_namespace, + key + ); + Err(e) + }, + } + } + + async fn primary_remove_then_schedule_backup( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + match KVStore::remove( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ) + .await + { + Ok(()) => { + if let Err(e) = + self.enqueue_backup_remove(primary_namespace, secondary_namespace, key, lazy) + { + // We don't propagate backup errors here, opting to silently log. + log_warn!( + self.logger, + "Failed to queue backup removal for key: {}/{}/{}. Error: {}", + primary_namespace, + secondary_namespace, + key, + e + ) + } + + Ok(()) + }, + Err(e) => { + log_debug!( + self.logger, + "Skipping backup removal due to primary removal failure for key: {}/{}/{}.", + primary_namespace, + secondary_namespace, + key + ); + Err(e) + }, + } + } + + async fn read_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "read", + )?; + + match (primary_namespace.as_str(), secondary_namespace.as_str(), key.as_str()) { + (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) => { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + // We only try once here (without retry logic) because local failure might be indicative + // of a more serious issue (e.g. full memory, memory corruption, permissions change) that + // do not self-resolve such that retrying would negate the latency benefits. + + // The following questions remain: + // 1. Are there situations where local transient errors may warrant a retry? + // 2. Can we reliably identify/detect these transient errors? + // 3. Should we fall back to the primary or backup stores in the event of any error? + KVStore::read( + eph_store.as_ref(), + &primary_namespace, + &secondary_namespace, + &key, + ) + .await + } else { + log_debug!(self.logger, "Ephemeral store not configured. Reading non-critical data from primary or backup stores."); + self.read_primary(&primary_namespace, &secondary_namespace, &key).await + } + }, + _ => self.read_primary(&primary_namespace, &secondary_namespace, &key).await, + } + } + + async fn write_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> io::Result<()> { + match (primary_namespace.as_str(), secondary_namespace.as_str(), key.as_str()) { + (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) => { + if let Some(eph_store) = &self.ephemeral_store { + KVStore::write( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + } else { + log_debug!(self.logger, "Ephemeral store not configured. Writing non-critical data to primary and backup stores."); + + self.primary_write_then_schedule_backup( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + } + }, + _ => { + self.primary_write_then_schedule_backup( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + }, + } + } + + async fn remove_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> io::Result<()> { + match (primary_namespace.as_str(), secondary_namespace.as_str(), key.as_str()) { + (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) => { + if let Some(eph_store) = &self.ephemeral_store { + KVStore::remove( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + } else { + log_debug!(self.logger, "Ephemeral store not configured. Removing non-critical data from primary and backup stores."); + + self.primary_remove_then_schedule_backup( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + } + }, + _ => { + self.primary_remove_then_schedule_backup( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + }, + } + } + + async fn list_internal( + &self, primary_namespace: String, secondary_namespace: String, + ) -> io::Result> { + match (primary_namespace.as_str(), secondary_namespace.as_str()) { + ( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + ) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _) => { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + KVStoreSync::list(eph_store.as_ref(), &primary_namespace, &secondary_namespace) + } else { + log_debug!( + self.logger, + "Ephemeral store not configured. Listing from primary and backup stores." + ); + self.list_primary(&primary_namespace, &secondary_namespace).await + } + }, + _ => self.list_primary(&primary_namespace, &secondary_namespace).await, + } + } +} + +enum BackupOp { + Write { primary_namespace: String, secondary_namespace: String, key: String, data: Vec }, + Remove { primary_namespace: String, secondary_namespace: String, key: String, lazy: bool }, +} + +impl BackupOp { + fn primary_namespace(&self) -> &str { + match self { + BackupOp::Write { primary_namespace, .. } + | BackupOp::Remove { primary_namespace, .. } => primary_namespace, + } + } + + fn secondary_namespace(&self) -> &str { + match self { + BackupOp::Write { secondary_namespace, .. } + | BackupOp::Remove { secondary_namespace, .. } => secondary_namespace, + } + } + + fn key(&self) -> &str { + match self { + BackupOp::Write { key, .. } | BackupOp::Remove { key, .. } => key, + } + } +} + +#[cfg(test)] +mod tests { + use std::panic::RefUnwindSafe; + use std::path::PathBuf; + use std::sync::Arc; + use std::thread; + use std::time::Duration; + + use lightning::util::logger::Level; + use lightning::util::persist::{ + CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + }; + use lightning_persister::fs_store::FilesystemStore; + + use crate::io::test_utils::{ + do_read_write_remove_list_persist, random_storage_path, DelayedStore, + }; + use crate::io::tier_store::TierStore; + use crate::logger::Logger; + use crate::runtime::Runtime; + #[cfg(not(feature = "uniffi"))] + use crate::types::DynStore; + use crate::types::DynStoreWrapper; + + use super::*; + + impl RefUnwindSafe for TierStore {} + + struct CleanupDir(PathBuf); + impl Drop for CleanupDir { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.0); + } + } + + fn setup_tier_store( + primary_store: Arc, logger: Arc, runtime: Arc, + ) -> TierStore { + TierStore::new(primary_store, runtime, logger) + } + + #[test] + fn write_read_list_remove() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let runtime = Arc::new(Runtime::new(Arc::clone(&logger)).unwrap()); + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let tier = setup_tier_store(primary_store, logger, runtime); + + do_read_write_remove_list_persist(&tier); + } + + #[test] + fn ephemeral_routing() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let runtime = Arc::new(Runtime::new(Arc::clone(&logger)).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger, runtime); + + let ephemeral_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("ephemeral")))); + tier.set_ephemeral_store(Arc::clone(&ephemeral_store)); + + let data = vec![42u8; 32]; + + // Non-critical + KVStoreSync::write( + &tier, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + data.clone(), + ) + .unwrap(); + + // Critical + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), + ) + .unwrap(); + + let primary_read_ng = KVStoreSync::read( + &*primary_store, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + let ephemeral_read_ng = KVStoreSync::read( + &*ephemeral_store, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + + let primary_read_cm = KVStoreSync::read( + &*primary_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + let ephemeral_read_cm = KVStoreSync::read( + &*ephemeral_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + + assert!(primary_read_ng.is_err()); + assert_eq!(ephemeral_read_ng.unwrap(), data); + + assert!(ephemeral_read_cm.is_err()); + assert_eq!(primary_read_cm.unwrap(), data); + } + + #[test] + fn lazy_backup() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + let runtime = Arc::new(Runtime::new(Arc::clone(&logger)).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger, runtime); + + let backup_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("backup")))); + tier.set_backup_store(Arc::clone(&backup_store)); + + let data = vec![42u8; 32]; + + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), + ) + .unwrap(); + + // Immediate read from backup should fail + let backup_read_cm = KVStoreSync::read( + &*backup_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + assert!(backup_read_cm.is_err()); + + // Primary not blocked by backup hence immediate read should succeed + let primary_read_cm = KVStoreSync::read( + &*primary_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + assert_eq!(primary_read_cm.unwrap(), data); + + // Delayed read from backup should succeed + thread::sleep(Duration::from_millis(50)); + let backup_read_cm = KVStoreSync::read( + &*backup_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + assert_eq!(backup_read_cm.unwrap(), data); + } + + #[test] + fn backup_overflow_doesnt_fail_writes() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path.clone(), Level::Trace).unwrap()); + let runtime = Arc::new(Runtime::new(Arc::clone(&logger)).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = + setup_tier_store(Arc::clone(&primary_store), Arc::clone(&logger), Arc::clone(&runtime)); + + let backup_store: Arc = + Arc::new(DynStoreWrapper(DelayedStore::new(100, runtime))); + tier.set_backup_store(Arc::clone(&backup_store)); + + let data = vec![42u8; 32]; + + let key = CHANNEL_MANAGER_PERSISTENCE_KEY; + for i in 0..=10 { + let result = KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + &format!("{}_{}", key, i), + data.clone(), + ); + + assert!(result.is_ok(), "Write {} should succeed", i); + } + + // Check logs for backup queue overflow message + let log_contents = std::fs::read_to_string(&log_path).unwrap(); + assert!( + log_contents.contains("Backup queue is full"), + "Logs should contain backup queue overflow message" + ); + } + + #[test] + fn lazy_removal() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path.clone(), Level::Trace).unwrap()); + let runtime = Arc::new(Runtime::new(Arc::clone(&logger)).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = + setup_tier_store(Arc::clone(&primary_store), Arc::clone(&logger), Arc::clone(&runtime)); + + let backup_store: Arc = + Arc::new(DynStoreWrapper(DelayedStore::new(100, runtime))); + tier.set_backup_store(Arc::clone(&backup_store)); + + let data = vec![42u8; 32]; + + let key = CHANNEL_MANAGER_PERSISTENCE_KEY; + let write_result = KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + data.clone(), + ); + assert!(write_result.is_ok(), "Write should succeed"); + + thread::sleep(Duration::from_millis(10)); + + assert_eq!( + KVStoreSync::read( + &*backup_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ) + .unwrap(), + data + ); + + KVStoreSync::remove( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + true, + ) + .unwrap(); + + thread::sleep(Duration::from_millis(10)); + + let res = KVStoreSync::read( + &*backup_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ); + + assert!(res.is_err()); + } +} diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index a16a395a6..c2f68b907 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -17,14 +17,14 @@ use bitcoin::hashes::Hash; use bitcoin::{Address, Amount, ScriptBuf, Txid}; use common::logging::{init_log_logger, validate_log_entry, MultiNodeLogger, TestLogWriter}; use common::{ - bump_fee_and_broadcast, distribute_funds_unconfirmed, do_channel_full_cycle, - expect_channel_pending_event, expect_channel_ready_event, expect_channel_ready_events, - expect_event, expect_payment_claimable_event, expect_payment_received_event, - expect_payment_successful_event, expect_splice_pending_event, generate_blocks_and_wait, - open_channel, open_channel_push_amt, premine_and_distribute_funds, premine_blocks, prepare_rbf, - random_chain_source, random_config, random_listening_addresses, setup_bitcoind_and_electrsd, - setup_builder, setup_node, setup_two_nodes, wait_for_tx, TestChainSource, TestStoreType, - TestSyncStore, + build_node_with_store, bump_fee_and_broadcast, distribute_funds_unconfirmed, + do_channel_full_cycle, expect_channel_pending_event, expect_channel_ready_event, + expect_channel_ready_events, expect_event, expect_payment_claimable_event, + expect_payment_received_event, expect_payment_successful_event, expect_splice_pending_event, + generate_blocks_and_wait, open_channel, open_channel_push_amt, premine_and_distribute_funds, + premine_blocks, prepare_rbf, random_chain_source, random_config, random_listening_addresses, + setup_bitcoind_and_electrsd, setup_builder, setup_node, setup_two_nodes, wait_for_tx, + TestChainSource, TestStoreType, TestSyncStore, }; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::entropy::NodeEntropy; From 42cae4e48b12a07d0138763e4689c3063f4df1bf Mon Sep 17 00:00:00 2001 From: Enigbe Date: Thu, 19 Feb 2026 09:44:55 +0100 Subject: [PATCH 02/10] Integrate TierStore into NodeBuilder Adds TierStoreConfig and two configuration methods to NodeBuilder: - set_backup_store: Configure backup store for disaster recovery - set_ephemeral_store: Configure ephemeral store for non-critical data Modifies build_with_store to wrap the provided store in a TierStore, as the primary store, applying any configured ephemeral and backup stores. Note: Temporal dead code allowance will be removed in test commit. --- src/builder.rs | 65 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/src/builder.rs b/src/builder.rs index 7a285876f..5d1077f33 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -54,6 +54,7 @@ use crate::event::EventQueue; use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; +use crate::io::tier_store::TierStore; use crate::io::utils::{ read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments, @@ -150,6 +151,21 @@ impl std::fmt::Debug for LogWriterConfig { } } +#[derive(Default)] +struct TierStoreConfig { + ephemeral: Option>, + backup: Option>, +} + +impl std::fmt::Debug for TierStoreConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TierStoreConfig") + .field("ephemeral", &self.ephemeral.as_ref().map(|_| "Arc")) + .field("backup", &self.backup.as_ref().map(|_| "Arc")) + .finish() + } +} + /// An error encountered during building a [`Node`]. /// /// [`Node`]: crate::Node @@ -242,6 +258,7 @@ pub struct NodeBuilder { liquidity_source_config: Option, log_writer_config: Option, async_payments_role: Option, + tier_store_config: Option, runtime_handle: Option, pathfinding_scores_sync_config: Option, recovery_mode: bool, @@ -260,6 +277,7 @@ impl NodeBuilder { let gossip_source_config = None; let liquidity_source_config = None; let log_writer_config = None; + let tier_store_config = None; let runtime_handle = None; let pathfinding_scores_sync_config = None; let recovery_mode = false; @@ -269,6 +287,7 @@ impl NodeBuilder { gossip_source_config, liquidity_source_config, log_writer_config, + tier_store_config, runtime_handle, async_payments_role: None, pathfinding_scores_sync_config, @@ -557,6 +576,33 @@ impl NodeBuilder { self } + /// Configures the backup store for local disaster recovery. + /// + /// When building with tiered storage, this store receives asynchronous copies + /// of all critical data written to the primary store. + /// + /// Backup writes are non-blocking and do not affect primary store operation performance. + #[allow(dead_code)] + pub fn set_backup_store(&mut self, backup_store: Arc) -> &mut Self { + let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); + tier_store_config.backup = Some(backup_store); + self + } + + /// Configures the ephemeral store for non-critical, frequently-accessed data. + /// + /// When building with tiered storage, this store is used for ephemeral data like + /// the network graph and scorer data to reduce latency for reads. Data stored here + /// can be rebuilt if lost. + /// + /// If not set, non-critical data will be stored in the primary store. + #[allow(dead_code)] + pub fn set_ephemeral_store(&mut self, ephemeral_store: Arc) -> &mut Self { + let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); + tier_store_config.ephemeral = Some(ephemeral_store); + self + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: NodeEntropy) -> Result { @@ -668,6 +714,14 @@ impl NodeBuilder { } /// Builds a [`Node`] instance according to the options previously configured. + /// + /// The provided `kv_store` will be used as the primary storage backend. Optionally, + /// an ephemeral store for frequently-accessed non-critical data (e.g., network graph, scorer) + /// and a backup store for local disaster recovery can be configured via + /// [`set_ephemeral_store`] and [`set_backup_store`]. + /// + /// [`set_ephemeral_store`]: Self::set_ephemeral_store + /// [`set_backup_store`]: Self::set_backup_store pub fn build_with_store( &self, node_entropy: NodeEntropy, kv_store: S, ) -> Result { @@ -682,6 +736,15 @@ impl NodeBuilder { })?) }; + let ts_config = self.tier_store_config.as_ref(); + let primary_store: Arc = Arc::new(DynStoreWrapper(kv_store)); + let mut tier_store = + TierStore::new(primary_store, Arc::clone(&runtime), Arc::clone(&logger)); + if let Some(config) = ts_config { + config.ephemeral.as_ref().map(|s| tier_store.set_ephemeral_store(Arc::clone(s))); + config.backup.as_ref().map(|s| tier_store.set_backup_store(Arc::clone(s))); + } + let seed_bytes = node_entropy.to_seed_bytes(); let config = Arc::new(self.config.clone()); @@ -696,7 +759,7 @@ impl NodeBuilder { seed_bytes, runtime, logger, - Arc::new(DynStoreWrapper(kv_store)), + Arc::new(DynStoreWrapper(tier_store)), ) } } From 451a392df2bea4ce066fef21e4c5a2bcf5befa69 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Thu, 19 Feb 2026 21:30:56 +0100 Subject: [PATCH 03/10] Expose tier storage configuration across the FFI boundary Introduce FFI-safe abstractions and builder APIs to allow foreign language targets to configure custom backup and ephemeral stores when constructing nodes with a custom store. Major changes include: - Addition of FfiDynStoreTrait, an FFI-safe equivalent of DynStoreTrait, working around uniffi's lack of support for Pin> - Addition of FfiDynStore, a concrete wrapper for foreign language store implementations - Provision of FfiDynStoreTrait implementation for DynStoreWrapper to bridge native Rust stores to FFI layer (useful in testing) - Extension of ArcedNodeBuilder with methods for configuring backup and ephemeral stores - Exposure of build_with_store so foreign targets can build nodes with custom store implementations - Addition of build_node_with_store test helper to abstract uniffi-gated store wrapping at build_with_store call sites --- Cargo.toml | 1 + bindings/ldk_node.udl | 51 ++++++ src/builder.rs | 47 ++++- src/ffi/types.rs | 310 +++++++++++++++++++++++++++++++- src/lib.rs | 12 +- src/types.rs | 8 +- tests/common/mod.rs | 19 +- tests/integration_tests_rust.rs | 5 +- 8 files changed, 434 insertions(+), 19 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index eda73f04f..333ba3a90 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,7 @@ bitcoin = "0.32.7" bip39 = { version = "2.0.0", features = ["rand"] } bip21 = { version = "0.5", features = ["std"], default-features = false } +async-trait = { version = "0.1.89" } base64 = { version = "0.22.1", default-features = false, features = ["std"] } rand = { version = "0.9.2", default-features = false, features = ["std", "thread_rng", "os_rng"] } chrono = { version = "0.4", default-features = false, features = ["clock"] } diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index d40f72f4a..2f580e24a 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -103,6 +103,53 @@ interface LogWriter { void log(LogRecord record); }; +interface FfiDynStore { + [Name=from_store] + constructor(FfiDynStoreTrait store); +}; + +[Trait, WithForeign] +interface FfiDynStoreTrait { + [Throws=IOError] + sequence read(string primary_namespace, string secondary_namespace, string key); + [Throws=IOError] + void write(string primary_namespace, string secondary_namespace, string key, sequence buf); + [Throws=IOError] + void remove(string primary_namespace, string secondary_namespace, string key, boolean lazy); + [Throws=IOError] + sequence list(string primary_namespace, string secondary_namespace); + [Throws=IOError, Async] + sequence read_async(string primary_namespace, string secondary_namespace, string key); + [Throws=IOError, Async] + void write_async(string primary_namespace, string secondary_namespace, string key, sequence buf); + [Throws=IOError, Async] + void remove_async(string primary_namespace, string secondary_namespace, string key, boolean lazy); + [Throws=IOError, Async] + sequence list_async(string primary_namespace, string secondary_namespace); +}; + +[Error] +enum IOError { + "NotFound", + "PermissionDenied", + "ConnectionRefused", + "ConnectionReset", + "ConnectionAborted", + "NotConnected", + "AddrInUse", + "AddrNotAvailable", + "BrokenPipe", + "AlreadyExists", + "WouldBlock", + "InvalidInput", + "InvalidData", + "TimedOut", + "WriteZero", + "Interrupted", + "UnexpectedEof", + "Other", +}; + interface Builder { constructor(); [Name=from_config] @@ -127,6 +174,8 @@ interface Builder { void set_announcement_addresses(sequence announcement_addresses); [Throws=BuildError] void set_node_alias(string node_alias); + void set_backup_store(FfiDynStore backup_store); + void set_ephemeral_store(FfiDynStore ephemeral_store); [Throws=BuildError] void set_async_payments_role(AsyncPaymentsRole? role); void set_wallet_recovery_mode(); @@ -140,6 +189,8 @@ interface Builder { Node build_with_vss_store_and_fixed_headers(NodeEntropy node_entropy, string vss_url, string store_id, record fixed_headers); [Throws=BuildError] Node build_with_vss_store_and_header_provider(NodeEntropy node_entropy, string vss_url, string store_id, VssHeaderProvider header_provider); + [Throws=BuildError] + Node build_with_store(NodeEntropy node_entropy, FfiDynStore store); }; interface Node { diff --git a/src/builder.rs b/src/builder.rs index 5d1077f33..b4da4a9ad 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -52,6 +52,8 @@ use crate::connection::ConnectionManager; use crate::entropy::NodeEntropy; use crate::event::EventQueue; use crate::fee_estimator::OnchainFeeEstimator; +#[cfg(feature = "uniffi")] +use crate::ffi::FfiDynStore; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; use crate::io::tier_store::TierStore; @@ -582,7 +584,6 @@ impl NodeBuilder { /// of all critical data written to the primary store. /// /// Backup writes are non-blocking and do not affect primary store operation performance. - #[allow(dead_code)] pub fn set_backup_store(&mut self, backup_store: Arc) -> &mut Self { let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); tier_store_config.backup = Some(backup_store); @@ -596,7 +597,6 @@ impl NodeBuilder { /// can be rebuilt if lost. /// /// If not set, non-critical data will be stored in the primary store. - #[allow(dead_code)] pub fn set_ephemeral_store(&mut self, ephemeral_store: Arc) -> &mut Self { let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); tier_store_config.ephemeral = Some(ephemeral_store); @@ -1005,6 +1005,32 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_wallet_recovery_mode(); } + /// Configures the backup store for local disaster recovery. + /// + /// When building with tiered storage, this store receives asynchronous copies + /// of all critical data written to the primary store. + /// + /// Backup writes are non-blocking and do not affect primary store operation performance. + /// + pub fn set_backup_store(&self, backup_store: Arc) { + let wrapper = DynStoreWrapper((*backup_store).clone()); + let store: Arc = Arc::new(wrapper); + self.inner.write().unwrap().set_backup_store(store); + } + + /// Configures the ephemeral store for non-critical, frequently-accessed data. + /// + /// When building with tiered storage, this store is used for ephemeral data like + /// the network graph and scorer data to reduce latency for reads. Data stored here + /// can be rebuilt if lost. + /// + /// If not set, non-critical data will be stored in the primary store. + pub fn set_ephemeral_store(&self, ephemeral_store: Arc) { + let wrapper = DynStoreWrapper((*ephemeral_store).clone()); + let store: Arc = Arc::new(wrapper); + self.inner.write().unwrap().set_ephemeral_store(store); + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: Arc) -> Result, BuildError> { @@ -1104,12 +1130,19 @@ impl ArcedNodeBuilder { } /// Builds a [`Node`] instance according to the options previously configured. - // Note that the generics here don't actually work for Uniffi, but we don't currently expose - // this so its not needed. - pub fn build_with_store( - &self, node_entropy: Arc, kv_store: S, + /// + /// The provided `kv_store` will be used as the primary storage backend. Optionally, + /// an ephemeral store for frequently-accessed non-critical data (e.g., network graph, scorer) + /// and a backup store for local disaster recovery can be configured via + /// [`set_ephemeral_store`] and [`set_backup_store`]. + /// + /// [`set_ephemeral_store`]: Self::set_ephemeral_store + /// [`set_backup_store`]: Self::set_backup_store + pub fn build_with_store( + &self, node_entropy: Arc, kv_store: Arc, ) -> Result, BuildError> { - self.inner.read().unwrap().build_with_store(*node_entropy, kv_store).map(Arc::new) + let store = (*kv_store).clone(); + self.inner.read().unwrap().build_with_store(*node_entropy, store).map(Arc::new) } } diff --git a/src/ffi/types.rs b/src/ffi/types.rs index 2a349a967..304a493a9 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -11,11 +11,13 @@ // Make sure to add any re-exported items that need to be used in uniffi below. use std::convert::TryInto; +use std::future::Future; use std::ops::Deref; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use async_trait::async_trait; pub use bip39::Mnemonic; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::Hash; @@ -32,6 +34,7 @@ use lightning::offers::refund::Refund as LdkRefund; use lightning::onion_message::dns_resolution::HumanReadableName as LdkHumanReadableName; pub use lightning::routing::gossip::{NodeAlias, NodeId, RoutingFees}; pub use lightning::routing::router::RouteParametersConfig; +use lightning::util::persist::{KVStore, KVStoreSync}; use lightning::util::ser::Writeable; use lightning_invoice::{Bolt11Invoice as LdkBolt11Invoice, Bolt11InvoiceDescriptionRef}; pub use lightning_invoice::{Description, SignedRawBolt11Invoice}; @@ -57,7 +60,312 @@ pub use crate::payment::store::{ ConfirmationStatus, LSPFeeLimits, PaymentDirection, PaymentKind, PaymentStatus, }; pub use crate::payment::UnifiedPaymentResult; -use crate::{hex_utils, SocketAddress, UniffiCustomTypeConverter, UserChannelId}; +use crate::types::DynStoreTrait; +use crate::{ + hex_utils, DynStoreWrapper, SocketAddress, SyncAndAsyncKVStore, UniffiCustomTypeConverter, + UserChannelId, +}; + +#[derive(Debug)] +pub enum IOError { + NotFound, + PermissionDenied, + ConnectionRefused, + ConnectionReset, + ConnectionAborted, + NotConnected, + AddrInUse, + AddrNotAvailable, + BrokenPipe, + AlreadyExists, + WouldBlock, + InvalidInput, + InvalidData, + TimedOut, + WriteZero, + Interrupted, + UnexpectedEof, + Other, +} + +impl From for IOError { + fn from(error: bitcoin::io::Error) -> Self { + match error.kind() { + bitcoin::io::ErrorKind::NotFound => IOError::NotFound, + bitcoin::io::ErrorKind::PermissionDenied => IOError::PermissionDenied, + bitcoin::io::ErrorKind::ConnectionRefused => IOError::ConnectionRefused, + bitcoin::io::ErrorKind::ConnectionReset => IOError::ConnectionReset, + bitcoin::io::ErrorKind::ConnectionAborted => IOError::ConnectionAborted, + bitcoin::io::ErrorKind::NotConnected => IOError::NotConnected, + bitcoin::io::ErrorKind::AddrInUse => IOError::AddrInUse, + bitcoin::io::ErrorKind::AddrNotAvailable => IOError::AddrNotAvailable, + bitcoin::io::ErrorKind::BrokenPipe => IOError::BrokenPipe, + bitcoin::io::ErrorKind::AlreadyExists => IOError::AlreadyExists, + bitcoin::io::ErrorKind::WouldBlock => IOError::WouldBlock, + bitcoin::io::ErrorKind::InvalidInput => IOError::InvalidInput, + bitcoin::io::ErrorKind::InvalidData => IOError::InvalidData, + bitcoin::io::ErrorKind::TimedOut => IOError::TimedOut, + bitcoin::io::ErrorKind::WriteZero => IOError::WriteZero, + bitcoin::io::ErrorKind::Interrupted => IOError::Interrupted, + bitcoin::io::ErrorKind::UnexpectedEof => IOError::UnexpectedEof, + bitcoin::io::ErrorKind::Other => IOError::Other, + } + } +} + +impl From for bitcoin::io::Error { + fn from(error: IOError) -> Self { + match error { + IOError::NotFound => bitcoin::io::ErrorKind::NotFound.into(), + IOError::PermissionDenied => bitcoin::io::ErrorKind::PermissionDenied.into(), + IOError::ConnectionRefused => bitcoin::io::ErrorKind::ConnectionRefused.into(), + IOError::ConnectionReset => bitcoin::io::ErrorKind::ConnectionReset.into(), + IOError::ConnectionAborted => bitcoin::io::ErrorKind::ConnectionAborted.into(), + IOError::NotConnected => bitcoin::io::ErrorKind::NotConnected.into(), + IOError::AddrInUse => bitcoin::io::ErrorKind::AddrInUse.into(), + IOError::AddrNotAvailable => bitcoin::io::ErrorKind::AddrNotAvailable.into(), + IOError::BrokenPipe => bitcoin::io::ErrorKind::BrokenPipe.into(), + IOError::AlreadyExists => bitcoin::io::ErrorKind::AlreadyExists.into(), + IOError::WouldBlock => bitcoin::io::ErrorKind::WouldBlock.into(), + IOError::InvalidInput => bitcoin::io::ErrorKind::InvalidInput.into(), + IOError::InvalidData => bitcoin::io::ErrorKind::InvalidData.into(), + IOError::TimedOut => bitcoin::io::ErrorKind::TimedOut.into(), + IOError::WriteZero => bitcoin::io::ErrorKind::WriteZero.into(), + IOError::Interrupted => bitcoin::io::ErrorKind::Interrupted.into(), + IOError::UnexpectedEof => bitcoin::io::ErrorKind::UnexpectedEof.into(), + IOError::Other => bitcoin::io::ErrorKind::Other.into(), + } + } +} + +impl std::fmt::Display for IOError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + IOError::NotFound => write!(f, "NotFound"), + IOError::PermissionDenied => write!(f, "PermissionDenied"), + IOError::ConnectionRefused => write!(f, "ConnectionRefused"), + IOError::ConnectionReset => write!(f, "ConnectionReset"), + IOError::ConnectionAborted => write!(f, "ConnectionAborted"), + IOError::NotConnected => write!(f, "NotConnected"), + IOError::AddrInUse => write!(f, "AddrInUse"), + IOError::AddrNotAvailable => write!(f, "AddrNotAvailable"), + IOError::BrokenPipe => write!(f, "BrokenPipe"), + IOError::AlreadyExists => write!(f, "AlreadyExists"), + IOError::WouldBlock => write!(f, "WouldBlock"), + IOError::InvalidInput => write!(f, "InvalidInput"), + IOError::InvalidData => write!(f, "InvalidData"), + IOError::TimedOut => write!(f, "TimedOut"), + IOError::WriteZero => write!(f, "WriteZero"), + IOError::Interrupted => write!(f, "Interrupted"), + IOError::UnexpectedEof => write!(f, "UnexpectedEof"), + IOError::Other => write!(f, "Other"), + } + } +} + +#[async_trait] +pub trait FfiDynStoreTrait: Send + Sync { + async fn read_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result, IOError>; + async fn write_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> Result<(), IOError>; + async fn remove_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> Result<(), IOError>; + async fn list_async( + &self, primary_namespace: String, secondary_namespace: String, + ) -> Result, IOError>; + + fn read( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result, IOError>; + fn write( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> Result<(), IOError>; + fn remove( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> Result<(), IOError>; + fn list( + &self, primary_namespace: String, secondary_namespace: String, + ) -> Result, IOError>; +} + +#[derive(Clone)] +pub struct FfiDynStore { + pub(crate) inner: Arc, +} + +impl FfiDynStore { + pub fn from_store(store: Arc) -> Self { + Self { inner: store } + } + + pub fn from_kv_store(store: T) -> Self { + Self { inner: Arc::new(DynStoreWrapper(store)) } + } +} + +#[async_trait] +impl FfiDynStoreTrait for DynStoreWrapper { + async fn read_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result, IOError> { + DynStoreTrait::read_async(self, &primary_namespace, &secondary_namespace, &key) + .await + .map_err(IOError::from) + } + async fn write_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> Result<(), IOError> { + DynStoreTrait::write_async(self, &primary_namespace, &secondary_namespace, &key, buf) + .await + .map_err(IOError::from) + } + async fn remove_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> Result<(), IOError> { + DynStoreTrait::remove_async(self, &primary_namespace, &secondary_namespace, &key, lazy) + .await + .map_err(IOError::from) + } + async fn list_async( + &self, primary_namespace: String, secondary_namespace: String, + ) -> Result, IOError> { + DynStoreTrait::list_async(self, &primary_namespace, &secondary_namespace) + .await + .map_err(IOError::from) + } + + fn read( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result, IOError> { + DynStoreTrait::read(self, &primary_namespace, &secondary_namespace, &key) + .map_err(IOError::from) + } + fn write( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> Result<(), IOError> { + DynStoreTrait::write(self, &primary_namespace, &secondary_namespace, &key, buf) + .map_err(IOError::from) + } + fn remove( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> Result<(), IOError> { + DynStoreTrait::remove(self, &primary_namespace, &secondary_namespace, &key, lazy) + .map_err(IOError::from) + } + fn list( + &self, primary_namespace: String, secondary_namespace: String, + ) -> Result, IOError> { + DynStoreTrait::list(self, &primary_namespace, &secondary_namespace).map_err(IOError::from) + } +} + +impl KVStore for FfiDynStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, lightning::io::Error>> + 'static + Send { + let this = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + async move { + this.read_async(primary_namespace, secondary_namespace, key).await.map_err(|e| e.into()) + } + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + 'static + Send { + let this = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + async move { + this.write_async(primary_namespace, secondary_namespace, key, buf) + .await + .map_err(|e| e.into()) + } + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> impl Future> + 'static + Send { + let this = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + async move { + this.remove_async(primary_namespace, secondary_namespace, key, lazy) + .await + .map_err(|e| e.into()) + } + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> impl Future, lightning::io::Error>> + 'static + Send { + let this = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + async move { + this.list_async(primary_namespace, secondary_namespace).await.map_err(|e| e.into()) + } + } +} + +impl KVStoreSync for FfiDynStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, lightning::io::Error> { + FfiDynStoreTrait::read( + self.inner.as_ref(), + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + ) + .map_err(|e| e.into()) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Result<(), lightning::io::Error> { + FfiDynStoreTrait::write( + self.inner.as_ref(), + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + buf, + ) + .map_err(|e| e.into()) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Result<(), lightning::io::Error> { + FfiDynStoreTrait::remove( + self.inner.as_ref(), + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + lazy, + ) + .map_err(|e| e.into()) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, lightning::io::Error> { + FfiDynStoreTrait::list( + self.inner.as_ref(), + primary_namespace.to_string(), + secondary_namespace.to_string(), + ) + .map_err(|e| e.into()) + } +} impl UniffiCustomTypeConverter for PublicKey { type Builtin = String; diff --git a/src/lib.rs b/src/lib.rs index 2b60307b0..d296441e5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -134,6 +134,8 @@ use event::{EventHandler, EventQueue}; use fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator}; #[cfg(feature = "uniffi")] use ffi::*; +#[cfg(feature = "uniffi")] +pub use ffi::{FfiDynStore, FfiDynStoreTrait, IOError}; use gossip::GossipSource; use graph::NetworkGraph; use io::utils::write_node_metrics; @@ -160,11 +162,13 @@ use peer_store::{PeerInfo, PeerStore}; use rand::Rng; use runtime::Runtime; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, - HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, - Wallet, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, Graph, HRNResolver, + KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, Wallet, +}; +pub use types::{ + ChannelDetails, CustomTlvRecord, DynStore, DynStoreWrapper, PeerDetails, SyncAndAsyncKVStore, + UserChannelId, }; -pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, SyncAndAsyncKVStore, UserChannelId}; pub use { bip39, bitcoin, lightning, lightning_invoice, lightning_liquidity, lightning_types, tokio, vss_client, diff --git a/src/types.rs b/src/types.rs index b5b1ffed7..a7ea89e48 100644 --- a/src/types.rs +++ b/src/types.rs @@ -53,7 +53,7 @@ where { } -pub(crate) trait DynStoreTrait: Send + Sync { +pub trait DynStoreTrait: Send + Sync { fn read_async( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Pin, bitcoin::io::Error>> + Send + 'static>>; @@ -133,9 +133,11 @@ impl<'a> KVStoreSync for dyn DynStoreTrait + 'a { } } -pub(crate) type DynStore = dyn DynStoreTrait; +/// Type alias for any store that implements DynStoreTrait. +pub type DynStore = dyn DynStoreTrait; -pub(crate) struct DynStoreWrapper(pub(crate) T); +/// A wrapper that allows using any [`SyncAndAsyncKVStore`] implementor as a trait object. +pub struct DynStoreWrapper(pub T); impl DynStoreTrait for DynStoreWrapper { fn read_async( diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 0413b4552..656c25759 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -32,6 +32,7 @@ use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus}; use ldk_node::{ Builder, CustomTlvRecord, Event, LightningBalance, Node, NodeError, PendingSweepBalance, + SyncAndAsyncKVStore, }; use lightning::io; use lightning::ln::msgs::SocketAddress; @@ -485,7 +486,7 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> let node = match config.store_type { TestStoreType::TestSyncStore => { let kv_store = TestSyncStore::new(config.node_config.storage_dir_path.into()); - builder.build_with_store(config.node_entropy.into(), kv_store).unwrap() + build_node_with_store(&builder, config.node_entropy, kv_store) }, TestStoreType::Sqlite => builder.build(config.node_entropy.into()).unwrap(), }; @@ -1638,3 +1639,19 @@ impl TestSyncStoreInner { self.do_list(primary_namespace, secondary_namespace) } } + +pub(crate) fn build_node_with_store( + builder: &Builder, entropy: NodeEntropy, store: S, +) -> TestNode { + #[cfg(feature = "uniffi")] + { + use ldk_node::FfiDynStore; + builder + .build_with_store(entropy.into(), Arc::new(FfiDynStore::from_kv_store(store))) + .unwrap() + } + #[cfg(not(feature = "uniffi"))] + { + builder.build_with_store(entropy, store).unwrap() + } +} diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index c2f68b907..2c7bf6340 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -235,8 +235,7 @@ async fn start_stop_reinit() { setup_builder!(builder, config.node_config); builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let node = - builder.build_with_store(config.node_entropy.into(), test_sync_store.clone()).unwrap(); + let node = build_node_with_store(&builder, config.node_entropy, test_sync_store.clone()); node.start().unwrap(); let expected_node_id = node.node_id(); @@ -275,7 +274,7 @@ async fn start_stop_reinit() { builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); let reinitialized_node = - builder.build_with_store(config.node_entropy.into(), test_sync_store).unwrap(); + build_node_with_store(&builder, config.node_entropy, test_sync_store.clone()); reinitialized_node.start().unwrap(); assert_eq!(reinitialized_node.node_id(), expected_node_id); From 2707aa1b2eecbb8457dd6421008fcd41c3bc9c5d Mon Sep 17 00:00:00 2001 From: Enigbe Date: Fri, 20 Feb 2026 21:05:09 +0100 Subject: [PATCH 04/10] Add integration and FFI tests for tiered storage - Add Rust integration test verifying correct routing to storage tiers - Add Python in-memory KV store and FFI test for tiered storage --- benches/payments.rs | 1 + bindings/python/src/ldk_node/kv_store.py | 115 +++++++ bindings/python/src/ldk_node/test_ldk_node.py | 323 +++++++++++------- tests/common/mod.rs | 54 ++- tests/integration_tests_rust.rs | 86 +++++ 5 files changed, 448 insertions(+), 131 deletions(-) create mode 100644 bindings/python/src/ldk_node/kv_store.py diff --git a/benches/payments.rs b/benches/payments.rs index 52769d794..8ded1399e 100644 --- a/benches/payments.rs +++ b/benches/payments.rs @@ -127,6 +127,7 @@ fn payment_benchmark(c: &mut Criterion) { true, false, common::TestStoreType::Sqlite, + common::TestStoreType::Sqlite, ); let runtime = diff --git a/bindings/python/src/ldk_node/kv_store.py b/bindings/python/src/ldk_node/kv_store.py new file mode 100644 index 000000000..d871d7a6d --- /dev/null +++ b/bindings/python/src/ldk_node/kv_store.py @@ -0,0 +1,115 @@ +import threading + +from abc import ABC, abstractmethod +from typing import List + +from ldk_node import IoError + +class AbstractKvStore(ABC): + @abstractmethod + async def read_async(self, primary_namespace: "str",secondary_namespace: "str",key: "str") -> "typing.List[int]": + pass + + @abstractmethod + async def write_async(self, primary_namespace: "str",secondary_namespace: "str",key: "str",buf: "typing.List[int]") -> None: + pass + + @abstractmethod + async def remove_async(self, primary_namespace: "str",secondary_namespace: "str",key: "str",lazy: "bool") -> None: + pass + + @abstractmethod + async def list_async(self, primary_namespace: "str",secondary_namespace: "str") -> "typing.List[str]": + pass + + @abstractmethod + def read(self, primary_namespace: "str",secondary_namespace: "str",key: "str") -> "typing.List[int]": + pass + + @abstractmethod + def write(self, primary_namespace: "str",secondary_namespace: "str",key: "str",buf: "typing.List[int]") -> None: + pass + + @abstractmethod + def remove(self, primary_namespace: "str",secondary_namespace: "str",key: "str",lazy: "bool") -> None: + pass + + @abstractmethod + def list(self, primary_namespace: "str",secondary_namespace: "str") -> "typing.List[str]": + pass + +class TestKvStore(AbstractKvStore): + def __init__(self, name: str): + self.name = name + # Storage structure: {(primary_ns, secondary_ns): {key: [bytes]}} + self.storage = {} + self._lock = threading.Lock() + + def dump(self): + print(f"\n[{self.name}] Store contents:") + for (primary_ns, secondary_ns), keys_dict in self.storage.items(): + print(f" Namespace: ({primary_ns!r}, {secondary_ns!r})") + for key, data in keys_dict.items(): + print(f" Key: {key!r} -> {len(data)} bytes") + # Optionally show first few bytes + preview = data[:20] if len(data) > 20 else data + print(f" Data preview: {preview}...") + + def read(self, primary_namespace: str, secondary_namespace: str, key: str) -> List[int]: + with self._lock: + print(f"[{self.name}] READ: {primary_namespace}/{secondary_namespace}/{key}") + namespace_key = (primary_namespace, secondary_namespace) + + if namespace_key not in self.storage: + print(f" -> namespace not found, keys: {list(self.storage.keys())}") + raise IoError.NotFound(f"Namespace not found: {primary_namespace}/{secondary_namespace}") + + if key not in self.storage[namespace_key]: + print(f" -> key not found, keys: {list(self.storage[namespace_key].keys())}") + raise IoError.NotFound(f"Key not found: {key}") + + data = self.storage[namespace_key][key] + print(f" -> returning {len(data)} bytes") + return data + + def write(self, primary_namespace: str, secondary_namespace: str, key: str, buf: List[int]) -> None: + with self._lock: + namespace_key = (primary_namespace, secondary_namespace) + if namespace_key not in self.storage: + self.storage[namespace_key] = {} + + self.storage[namespace_key][key] = buf.copy() + + def remove(self, primary_namespace: str, secondary_namespace: str, key: str, lazy: bool) -> None: + with self._lock: + namespace_key = (primary_namespace, secondary_namespace) + if namespace_key not in self.storage: + raise IoError.NotFound(f"Namespace not found: {primary_namespace}/{secondary_namespace}") + + if key not in self.storage[namespace_key]: + raise IoError.NotFound(f"Key not found: {key}") + + del self.storage[namespace_key][key] + + if not self.storage[namespace_key]: + del self.storage[namespace_key] + + def list(self, primary_namespace: str, secondary_namespace: str) -> List[str]: + with self._lock: + namespace_key = (primary_namespace, secondary_namespace) + if namespace_key in self.storage: + return list(self.storage[namespace_key].keys()) + return [] + + async def read_async(self, primary_namespace: str, secondary_namespace: str, key: str) -> List[int]: + return self.read(primary_namespace, secondary_namespace, key) + + async def write_async(self, primary_namespace: str, secondary_namespace: str, key: str, buf: List[int]) -> None: + self.write(primary_namespace, secondary_namespace, key, buf) + + async def remove_async(self, primary_namespace: str, secondary_namespace: str, key: str, lazy: bool) -> None: + self.remove(primary_namespace, secondary_namespace, key, lazy) + + async def list_async(self, primary_namespace: str, secondary_namespace: str) -> List[str]: + return self.list(primary_namespace, secondary_namespace) + \ No newline at end of file diff --git a/bindings/python/src/ldk_node/test_ldk_node.py b/bindings/python/src/ldk_node/test_ldk_node.py index 0b73e6a47..c7e3937a4 100644 --- a/bindings/python/src/ldk_node/test_ldk_node.py +++ b/bindings/python/src/ldk_node/test_ldk_node.py @@ -5,13 +5,67 @@ import os import re import requests +import asyncio +import threading +import ldk_node from ldk_node import * +from kv_store import TestKvStore DEFAULT_ESPLORA_SERVER_URL = "http://127.0.0.1:3002" DEFAULT_TEST_NETWORK = Network.REGTEST DEFAULT_BITCOIN_CLI_BIN = "bitcoin-cli" +class NodeSetup: + def __init__(self, node, node_id, tmp_dir, listening_addresses, stores=None): + self.node = node + self.node_id = node_id + self.tmp_dir = tmp_dir + self.listening_addresses = listening_addresses + self.stores = stores # (primary, backup, ephemeral) or None + + def cleanup(self): + self.node.stop() + time.sleep(1) + self.tmp_dir.cleanup() + +def setup_two_nodes(esplora_endpoint, port_1=2323, port_2=2324, use_tier_store=False): + # Setup Node 1 + tmp_dir_1 = tempfile.TemporaryDirectory("_ldk_node_1") + print("TMP DIR 1:", tmp_dir_1.name) + + listening_addresses_1 = [f"127.0.0.1:{port_1}"] + if use_tier_store: + node_1, stores_1 = setup_node_with_tier_store(tmp_dir_1.name, esplora_endpoint, listening_addresses_1) + else: + node_1 = setup_node(tmp_dir_1.name, esplora_endpoint, listening_addresses_1) + stores_1 = None + + node_1.start() + node_id_1 = node_1.node_id() + print("Node ID 1:", node_id_1) + + setup_1 = NodeSetup(node_1, node_id_1, tmp_dir_1, listening_addresses_1, stores_1) + + # Setup Node 2 + tmp_dir_2 = tempfile.TemporaryDirectory("_ldk_node_2") + print("TMP DIR 2:", tmp_dir_2.name) + + listening_addresses_2 = [f"127.0.0.1:{port_2}"] + if use_tier_store: + node_2, stores_2 = setup_node_with_tier_store(tmp_dir_2.name, esplora_endpoint, listening_addresses_2) + else: + node_2 = setup_node(tmp_dir_2.name, esplora_endpoint, listening_addresses_2) + stores_2 = None + + node_2.start() + node_id_2 = node_2.node_id() + print("Node ID 2:", node_id_2) + + setup_2 = NodeSetup(node_2, node_id_2, tmp_dir_2, listening_addresses_2, stores_2) + + return setup_1, setup_2 + def bitcoin_cli(cmd): args = [] @@ -95,7 +149,6 @@ def send_to_address(address, amount_sats): print("SEND TX:", res) return res - def setup_node(tmp_dir, esplora_endpoint, listening_addresses): mnemonic = generate_entropy_mnemonic(None) node_entropy = NodeEntropy.from_bip39_mnemonic(mnemonic, None) @@ -107,6 +160,118 @@ def setup_node(tmp_dir, esplora_endpoint, listening_addresses): builder.set_listening_addresses(listening_addresses) return builder.build(node_entropy) +def setup_node_with_tier_store(tmp_dir, esplora_endpoint, listening_addresses): + mnemonic = generate_entropy_mnemonic(None) + node_entropy = NodeEntropy.from_bip39_mnemonic(mnemonic, None) + config = default_config() + + primary = TestKvStore("primary") + backup = TestKvStore("backup") + ephemeral = TestKvStore("ephemeral") + + # Set event loop for async Python callbacks from Rust + # (https://mozilla.github.io/uniffi-rs/0.27/futures.html#python-uniffi_set_event_loop) + loop = asyncio.new_event_loop() + + def run_loop(): + asyncio.set_event_loop(loop) + loop.run_forever() + + loop_thread = threading.Thread(target=run_loop, daemon=True) + loop_thread.start() + ldk_node.uniffi_set_event_loop(loop) + + builder = Builder.from_config(config) + builder.set_storage_dir_path(tmp_dir) + builder.set_chain_source_esplora(esplora_endpoint, None) + builder.set_network(DEFAULT_TEST_NETWORK) + builder.set_listening_addresses(listening_addresses) + builder.set_backup_store(FfiDynStore.from_store(backup)) + builder.set_ephemeral_store(FfiDynStore.from_store(ephemeral)) + + return builder.build_with_store(node_entropy, FfiDynStore.from_store(primary)), (primary, backup, ephemeral) + +def do_channel_full_cycle(setup_1, setup_2, esplora_endpoint): + # Fund both nodes + address_1 = setup_1.node.onchain_payment().new_address() + txid_1 = send_to_address(address_1, 100000) + address_2 = setup_2.node.onchain_payment().new_address() + txid_2 = send_to_address(address_2, 100000) + + wait_for_tx(esplora_endpoint, txid_1) + wait_for_tx(esplora_endpoint, txid_2) + mine_and_wait(esplora_endpoint, 6) + + setup_1.node.sync_wallets() + setup_2.node.sync_wallets() + + # Verify balances + spendable_balance_1 = setup_1.node.list_balances().spendable_onchain_balance_sats + spendable_balance_2 = setup_2.node.list_balances().spendable_onchain_balance_sats + assert spendable_balance_1 == 100000 + assert spendable_balance_2 == 100000 + + # Open channel + setup_1.node.open_channel(setup_2.node_id, setup_2.listening_addresses[0], 50000, None, None) + + channel_pending_event_1 = setup_1.node.wait_next_event() + assert isinstance(channel_pending_event_1, Event.CHANNEL_PENDING) + setup_1.node.event_handled() + + channel_pending_event_2 = setup_2.node.wait_next_event() + assert isinstance(channel_pending_event_2, Event.CHANNEL_PENDING) + setup_2.node.event_handled() + + funding_txid = channel_pending_event_1.funding_txo.txid + wait_for_tx(esplora_endpoint, funding_txid) + mine_and_wait(esplora_endpoint, 6) + + setup_1.node.sync_wallets() + setup_2.node.sync_wallets() + + channel_ready_event_1 = setup_1.node.wait_next_event() + assert isinstance(channel_ready_event_1, Event.CHANNEL_READY) + setup_1.node.event_handled() + + channel_ready_event_2 = setup_2.node.wait_next_event() + assert isinstance(channel_ready_event_2, Event.CHANNEL_READY) + setup_2.node.event_handled() + + # Make payment + description = Bolt11InvoiceDescription.DIRECT("asdf") + invoice = setup_2.node.bolt11_payment().receive(2500000, description, 9217) + setup_1.node.bolt11_payment().send(invoice, None) + + payment_successful_event_1 = setup_1.node.wait_next_event() + assert isinstance(payment_successful_event_1, Event.PAYMENT_SUCCESSFUL) + setup_1.node.event_handled() + + payment_received_event_2 = setup_2.node.wait_next_event() + assert isinstance(payment_received_event_2, Event.PAYMENT_RECEIVED) + setup_2.node.event_handled() + + # Close channel + setup_2.node.close_channel(channel_ready_event_2.user_channel_id, setup_1.node_id) + + channel_closed_event_1 = setup_1.node.wait_next_event() + assert isinstance(channel_closed_event_1, Event.CHANNEL_CLOSED) + setup_1.node.event_handled() + + channel_closed_event_2 = setup_2.node.wait_next_event() + assert isinstance(channel_closed_event_2, Event.CHANNEL_CLOSED) + setup_2.node.event_handled() + + mine_and_wait(esplora_endpoint, 1) + setup_1.node.sync_wallets() + setup_2.node.sync_wallets() + + # Verify final balances + spendable_balance_after_close_1 = setup_1.node.list_balances().spendable_onchain_balance_sats + assert spendable_balance_after_close_1 > 95000 + assert spendable_balance_after_close_1 < 100000 + spendable_balance_after_close_2 = setup_2.node.list_balances().spendable_onchain_balance_sats + assert spendable_balance_after_close_2 == 102500 + def get_esplora_endpoint(): if os.environ.get('ESPLORA_ENDPOINT'): return str(os.environ['ESPLORA_ENDPOINT']) @@ -122,132 +287,36 @@ def setUp(self): def test_channel_full_cycle(self): esplora_endpoint = get_esplora_endpoint() - - ## Setup Node 1 - tmp_dir_1 = tempfile.TemporaryDirectory("_ldk_node_1") - print("TMP DIR 1:", tmp_dir_1.name) - - listening_addresses_1 = ["127.0.0.1:2323"] - node_1 = setup_node(tmp_dir_1.name, esplora_endpoint, listening_addresses_1) - node_1.start() - node_id_1 = node_1.node_id() - print("Node ID 1:", node_id_1) - - # Setup Node 2 - tmp_dir_2 = tempfile.TemporaryDirectory("_ldk_node_2") - print("TMP DIR 2:", tmp_dir_2.name) - - listening_addresses_2 = ["127.0.0.1:2324"] - node_2 = setup_node(tmp_dir_2.name, esplora_endpoint, listening_addresses_2) - node_2.start() - node_id_2 = node_2.node_id() - print("Node ID 2:", node_id_2) - - address_1 = node_1.onchain_payment().new_address() - txid_1 = send_to_address(address_1, 100000) - address_2 = node_2.onchain_payment().new_address() - txid_2 = send_to_address(address_2, 100000) - - wait_for_tx(esplora_endpoint, txid_1) - wait_for_tx(esplora_endpoint, txid_2) - - mine_and_wait(esplora_endpoint, 6) - - node_1.sync_wallets() - node_2.sync_wallets() - - spendable_balance_1 = node_1.list_balances().spendable_onchain_balance_sats - spendable_balance_2 = node_2.list_balances().spendable_onchain_balance_sats - total_balance_1 = node_1.list_balances().total_onchain_balance_sats - total_balance_2 = node_2.list_balances().total_onchain_balance_sats - - print("SPENDABLE 1:", spendable_balance_1) - self.assertEqual(spendable_balance_1, 100000) - - print("SPENDABLE 2:", spendable_balance_2) - self.assertEqual(spendable_balance_2, 100000) - - print("TOTAL 1:", total_balance_1) - self.assertEqual(total_balance_1, 100000) - - print("TOTAL 2:", total_balance_2) - self.assertEqual(total_balance_2, 100000) - - node_1.open_channel(node_id_2, listening_addresses_2[0], 50000, None, None) - - channel_pending_event_1 = node_1.wait_next_event() - assert isinstance(channel_pending_event_1, Event.CHANNEL_PENDING) - print("EVENT:", channel_pending_event_1) - node_1.event_handled() - - channel_pending_event_2 = node_2.wait_next_event() - assert isinstance(channel_pending_event_2, Event.CHANNEL_PENDING) - print("EVENT:", channel_pending_event_2) - node_2.event_handled() - - funding_txid = channel_pending_event_1.funding_txo.txid - wait_for_tx(esplora_endpoint, funding_txid) - mine_and_wait(esplora_endpoint, 6) - - node_1.sync_wallets() - node_2.sync_wallets() - - channel_ready_event_1 = node_1.wait_next_event() - assert isinstance(channel_ready_event_1, Event.CHANNEL_READY) - print("EVENT:", channel_ready_event_1) - print("funding_txo:", funding_txid) - node_1.event_handled() - - channel_ready_event_2 = node_2.wait_next_event() - assert isinstance(channel_ready_event_2, Event.CHANNEL_READY) - print("EVENT:", channel_ready_event_2) - node_2.event_handled() - - description = Bolt11InvoiceDescription.DIRECT("asdf") - invoice = node_2.bolt11_payment().receive(2500000, description, 9217) - node_1.bolt11_payment().send(invoice, None) - - payment_successful_event_1 = node_1.wait_next_event() - assert isinstance(payment_successful_event_1, Event.PAYMENT_SUCCESSFUL) - print("EVENT:", payment_successful_event_1) - node_1.event_handled() - - payment_received_event_2 = node_2.wait_next_event() - assert isinstance(payment_received_event_2, Event.PAYMENT_RECEIVED) - print("EVENT:", payment_received_event_2) - node_2.event_handled() - - node_2.close_channel(channel_ready_event_2.user_channel_id, node_id_1) - - channel_closed_event_1 = node_1.wait_next_event() - assert isinstance(channel_closed_event_1, Event.CHANNEL_CLOSED) - print("EVENT:", channel_closed_event_1) - node_1.event_handled() - - channel_closed_event_2 = node_2.wait_next_event() - assert isinstance(channel_closed_event_2, Event.CHANNEL_CLOSED) - print("EVENT:", channel_closed_event_2) - node_2.event_handled() - - mine_and_wait(esplora_endpoint, 1) - - node_1.sync_wallets() - node_2.sync_wallets() - - spendable_balance_after_close_1 = node_1.list_balances().spendable_onchain_balance_sats - assert spendable_balance_after_close_1 > 95000 - assert spendable_balance_after_close_1 < 100000 - spendable_balance_after_close_2 = node_2.list_balances().spendable_onchain_balance_sats - self.assertEqual(spendable_balance_after_close_2, 102500) - - # Stop nodes - node_1.stop() - node_2.stop() - - # Cleanup - time.sleep(1) # Wait a sec so our logs can finish writing - tmp_dir_1.cleanup() - tmp_dir_2.cleanup() + setup_1, setup_2 = setup_two_nodes(esplora_endpoint) + + do_channel_full_cycle(setup_1, setup_2, esplora_endpoint) + + setup_1.cleanup() + setup_2.cleanup() + + def test_tier_store(self): + esplora_endpoint = get_esplora_endpoint() + setup_1, setup_2 = setup_two_nodes(esplora_endpoint, port_1=2325, port_2=2326, use_tier_store=True) + + do_channel_full_cycle(setup_1, setup_2, esplora_endpoint) + + primary, backup, ephemeral = setup_1.stores + + # Wait for async backup + time.sleep(2) + + self.assertGreater(len(primary.storage), 0, "Primary should have data") + self.assertGreater(len(backup.storage), 0, "Backup should have data") + self.assertEqual(list(primary.storage.keys()), list(backup.storage.keys()), + "Backup should mirror primary") + + self.assertGreater(len(ephemeral.storage), 0, "Ephemeral should have data") + ephemeral_keys = [key for namespace in ephemeral.storage.values() for key in namespace.keys()] + has_scorer_or_graph = any(key in ['scorer', 'network_graph'] for key in ephemeral_keys) + self.assertTrue(has_scorer_or_graph, "Ephemeral should contain scorer or network_graph data") + + setup_1.cleanup() + setup_2.cleanup() if __name__ == '__main__': unittest.main() diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 656c25759..6964a9429 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -34,6 +34,8 @@ use ldk_node::{ Builder, CustomTlvRecord, Event, LightningBalance, Node, NodeError, PendingSweepBalance, SyncAndAsyncKVStore, }; +#[cfg(feature = "uniffi")] +use ldk_node::FfiDynStore; use lightning::io; use lightning::ln::msgs::SocketAddress; use lightning::routing::gossip::NodeAlias; @@ -328,10 +330,20 @@ pub(crate) enum TestChainSource<'a> { BitcoindRestSync(&'a BitcoinD), } -#[derive(Clone, Copy)] +#[cfg(feature = "uniffi")] +type TestDynStore = Arc; +#[cfg(not(feature = "uniffi"))] +type TestDynStore = Arc; + +#[derive(Clone)] pub(crate) enum TestStoreType { TestSyncStore, Sqlite, + TierStore { + primary: TestDynStore, + backup: Option, + ephemeral: Option, + }, } impl Default for TestStoreType { @@ -382,6 +394,30 @@ macro_rules! setup_builder { pub(crate) use setup_builder; +pub(crate) fn create_tier_stores(base_path: PathBuf) -> (TestDynStore, TestDynStore, TestDynStore) { + let primary = SqliteStore::new( + base_path.join("primary"), + Some("primary_db".to_string()), + Some("primary_kv".to_string()), + ) + .unwrap(); + let backup = FilesystemStore::new(base_path.join("backup")); + let ephemeral = TestStore::new(false); + + #[cfg(feature = "uniffi")] + { + ( + Arc::new(FfiDynStore::from_kv_store(primary)), + Arc::new(FfiDynStore::from_kv_store(backup)), + Arc::new(FfiDynStore::from_kv_store(ephemeral)), + ) + } + #[cfg(not(feature = "uniffi"))] + { + (primary, backup, ephemeral) + } +} + pub(crate) fn setup_two_nodes( chain_source: &TestChainSource, allow_0conf: bool, anchor_channels: bool, anchors_trusted_no_reserve: bool, @@ -392,21 +428,22 @@ pub(crate) fn setup_two_nodes( anchor_channels, anchors_trusted_no_reserve, TestStoreType::TestSyncStore, + TestStoreType::TestSyncStore, ) } pub(crate) fn setup_two_nodes_with_store( chain_source: &TestChainSource, allow_0conf: bool, anchor_channels: bool, - anchors_trusted_no_reserve: bool, store_type: TestStoreType, + anchors_trusted_no_reserve: bool, store_type_a: TestStoreType, store_type_b: TestStoreType, ) -> (TestNode, TestNode) { println!("== Node A =="); let mut config_a = random_config(anchor_channels); - config_a.store_type = store_type; + config_a.store_type = store_type_a; let node_a = setup_node(chain_source, config_a); println!("\n== Node B =="); let mut config_b = random_config(anchor_channels); - config_b.store_type = store_type; + config_b.store_type = store_type_b; if allow_0conf { config_b.node_config.trusted_peers_0conf.push(node_a.node_id()); } @@ -489,6 +526,15 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> build_node_with_store(&builder, config.node_entropy, kv_store) }, TestStoreType::Sqlite => builder.build(config.node_entropy.into()).unwrap(), + TestStoreType::TierStore { primary, backup, ephemeral } => { + if let Some(backup) = backup { + builder.set_backup_store(backup); + } + if let Some(ephemeral) = ephemeral { + builder.set_ephemeral_store(ephemeral); + } + builder.build_with_store(config.node_entropy.into(), primary).unwrap() + }, }; if config.recovery_mode { diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 2c7bf6340..c24d58e91 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -37,10 +37,17 @@ use ldk_node::{Builder, Event, NodeError}; use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::{NodeAlias, NodeId}; use lightning::routing::router::RouteParametersConfig; +use lightning::util::persist::{ + KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, +}; use lightning_invoice::{Bolt11InvoiceDescription, Description}; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use log::LevelFilter; +use crate::common::{create_tier_stores, random_storage_path, setup_two_nodes_with_store}; + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn channel_full_cycle() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); @@ -50,6 +57,85 @@ async fn channel_full_cycle() { .await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle_tier_store() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + let (primary_a, backup_a, ephemeral_a) = create_tier_stores(random_storage_path()); + let (primary_b, backup_b, ephemeral_b) = create_tier_stores(random_storage_path()); + + let (node_a, node_b) = setup_two_nodes_with_store( + &chain_source, + false, + true, + false, + TestStoreType::TierStore { + primary: Arc::clone(&primary_a), + backup: Some(Arc::clone(&backup_a)), + ephemeral: Some(Arc::clone(&ephemeral_a)), + }, + TestStoreType::TierStore { + primary: Arc::clone(&primary_b), + backup: Some(Arc::clone(&backup_b)), + ephemeral: Some(Arc::clone(&ephemeral_b)), + }, + ); + do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false) + .await; + + // Verify Primary store contains channel manager data + let primary_channel_manager = KVStoreSync::read( + primary_a.as_ref(), + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + assert!(primary_channel_manager.is_ok(), "Primary should have channel manager data"); + + // Verify Primary store contains payment info + let primary_payments = KVStoreSync::list(primary_a.as_ref(), "payments", ""); + assert!(primary_payments.is_ok(), "Primary should have payment data"); + assert!(!primary_payments.unwrap().is_empty(), "Primary should have payment entries"); + + // Verify Backup store synced critical data + let backup_channel_manager = KVStoreSync::read( + backup_a.as_ref(), + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + assert!(backup_channel_manager.is_ok(), "Backup should have synced channel manager"); + + // Verify backup is not empty + let backup_all_keys = KVStoreSync::list(backup_a.as_ref(), "", "").unwrap(); + assert!(!backup_all_keys.is_empty(), "Backup store should not be empty"); + + // Verify Ephemeral does NOT have channel manager + let ephemeral_channel_manager = KVStoreSync::read( + ephemeral_a.as_ref(), + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + assert!(ephemeral_channel_manager.is_err(), "Ephemeral should NOT have channel manager"); + + // Verify Ephemeral does NOT have payment info + let ephemeral_payments = KVStoreSync::list(ephemeral_a.as_ref(), "payments", ""); + assert!( + ephemeral_payments.is_err() || ephemeral_payments.unwrap().is_empty(), + "Ephemeral should NOT have payment data" + ); + + //Verify Ephemeral does have network graph + let ephemeral_network_graph = KVStoreSync::read( + ephemeral_a.as_ref(), + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + assert!(ephemeral_network_graph.is_ok(), "Ephemeral should have network graph"); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn channel_full_cycle_force_close() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); From 1ba4831c195fd609bd0c496f58ead52eb51b27dd Mon Sep 17 00:00:00 2001 From: Enigbe Date: Tue, 24 Feb 2026 22:48:29 +0100 Subject: [PATCH 05/10] Add per-key version-locked writes to FfiDynStore Introduce FfiDynStoreInner with per-key write version locks that ensure write ordering and skip stale versions in both sync and async code paths. Test changes: - Unify tier store test helpers to use TestSyncStore for all tiers, replacing mixed SqliteStore/FilesystemStore/TestStore usage that caused test hangs due to TestStore's async write blocking - Split build_node_with_store into cfg-gated versions for uniffi vs non-uniffi feature flags --- src/ffi/types.rs | 315 +++++++++++++++++++++++++++++--- tests/common/mod.rs | 81 +++++--- tests/integration_tests_rust.rs | 53 ++++-- 3 files changed, 382 insertions(+), 67 deletions(-) diff --git a/src/ffi/types.rs b/src/ffi/types.rs index 304a493a9..a9a1af45d 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -10,11 +10,13 @@ // // Make sure to add any re-exported items that need to be used in uniffi below. +use std::collections::HashMap; use std::convert::TryInto; use std::future::Future; use std::ops::Deref; use std::str::FromStr; -use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; use std::time::Duration; use async_trait::async_trait; @@ -54,6 +56,7 @@ pub use crate::config::{ pub use crate::entropy::{generate_entropy_mnemonic, EntropyError, NodeEntropy, WordCount}; use crate::error::Error; pub use crate::graph::{ChannelInfo, ChannelUpdateInfo, NodeAnnouncementInfo, NodeInfo}; +use crate::io::utils::check_namespace_key_validity; pub use crate::liquidity::{LSPS1OrderStatus, LSPS2ServiceConfig}; pub use crate::logger::{LogLevel, LogRecord, LogWriter}; pub use crate::payment::store::{ @@ -194,16 +197,42 @@ pub trait FfiDynStoreTrait: Send + Sync { #[derive(Clone)] pub struct FfiDynStore { - pub(crate) inner: Arc, + inner: Arc, + next_write_version: Arc, } impl FfiDynStore { pub fn from_store(store: Arc) -> Self { - Self { inner: store } + let inner = Arc::new(FfiDynStoreInner::new(store)); + Self { inner, next_write_version: Arc::new(AtomicU64::new(1)) } + } + + fn build_locking_key( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> String { + if primary_namespace.is_empty() { + key.to_owned() + } else { + format!("{}#{}#{}", primary_namespace, secondary_namespace, key) + } + } + + fn get_new_version_and_lock_ref( + &self, locking_key: String, + ) -> (Arc>, u64) { + let version = self.next_write_version.fetch_add(1, Ordering::Relaxed); + if version == u64::MAX { + panic!("FfiDynStore version counter overflowed"); + } + + let inner_lock_ref = self.inner.get_inner_lock_ref(locking_key); + + (inner_lock_ref, version) } pub fn from_kv_store(store: T) -> Self { - Self { inner: Arc::new(DynStoreWrapper(store)) } + let store = FfiDynStoreInner::new(Arc::new(DynStoreWrapper(store))); + Self { inner: Arc::new(store), next_write_version: Arc::new(AtomicU64::new(1)) } } } @@ -272,7 +301,9 @@ impl KVStore for FfiDynStore { let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); async move { - this.read_async(primary_namespace, secondary_namespace, key).await.map_err(|e| e.into()) + this.read_internal_async(primary_namespace, secondary_namespace, key) + .await + .map_err(|e| e.into()) } } @@ -280,13 +311,25 @@ impl KVStore for FfiDynStore { &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> impl Future> + 'static + Send { let this = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); + + let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); async move { - this.write_async(primary_namespace, secondary_namespace, key, buf) - .await - .map_err(|e| e.into()) + this.write_internal_async( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + buf, + ) + .await + .map_err(|e| e.into()) } } @@ -294,13 +337,26 @@ impl KVStore for FfiDynStore { &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> impl Future> + 'static + Send { let this = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); + + let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); + async move { - this.remove_async(primary_namespace, secondary_namespace, key, lazy) - .await - .map_err(|e| e.into()) + this.remove_internal_async( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + lazy, + ) + .await + .map_err(|e| e.into()) } } @@ -308,10 +364,14 @@ impl KVStore for FfiDynStore { &self, primary_namespace: &str, secondary_namespace: &str, ) -> impl Future, lightning::io::Error>> + 'static + Send { let this = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); + async move { - this.list_async(primary_namespace, secondary_namespace).await.map_err(|e| e.into()) + this.list_internal_async(primary_namespace, secondary_namespace) + .await + .map_err(|e| e.into()) } } } @@ -320,50 +380,251 @@ impl KVStoreSync for FfiDynStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Result, lightning::io::Error> { - FfiDynStoreTrait::read( - self.inner.as_ref(), + self.inner.read_internal( primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string(), ) - .map_err(|e| e.into()) } fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> Result<(), lightning::io::Error> { - FfiDynStoreTrait::write( - self.inner.as_ref(), + let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); + + self.inner.write_internal( + inner_lock_ref, + locking_key, + version, primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string(), buf, ) - .map_err(|e| e.into()) } fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> Result<(), lightning::io::Error> { - FfiDynStoreTrait::remove( - self.inner.as_ref(), + let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); + + self.inner.remove_internal( + inner_lock_ref, + locking_key, + version, primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string(), lazy, ) - .map_err(|e| e.into()) } fn list( &self, primary_namespace: &str, secondary_namespace: &str, ) -> Result, lightning::io::Error> { - FfiDynStoreTrait::list( - self.inner.as_ref(), - primary_namespace.to_string(), - secondary_namespace.to_string(), - ) - .map_err(|e| e.into()) + self.inner.list_internal(primary_namespace.to_string(), secondary_namespace.to_string()) + } +} + +struct FfiDynStoreInner { + ffi_store: Arc, + write_version_locks: Mutex>>>, +} + +impl FfiDynStoreInner { + fn new(ffi_store: Arc) -> Self { + Self { ffi_store, write_version_locks: Mutex::new(HashMap::new()) } + } + + fn get_inner_lock_ref(&self, locking_key: String) -> Arc> { + let mut outer_lock = self.write_version_locks.lock().unwrap(); + Arc::clone(&outer_lock.entry(locking_key).or_default()) + } + + async fn read_internal_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> bitcoin::io::Result> { + check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?; + self.ffi_store + .read_async(primary_namespace, secondary_namespace, key) + .await + .map_err(|e| e.into()) + } + + fn read_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> bitcoin::io::Result> { + check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?; + self.ffi_store.read(primary_namespace, secondary_namespace, key).map_err(|e| e.into()) + } + + async fn write_internal_async( + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, + primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> bitcoin::io::Result<()> { + check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + Some(&key), + "write", + )?; + + let store = Arc::clone(&self.ffi_store); + + self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { + store + .write_async(primary_namespace, secondary_namespace, key, buf) + .await + .map_err(|e| >::into(e))?; + + Ok(()) + }) + .await + } + + fn write_internal( + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, + primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> bitcoin::io::Result<()> { + check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + Some(&key), + "write", + )?; + + let res = { + let mut last_written_version = inner_lock_ref.blocking_lock(); + if version <= *last_written_version { + Ok(()) + } else { + self.ffi_store + .write(primary_namespace, secondary_namespace, key, buf) + .map_err(|e| e.into()) + .map(|_| { + *last_written_version = version; + }) + } + }; + + self.clean_locks(&inner_lock_ref, locking_key); + res + } + + async fn remove_internal_async( + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, + primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> bitcoin::io::Result<()> { + check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + Some(&key), + "remove", + )?; + + let store = Arc::clone(&self.ffi_store); + + self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { + store + .remove_async(primary_namespace, secondary_namespace, key, lazy) + .await + .map_err(|e| >::into(e))?; + + Ok(()) + }) + .await + } + + fn remove_internal( + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, + primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> bitcoin::io::Result<()> { + check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + Some(&key), + "remove", + )?; + + let res = { + let mut last_written_version = inner_lock_ref.blocking_lock(); + if version <= *last_written_version { + Ok(()) + } else { + self.ffi_store + .remove(primary_namespace, secondary_namespace, key, lazy) + .map_err(|e| >::into(e)) + .map(|_| { + *last_written_version = version; + }) + } + }; + + self.clean_locks(&inner_lock_ref, locking_key); + res + } + + async fn list_internal_async( + &self, primary_namespace: String, secondary_namespace: String, + ) -> bitcoin::io::Result> { + check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?; + self.ffi_store + .list_async(primary_namespace, secondary_namespace) + .await + .map_err(|e| e.into()) + } + + fn list_internal( + &self, primary_namespace: String, secondary_namespace: String, + ) -> bitcoin::io::Result> { + check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?; + self.ffi_store.list(primary_namespace, secondary_namespace).map_err(|e| e.into()) + } + + async fn execute_locked_write< + F: Future>, + FN: FnOnce() -> F, + >( + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, + callback: FN, + ) -> Result<(), bitcoin::io::Error> { + let res = { + let mut last_written_version = inner_lock_ref.lock().await; + + // Check if we already have a newer version written/removed. This is used in async contexts to realize eventual + // consistency. + let is_stale_version = version <= *last_written_version; + + // If the version is not stale, we execute the callback. Otherwise we can and must skip writing. + if is_stale_version { + Ok(()) + } else { + callback().await.map(|_| { + *last_written_version = version; + }) + } + }; + + self.clean_locks(&inner_lock_ref, locking_key); + + res + } + + fn clean_locks(&self, inner_lock_ref: &Arc>, locking_key: String) { + // If there no arcs in use elsewhere, this means that there are no in-flight writes. We can remove the map entry + // to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in + // inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already + // counted. + let mut outer_lock = self.write_version_locks.lock().unwrap(); + + let strong_count = Arc::strong_count(&inner_lock_ref); + debug_assert!(strong_count >= 2, "Unexpected FfiDynStore strong count"); + + if strong_count == 2 { + outer_lock.remove(&locking_key); + } } } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 6964a9429..ad2ac64b8 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -30,12 +30,13 @@ use ldk_node::config::{AsyncPaymentsRole, Config, ElectrumSyncConfig, EsploraSyn use ldk_node::entropy::{generate_entropy_mnemonic, NodeEntropy}; use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus}; +#[cfg(feature = "uniffi")] +use ldk_node::FfiDynStore; +#[cfg(not(feature = "uniffi"))] +use ldk_node::SyncAndAsyncKVStore; use ldk_node::{ Builder, CustomTlvRecord, Event, LightningBalance, Node, NodeError, PendingSweepBalance, - SyncAndAsyncKVStore, }; -#[cfg(feature = "uniffi")] -use ldk_node::FfiDynStore; use lightning::io; use lightning::ln::msgs::SocketAddress; use lightning::routing::gossip::NodeAlias; @@ -333,7 +334,7 @@ pub(crate) enum TestChainSource<'a> { #[cfg(feature = "uniffi")] type TestDynStore = Arc; #[cfg(not(feature = "uniffi"))] -type TestDynStore = Arc; +type TestDynStore = Arc; #[derive(Clone)] pub(crate) enum TestStoreType { @@ -395,14 +396,9 @@ macro_rules! setup_builder { pub(crate) use setup_builder; pub(crate) fn create_tier_stores(base_path: PathBuf) -> (TestDynStore, TestDynStore, TestDynStore) { - let primary = SqliteStore::new( - base_path.join("primary"), - Some("primary_db".to_string()), - Some("primary_kv".to_string()), - ) - .unwrap(); - let backup = FilesystemStore::new(base_path.join("backup")); - let ephemeral = TestStore::new(false); + let primary = TestSyncStore::new(base_path.join("primary")); + let backup = TestSyncStore::new(base_path.join("backup")); + let ephemeral = TestSyncStore::new(base_path.join("ephemeral")); #[cfg(feature = "uniffi")] { @@ -414,7 +410,7 @@ pub(crate) fn create_tier_stores(base_path: PathBuf) -> (TestDynStore, TestDynSt } #[cfg(not(feature = "uniffi"))] { - (primary, backup, ephemeral) + (Arc::new(primary), Arc::new(backup), Arc::new(ephemeral)) } } @@ -523,17 +519,52 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> let node = match config.store_type { TestStoreType::TestSyncStore => { let kv_store = TestSyncStore::new(config.node_config.storage_dir_path.into()); - build_node_with_store(&builder, config.node_entropy, kv_store) + #[cfg(feature = "uniffi")] + { + let kv_store = Arc::new(FfiDynStore::from_kv_store(kv_store)); + build_node_with_store(&builder, config.node_entropy, kv_store) + } + #[cfg(not(feature = "uniffi"))] + { + build_node_with_store(&builder, config.node_entropy, kv_store) + } }, TestStoreType::Sqlite => builder.build(config.node_entropy.into()).unwrap(), TestStoreType::TierStore { primary, backup, ephemeral } => { if let Some(backup) = backup { - builder.set_backup_store(backup); + #[cfg(feature = "uniffi")] + { + builder.set_backup_store(backup); + } + #[cfg(not(feature = "uniffi"))] + { + use ldk_node::{DynStore, DynStoreWrapper}; + builder.set_backup_store( + Arc::new(DynStoreWrapper((*backup).clone())) as Arc + ); + } } if let Some(ephemeral) = ephemeral { - builder.set_ephemeral_store(ephemeral); + #[cfg(feature = "uniffi")] + { + builder.set_ephemeral_store(ephemeral); + } + #[cfg(not(feature = "uniffi"))] + { + use ldk_node::{DynStore, DynStoreWrapper}; + builder.set_ephemeral_store( + Arc::new(DynStoreWrapper((*ephemeral).clone())) as Arc + ); + } + } + #[cfg(feature = "uniffi")] + { + build_node_with_store(&builder, config.node_entropy, primary) + } + #[cfg(not(feature = "uniffi"))] + { + build_node_with_store(&builder, config.node_entropy, (*primary).clone()) } - builder.build_with_store(config.node_entropy.into(), primary).unwrap() }, }; @@ -1686,18 +1717,20 @@ impl TestSyncStoreInner { } } +#[cfg(not(feature = "uniffi"))] pub(crate) fn build_node_with_store( builder: &Builder, entropy: NodeEntropy, store: S, ) -> TestNode { - #[cfg(feature = "uniffi")] { - use ldk_node::FfiDynStore; - builder - .build_with_store(entropy.into(), Arc::new(FfiDynStore::from_kv_store(store))) - .unwrap() + builder.build_with_store(entropy, store).unwrap() } - #[cfg(not(feature = "uniffi"))] +} + +#[cfg(feature = "uniffi")] +pub(crate) fn build_node_with_store( + builder: &Builder, entropy: NodeEntropy, store: Arc, +) -> TestNode { { - builder.build_with_store(entropy, store).unwrap() + builder.build_with_store(entropy.into(), store).unwrap() } } diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index c24d58e91..7e0b681f6 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -70,14 +70,14 @@ async fn channel_full_cycle_tier_store() { true, false, TestStoreType::TierStore { - primary: Arc::clone(&primary_a), - backup: Some(Arc::clone(&backup_a)), - ephemeral: Some(Arc::clone(&ephemeral_a)), + primary: primary_a.clone(), + backup: Some(backup_a.clone()), + ephemeral: Some(ephemeral_a.clone()), }, TestStoreType::TierStore { - primary: Arc::clone(&primary_b), - backup: Some(Arc::clone(&backup_b)), - ephemeral: Some(Arc::clone(&ephemeral_b)), + primary: primary_b, + backup: Some(backup_b), + ephemeral: Some(ephemeral_b), }, ); do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false) @@ -85,7 +85,7 @@ async fn channel_full_cycle_tier_store() { // Verify Primary store contains channel manager data let primary_channel_manager = KVStoreSync::read( - primary_a.as_ref(), + &(*primary_a.clone()), CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, @@ -93,13 +93,13 @@ async fn channel_full_cycle_tier_store() { assert!(primary_channel_manager.is_ok(), "Primary should have channel manager data"); // Verify Primary store contains payment info - let primary_payments = KVStoreSync::list(primary_a.as_ref(), "payments", ""); + let primary_payments = KVStoreSync::list(&(*primary_a.clone()), "payments", ""); assert!(primary_payments.is_ok(), "Primary should have payment data"); assert!(!primary_payments.unwrap().is_empty(), "Primary should have payment entries"); // Verify Backup store synced critical data let backup_channel_manager = KVStoreSync::read( - backup_a.as_ref(), + &(*backup_a.clone()), CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, @@ -107,12 +107,12 @@ async fn channel_full_cycle_tier_store() { assert!(backup_channel_manager.is_ok(), "Backup should have synced channel manager"); // Verify backup is not empty - let backup_all_keys = KVStoreSync::list(backup_a.as_ref(), "", "").unwrap(); + let backup_all_keys = KVStoreSync::list(&(*backup_a.clone()), "", "").unwrap(); assert!(!backup_all_keys.is_empty(), "Backup store should not be empty"); // Verify Ephemeral does NOT have channel manager let ephemeral_channel_manager = KVStoreSync::read( - ephemeral_a.as_ref(), + &(*ephemeral_a.clone()), CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, @@ -120,7 +120,7 @@ async fn channel_full_cycle_tier_store() { assert!(ephemeral_channel_manager.is_err(), "Ephemeral should NOT have channel manager"); // Verify Ephemeral does NOT have payment info - let ephemeral_payments = KVStoreSync::list(ephemeral_a.as_ref(), "payments", ""); + let ephemeral_payments = KVStoreSync::list(&(*ephemeral_a.clone()), "payments", ""); assert!( ephemeral_payments.is_err() || ephemeral_payments.unwrap().is_empty(), "Ephemeral should NOT have payment data" @@ -128,7 +128,7 @@ async fn channel_full_cycle_tier_store() { //Verify Ephemeral does have network graph let ephemeral_network_graph = KVStoreSync::read( - ephemeral_a.as_ref(), + &(*ephemeral_a.clone()), NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, @@ -321,7 +321,18 @@ async fn start_stop_reinit() { setup_builder!(builder, config.node_config); builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let node = build_node_with_store(&builder, config.node_entropy, test_sync_store.clone()); + let node; + #[cfg(feature = "uniffi")] + { + use ldk_node::FfiDynStore; + + let test_sync_store = Arc::new(FfiDynStore::from_kv_store(test_sync_store.clone())); + node = build_node_with_store(&builder, config.node_entropy, test_sync_store); + } + #[cfg(not(feature = "uniffi"))] + { + node = build_node_with_store(&builder, config.node_entropy, test_sync_store.clone()); + } node.start().unwrap(); let expected_node_id = node.node_id(); @@ -359,8 +370,18 @@ async fn start_stop_reinit() { setup_builder!(builder, config.node_config); builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let reinitialized_node = - build_node_with_store(&builder, config.node_entropy, test_sync_store.clone()); + let reinitialized_node; + #[cfg(feature = "uniffi")] + { + use ldk_node::FfiDynStore; + + let test_sync_store = Arc::new(FfiDynStore::from_kv_store(test_sync_store)); + reinitialized_node = build_node_with_store(&builder, config.node_entropy, test_sync_store); + } + #[cfg(not(feature = "uniffi"))] + { + reinitialized_node = build_node_with_store(&builder, config.node_entropy, test_sync_store); + } reinitialized_node.start().unwrap(); assert_eq!(reinitialized_node.node_id(), expected_node_id); From fed8a92fc9180171edf6b87cfe65cc37de9ce1ed Mon Sep 17 00:00:00 2001 From: Enigbe Date: Mon, 16 Feb 2026 17:32:30 +0100 Subject: [PATCH 06/10] Add documentation for ForeignDynstoreTrait --- src/ffi/types.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/ffi/types.rs b/src/ffi/types.rs index a9a1af45d..1328fc63d 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -166,6 +166,9 @@ impl std::fmt::Display for IOError { } } +/// FFI-safe version of [`DynStoreTrait`]. +/// +/// [`DynStoreTrait`]: crate::types::DynStoreTrait #[async_trait] pub trait FfiDynStoreTrait: Send + Sync { async fn read_async( From 02debf54d5843d29416c1e9529d059d7e49adb56 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Mon, 16 Feb 2026 18:36:52 +0100 Subject: [PATCH 07/10] Remove unnecessary uniffi gating in tier store --- src/io/tier_store.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs index 70904c36f..e9d4ff6b2 100644 --- a/src/io/tier_store.rs +++ b/src/io/tier_store.rs @@ -706,7 +706,6 @@ mod tests { use crate::io::tier_store::TierStore; use crate::logger::Logger; use crate::runtime::Runtime; - #[cfg(not(feature = "uniffi"))] use crate::types::DynStore; use crate::types::DynStoreWrapper; From cb66b599da9b3db8b14f7b766af82ba8b604748a Mon Sep 17 00:00:00 2001 From: Enigbe Date: Mon, 16 Feb 2026 19:55:33 +0100 Subject: [PATCH 08/10] Drop DelayedStore and associated backup test These were created to test that our backup store does not impact the primary store writes but the boilerplate appears too much for the functionality being tested. --- src/io/test_utils.rs | 170 +------------------------------------------ src/io/tier_store.rs | 49 +------------ 2 files changed, 4 insertions(+), 215 deletions(-) diff --git a/src/io/test_utils.rs b/src/io/test_utils.rs index f2b226a5f..9add2d6c1 100644 --- a/src/io/test_utils.rs +++ b/src/io/test_utils.rs @@ -9,8 +9,7 @@ use std::collections::{hash_map, HashMap}; use std::future::Future; use std::panic::RefUnwindSafe; use std::path::PathBuf; -use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::sync::Mutex; use lightning::events::ClosureReason; use lightning::ln::functional_test_utils::{ @@ -26,8 +25,6 @@ use lightning::{check_closed_broadcast, io}; use rand::distr::Alphanumeric; use rand::{rng, Rng}; -use crate::runtime::Runtime; - type TestMonitorUpdatePersister<'a, K> = MonitorUpdatingPersister< &'a K, &'a test_utils::TestLogger, @@ -355,168 +352,3 @@ pub(crate) fn do_test_store(store_0: &K, store_1: &K) { // Make sure everything is persisted as expected after close. check_persisted_data!(persister_0_max_pending_updates * 2 * EXPECTED_UPDATES_PER_PAYMENT + 1); } - -struct DelayedStoreInner { - storage: Mutex>>, - delay: Duration, -} - -impl DelayedStoreInner { - fn new(delay: Duration) -> Self { - Self { storage: Mutex::new(HashMap::new()), delay } - } - - fn make_key(pn: &str, sn: &str, key: &str) -> String { - format!("{}/{}/{}", pn, sn, key) - } - - async fn read_internal( - &self, primary_namespace: String, secondary_namespace: String, key: String, - ) -> Result, io::Error> { - tokio::time::sleep(self.delay).await; - - let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key); - let storage = self.storage.lock().unwrap(); - storage - .get(&full_key) - .cloned() - .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "key not found")) - } - - async fn write_internal( - &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, - ) -> Result<(), io::Error> { - tokio::time::sleep(self.delay).await; - - let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key); - let mut storage = self.storage.lock().unwrap(); - storage.insert(full_key, buf); - Ok(()) - } - - async fn remove_internal( - &self, primary_namespace: String, secondary_namespace: String, key: String, - ) -> Result<(), io::Error> { - tokio::time::sleep(self.delay).await; - - let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key); - let mut storage = self.storage.lock().unwrap(); - storage.remove(&full_key); - Ok(()) - } - - async fn list_internal( - &self, primary_namespace: String, secondary_namespace: String, - ) -> Result, io::Error> { - tokio::time::sleep(self.delay).await; - - let prefix = format!("{}/{}/", primary_namespace, secondary_namespace); - let storage = self.storage.lock().unwrap(); - Ok(storage - .keys() - .filter(|k| k.starts_with(&prefix)) - .map(|k| k.strip_prefix(&prefix).unwrap().to_string()) - .collect()) - } -} - -pub struct DelayedStore { - inner: Arc, - runtime: Arc, -} - -impl DelayedStore { - pub fn new(delay_ms: u64, runtime: Arc) -> Self { - Self { inner: Arc::new(DelayedStoreInner::new(Duration::from_millis(delay_ms))), runtime } - } -} - -impl KVStore for DelayedStore { - fn read( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> impl Future, io::Error>> + 'static + Send { - let inner = Arc::clone(&self.inner); - let pn = primary_namespace.to_string(); - let sn = secondary_namespace.to_string(); - let key = key.to_string(); - - async move { inner.read_internal(pn, sn, key).await } - } - - fn write( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> impl Future> + 'static + Send { - let inner = Arc::clone(&self.inner); - let pn = primary_namespace.to_string(); - let sn = secondary_namespace.to_string(); - let key = key.to_string(); - - async move { inner.write_internal(pn, sn, key, buf).await } - } - - fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, - ) -> impl Future> + 'static + Send { - let inner = Arc::clone(&self.inner); - let pn = primary_namespace.to_string(); - let sn = secondary_namespace.to_string(); - let key = key.to_string(); - - async move { inner.remove_internal(pn, sn, key).await } - } - - fn list( - &self, primary_namespace: &str, secondary_namespace: &str, - ) -> impl Future, io::Error>> + 'static + Send { - let inner = Arc::clone(&self.inner); - let pn = primary_namespace.to_string(); - let sn = secondary_namespace.to_string(); - - async move { inner.list_internal(pn, sn).await } - } -} - -impl KVStoreSync for DelayedStore { - fn read( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> Result, io::Error> { - let inner = Arc::clone(&self.inner); - let pn = primary_namespace.to_string(); - let sn = secondary_namespace.to_string(); - let key = key.to_string(); - - self.runtime.block_on(async move { inner.read_internal(pn, sn, key).await }) - } - - fn write( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> Result<(), io::Error> { - let inner = Arc::clone(&self.inner); - let pn = primary_namespace.to_string(); - let sn = secondary_namespace.to_string(); - let key = key.to_string(); - - self.runtime.block_on(async move { inner.write_internal(pn, sn, key, buf).await }) - } - - fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, - ) -> Result<(), io::Error> { - let inner = Arc::clone(&self.inner); - let pn = primary_namespace.to_string(); - let sn = secondary_namespace.to_string(); - let key = key.to_string(); - - self.runtime.block_on(async move { inner.remove_internal(pn, sn, key).await }) - } - - fn list( - &self, primary_namespace: &str, secondary_namespace: &str, - ) -> Result, io::Error> { - let inner = Arc::clone(&self.inner); - let pn = primary_namespace.to_string(); - let sn = secondary_namespace.to_string(); - - self.runtime.block_on(async move { inner.list_internal(pn, sn).await }) - } -} diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs index e9d4ff6b2..eeec1f418 100644 --- a/src/io/tier_store.rs +++ b/src/io/tier_store.rs @@ -700,9 +700,7 @@ mod tests { }; use lightning_persister::fs_store::FilesystemStore; - use crate::io::test_utils::{ - do_read_write_remove_list_persist, random_storage_path, DelayedStore, - }; + use crate::io::test_utils::{do_read_write_remove_list_persist, random_storage_path}; use crate::io::tier_store::TierStore; use crate::logger::Logger; use crate::runtime::Runtime; @@ -872,47 +870,6 @@ mod tests { assert_eq!(backup_read_cm.unwrap(), data); } - #[test] - fn backup_overflow_doesnt_fail_writes() { - let base_dir = random_storage_path(); - let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); - let logger = Arc::new(Logger::new_fs_writer(log_path.clone(), Level::Trace).unwrap()); - let runtime = Arc::new(Runtime::new(Arc::clone(&logger)).unwrap()); - - let _cleanup = CleanupDir(base_dir.clone()); - - let primary_store: Arc = - Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); - let mut tier = - setup_tier_store(Arc::clone(&primary_store), Arc::clone(&logger), Arc::clone(&runtime)); - - let backup_store: Arc = - Arc::new(DynStoreWrapper(DelayedStore::new(100, runtime))); - tier.set_backup_store(Arc::clone(&backup_store)); - - let data = vec![42u8; 32]; - - let key = CHANNEL_MANAGER_PERSISTENCE_KEY; - for i in 0..=10 { - let result = KVStoreSync::write( - &tier, - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - &format!("{}_{}", key, i), - data.clone(), - ); - - assert!(result.is_ok(), "Write {} should succeed", i); - } - - // Check logs for backup queue overflow message - let log_contents = std::fs::read_to_string(&log_path).unwrap(); - assert!( - log_contents.contains("Backup queue is full"), - "Logs should contain backup queue overflow message" - ); - } - #[test] fn lazy_removal() { let base_dir = random_storage_path(); @@ -928,7 +885,7 @@ mod tests { setup_tier_store(Arc::clone(&primary_store), Arc::clone(&logger), Arc::clone(&runtime)); let backup_store: Arc = - Arc::new(DynStoreWrapper(DelayedStore::new(100, runtime))); + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("backup")))); tier.set_backup_store(Arc::clone(&backup_store)); let data = vec![42u8; 32]; @@ -943,7 +900,7 @@ mod tests { ); assert!(write_result.is_ok(), "Write should succeed"); - thread::sleep(Duration::from_millis(10)); + thread::sleep(Duration::from_millis(100)); assert_eq!( KVStoreSync::read( From 08fe778232f0db37794a25e1adfcc433d0b886c0 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Mon, 16 Feb 2026 20:27:57 +0100 Subject: [PATCH 09/10] Remove spammy logs --- src/io/tier_store.rs | 44 ++++++-------------------------------------- 1 file changed, 6 insertions(+), 38 deletions(-) diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs index eeec1f418..10b36a181 100644 --- a/src/io/tier_store.rs +++ b/src/io/tier_store.rs @@ -11,13 +11,13 @@ use crate::logger::{LdkLogger, Logger}; use crate::runtime::Runtime; use crate::types::DynStore; +use lightning::io; use lightning::util::persist::{ KVStore, KVStoreSync, NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, }; -use lightning::{io, log_trace}; -use lightning::{log_debug, log_error, log_info, log_warn}; +use lightning::{log_error, log_warn}; use tokio::sync::mpsc::{self, error::TrySendError}; @@ -86,15 +86,7 @@ impl TierStore { ) { while let Some(op) = receiver.recv().await { match Self::apply_backup_operation(&op, &backup_store).await { - Ok(_) => { - log_trace!( - logger, - "Backup succeeded for key {}/{}/{}", - op.primary_namespace(), - op.secondary_namespace(), - op.key() - ); - }, + Ok(_) => {}, Err(e) => { log_error!( logger, @@ -381,16 +373,7 @@ impl TierStoreInner { ) .await { - Ok(data) => { - log_info!( - self.logger, - "Read succeeded for key: {}/{}/{}", - primary_namespace, - secondary_namespace, - key - ); - Ok(data) - }, + Ok(data) => Ok(data), Err(e) => { log_error!( self.logger, @@ -413,12 +396,6 @@ impl TierStoreInner { .await { Ok(keys) => { - log_info!( - self.logger, - "List succeeded for namespace: {}/{}", - primary_namespace, - secondary_namespace - ); return Ok(keys); }, Err(e) => { @@ -464,7 +441,7 @@ impl TierStoreInner { Ok(()) }, Err(e) => { - log_debug!( + log_error!( self.logger, "Skipping backup write due to primary write failure for key: {}/{}/{}.", primary_namespace, @@ -506,7 +483,7 @@ impl TierStoreInner { Ok(()) }, Err(e) => { - log_debug!( + log_error!( self.logger, "Skipping backup removal due to primary removal failure for key: {}/{}/{}.", primary_namespace, @@ -548,7 +525,6 @@ impl TierStoreInner { ) .await } else { - log_debug!(self.logger, "Ephemeral store not configured. Reading non-critical data from primary or backup stores."); self.read_primary(&primary_namespace, &secondary_namespace, &key).await } }, @@ -572,8 +548,6 @@ impl TierStoreInner { ) .await } else { - log_debug!(self.logger, "Ephemeral store not configured. Writing non-critical data to primary and backup stores."); - self.primary_write_then_schedule_backup( primary_namespace.as_str(), secondary_namespace.as_str(), @@ -611,8 +585,6 @@ impl TierStoreInner { ) .await } else { - log_debug!(self.logger, "Ephemeral store not configured. Removing non-critical data from primary and backup stores."); - self.primary_remove_then_schedule_backup( primary_namespace.as_str(), secondary_namespace.as_str(), @@ -646,10 +618,6 @@ impl TierStoreInner { if let Some(eph_store) = self.ephemeral_store.as_ref() { KVStoreSync::list(eph_store.as_ref(), &primary_namespace, &secondary_namespace) } else { - log_debug!( - self.logger, - "Ephemeral store not configured. Listing from primary and backup stores." - ); self.list_primary(&primary_namespace, &secondary_namespace).await } }, From db1fe8379af91bdd67d8477cc2587ed4de4de395 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Mon, 16 Feb 2026 21:44:10 +0100 Subject: [PATCH 10/10] DRY ephemeral key matching, fix visibility and appropriate kvstore usage - Restrict `TierStoreInner` visibility from `pub` to `pub(crate)` - Primary store can be either local or remote - Extract repeated ephemeral key matching into a standalone `is_ephemerally_cached_key` helper to DRY up `read_internal`, `write_internal`, and `remove_internal` - Replace `KVStoreSync::list` with async `KVStore::list` in `list_internal` to avoid blocking the async runtime --- src/io/tier_store.rs | 163 +++++++++++++++++++------------------------ 1 file changed, 71 insertions(+), 92 deletions(-) diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs index 10b36a181..9306279ad 100644 --- a/src/io/tier_store.rs +++ b/src/io/tier_store.rs @@ -230,8 +230,8 @@ impl KVStoreSync for TierStore { } } -pub struct TierStoreInner { - /// For remote data. +struct TierStoreInner { + /// For local or remote data. primary_store: Arc, /// For local non-critical/ephemeral data. ephemeral_store: Option>, @@ -395,9 +395,7 @@ impl TierStoreInner { match KVStore::list(self.primary_store.as_ref(), primary_namespace, secondary_namespace) .await { - Ok(keys) => { - return Ok(keys); - }, + Ok(keys) => Ok(keys), Err(e) => { log_error!( self.logger, @@ -505,104 +503,76 @@ impl TierStoreInner { "read", )?; - match (primary_namespace.as_str(), secondary_namespace.as_str(), key.as_str()) { - (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) - | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) => { - if let Some(eph_store) = self.ephemeral_store.as_ref() { - // We only try once here (without retry logic) because local failure might be indicative - // of a more serious issue (e.g. full memory, memory corruption, permissions change) that - // do not self-resolve such that retrying would negate the latency benefits. - - // The following questions remain: - // 1. Are there situations where local transient errors may warrant a retry? - // 2. Can we reliably identify/detect these transient errors? - // 3. Should we fall back to the primary or backup stores in the event of any error? - KVStore::read( - eph_store.as_ref(), - &primary_namespace, - &secondary_namespace, - &key, - ) - .await - } else { - self.read_primary(&primary_namespace, &secondary_namespace, &key).await - } - }, - _ => self.read_primary(&primary_namespace, &secondary_namespace, &key).await, + if let Some(eph_store) = self + .ephemeral_store + .as_ref() + .filter(|_s| is_ephemeral_cached_key(&primary_namespace, &secondary_namespace, &key)) + { + // We only try once here (without retry logic) because local failure might be indicative + // of a more serious issue (e.g. full memory, memory corruption, permissions change) that + // do not self-resolve such that retrying would negate the latency benefits. + + // The following questions remain: + // 1. Are there situations where local transient errors may warrant a retry? + // 2. Can we reliably identify/detect these transient errors? + // 3. Should we fall back to the primary or backup stores in the event of any error? + KVStore::read(eph_store.as_ref(), &primary_namespace, &secondary_namespace, &key).await + } else { + self.read_primary(&primary_namespace, &secondary_namespace, &key).await } } async fn write_internal( &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, ) -> io::Result<()> { - match (primary_namespace.as_str(), secondary_namespace.as_str(), key.as_str()) { - (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) - | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) => { - if let Some(eph_store) = &self.ephemeral_store { - KVStore::write( - eph_store.as_ref(), - primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - buf, - ) - .await - } else { - self.primary_write_then_schedule_backup( - primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - buf, - ) - .await - } - }, - _ => { - self.primary_write_then_schedule_backup( - primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - buf, - ) - .await - }, + if let Some(eph_store) = self + .ephemeral_store + .as_ref() + .filter(|_s| is_ephemeral_cached_key(&primary_namespace, &secondary_namespace, &key)) + { + KVStore::write( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + } else { + self.primary_write_then_schedule_backup( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await } } async fn remove_internal( &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, ) -> io::Result<()> { - match (primary_namespace.as_str(), secondary_namespace.as_str(), key.as_str()) { - (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) - | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) => { - if let Some(eph_store) = &self.ephemeral_store { - KVStore::remove( - eph_store.as_ref(), - primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - lazy, - ) - .await - } else { - self.primary_remove_then_schedule_backup( - primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - lazy, - ) - .await - } - }, - _ => { - self.primary_remove_then_schedule_backup( - primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - lazy, - ) - .await - }, + if let Some(eph_store) = self + .ephemeral_store + .as_ref() + .filter(|_s| is_ephemeral_cached_key(&primary_namespace, &secondary_namespace, &key)) + { + KVStore::remove( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + } else { + self.primary_remove_then_schedule_backup( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await } } @@ -616,7 +586,8 @@ impl TierStoreInner { ) | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _) => { if let Some(eph_store) = self.ephemeral_store.as_ref() { - KVStoreSync::list(eph_store.as_ref(), &primary_namespace, &secondary_namespace) + KVStore::list(eph_store.as_ref(), &primary_namespace, &secondary_namespace) + .await } else { self.list_primary(&primary_namespace, &secondary_namespace).await } @@ -653,6 +624,14 @@ impl BackupOp { } } +fn is_ephemeral_cached_key(pn: &str, sn: &str, key: &str) -> bool { + matches!( + (pn, sn, key), + (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) + ) +} + #[cfg(test)] mod tests { use std::panic::RefUnwindSafe;