diff --git a/payjoin-mailroom/src/config.rs b/payjoin-mailroom/src/config.rs index a040445ae..5afa94b1e 100644 --- a/payjoin-mailroom/src/config.rs +++ b/payjoin-mailroom/src/config.rs @@ -12,6 +12,10 @@ pub struct Config { pub storage_dir: PathBuf, #[serde(deserialize_with = "deserialize_duration_secs")] pub timeout: Duration, + /// Mailbox lifetime in seconds. Together with `DEFAULT_CAPACITY` in + /// `db::files`, this bounds both disk usage and write throughput. + #[serde(deserialize_with = "deserialize_duration_secs")] + pub mailbox_ttl: Duration, pub v1: Option, #[cfg(feature = "telemetry")] pub telemetry: Option, @@ -85,6 +89,7 @@ impl Default for Config { listener: "[::]:8080".parse().expect("valid default listener address"), storage_dir: PathBuf::from("./data"), timeout: Duration::from_secs(30), + mailbox_ttl: Duration::from_secs(60 * 60 * 24 * 7), // 1 week v1: None, #[cfg(feature = "telemetry")] telemetry: None, @@ -115,6 +120,7 @@ impl Config { listener, storage_dir, timeout, + mailbox_ttl: Duration::from_secs(60 * 60 * 24 * 7), // 1 week v1, #[cfg(feature = "telemetry")] telemetry: None, diff --git a/payjoin-mailroom/src/db/files.rs b/payjoin-mailroom/src/db/files.rs index 0494cf3de..023c27ecc 100644 --- a/payjoin-mailroom/src/db/files.rs +++ b/payjoin-mailroom/src/db/files.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{HashMap, VecDeque}; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; @@ -11,7 +11,7 @@ use rand::RngCore; use tokio::fs::{self, File}; use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::sync::{oneshot, Mutex}; -use tracing::trace; +use tracing::{trace, warn}; use crate::db::{Db as DbTrait, Error as DbError}; @@ -21,14 +21,6 @@ use crate::db::{Db as DbTrait, Error as DbError}; /// mailboxes/tx, ~4K txs/block, and ~144 blocks/24h. const DEFAULT_CAPACITY: usize = 1 << (1 + 12 + 8); -const DEFAULT_UNREAD_TTL_AT_CAPACITY: Duration = Duration::from_secs(60 * 60 * 24); // 1 day -const DEFAULT_UNREAD_TTL_BELOW_CAPACITY: Duration = Duration::from_secs(60 * 60 * 24 * 7); // 1 week - -/// How long read messages should be kept in mailboxes. Defaults to a 10 minute -/// grace period from first read attempt, in case of intermittent network or -/// relay errors. -const DEFAULT_READ_TTL: Duration = Duration::from_secs(60 * 10); // 10 minutes - #[derive(Debug)] struct V2WaitMapEntry { receiver: future::Shared>>>, @@ -50,12 +42,7 @@ pub(crate) struct Mailboxes { pending_v1: HashMap, pending_v2: HashMap, insert_order: VecDeque<(SystemTime, ShortId)>, - read_order: VecDeque<(SystemTime, ShortId)>, - read_mailbox_ids: HashSet, - unread_ttl_below_capacity: Duration, - unread_ttl_at_capacity: Duration, - read_ttl: Duration, - early_removal_count: usize, + ttl: Duration, } #[derive(Debug)] @@ -200,7 +187,7 @@ impl DiskStorage { } impl Mailboxes { - async fn init(dir: PathBuf) -> io::Result { + async fn init(dir: PathBuf, ttl: Duration) -> io::Result { let storage = DiskStorage::init(dir).await?; let insert_order = storage.insert_order().await?.into(); Ok(Self { @@ -209,12 +196,7 @@ impl Mailboxes { capacity: DEFAULT_CAPACITY, pending_v1: HashMap::default(), pending_v2: HashMap::default(), - read_order: VecDeque::default(), - read_mailbox_ids: HashSet::default(), - unread_ttl_below_capacity: DEFAULT_UNREAD_TTL_BELOW_CAPACITY, - unread_ttl_at_capacity: DEFAULT_UNREAD_TTL_AT_CAPACITY, - read_ttl: DEFAULT_READ_TTL, - early_removal_count: 0, + ttl, }) } } @@ -226,8 +208,8 @@ pub struct FilesDb { } impl FilesDb { - pub async fn init(timeout: Duration, path: PathBuf) -> io::Result { - Ok(Self { timeout, mailboxes: Arc::new(Mutex::new(Mailboxes::init(path).await?)) }) + pub async fn init(timeout: Duration, path: PathBuf, ttl: Duration) -> io::Result { + Ok(Self { timeout, mailboxes: Arc::new(Mutex::new(Mailboxes::init(path, ttl).await?)) }) } pub async fn prune(&self) -> io::Result { self.mailboxes.lock().await.prune().await } @@ -337,19 +319,12 @@ impl Mailboxes { // V2 requests are stored on disk if let Some((_created, payload)) = self.persistent_storage.get(id).await? { - self.mark_read(id); return Ok(Some(Arc::new(payload))); } Ok(None) } - fn mark_read(&mut self, id: &ShortId) { - if self.read_mailbox_ids.insert(*id) { - self.read_order.push_back((SystemTime::now(), *id)); - } - } - async fn has_capacity(&mut self) -> io::Result { self.maybe_prune().await?; Ok(self.len() < self.capacity) @@ -399,12 +374,10 @@ impl Mailboxes { self.insert_order.push_back((created, *id)); - // If there are pending readers, satisfy them and mark the payload as read + // If there are pending readers, satisfy them if let Some(pending) = self.pending_v2.remove(id) { trace!("notifying pending readers for {}", id); - self.mark_read(id); - pending .sender .send(Arc::new(payload)) @@ -444,7 +417,6 @@ impl Mailboxes { } async fn remove(&mut self, id: &ShortId) -> io::Result> { - self.read_mailbox_ids.remove(id); self.persistent_storage.remove(id).await } @@ -457,9 +429,7 @@ impl Mailboxes { } fn len(&self) -> usize { - (self.insert_order.len() - self.early_removal_count) - + self.pending_v1.len() - + self.pending_v2.len() + self.insert_order.len() + self.pending_v1.len() + self.pending_v2.len() } async fn maybe_prune(&mut self) -> io::Result { @@ -479,86 +449,37 @@ impl Mailboxes { trace!("pruning"); let now = SystemTime::now(); - debug_assert!(self.read_ttl < self.unread_ttl_at_capacity); - debug_assert!(self.unread_ttl_at_capacity < self.unread_ttl_below_capacity); debug_assert!(self.pending_v1.iter().all(|(_, v)| !v.sender.is_closed())); // Prune in flight requests, these can persist in the case of an incomplete session self.pending_v2.retain(|_, v| v.receiver.strong_count().unwrap_or(0) > 1); - // Prune any fully expired mailboxes, whether read or unread + // Prune any fully expired mailboxes while let Some((created, id)) = self.insert_order.front().cloned() { - if created + self.unread_ttl_below_capacity < now { - debug_assert!(self.insert_order.len() >= self.early_removal_count); + if created + self.ttl < now { _ = self.insert_order.pop_front(); if self.remove(&id).await?.is_none() { - self.early_removal_count = self - .early_removal_count - .checked_sub(1) - .expect("early removal adjustment should never underflow"); - } - debug_assert!(self.insert_order.len() >= self.early_removal_count); - trace!("Pruned old mailbox {id}"); - } else { - break; - } - } - - // So long as there expired read mailboxes, prune those. Stop when a - // mailbox within the TTL is encountered. - while let Some((read, id)) = self.read_order.front().cloned() { - if read + self.read_ttl < now { - _ = self.read_order.pop_front(); - if self.remove(&id).await?.is_some() { - self.early_removal_count += 1; - debug_assert!(self.insert_order.len() >= self.early_removal_count); + warn!("Mailbox file missing during prune; possible external deletion or disk error"); + } else { + trace!("Pruned old mailbox {id}"); } - trace!("Pruned read mailbox {id}"); } else { break; } } - // If no room was created, try to prune the oldest unread mailbox if - // it's over the minimum TTL - debug_assert!(self.len() <= self.capacity); - if self.len() == self.capacity { - if let Some((created, id)) = self.insert_order.front().cloned() { - if created + self.unread_ttl_at_capacity < now { - _ = self.insert_order.pop_front(); - self.remove(&id).await?; - trace!("Pruned unread mailbox {id} to make room"); - } else { - trace!("Nothing to prune, {} entries remain", self.len()); - } - } - } - Ok(self.next_prune()) } fn next_prune(&mut self) -> Duration { - let earliest_read_prune_opportunity = self - .read_order - .front() - .map(|(read, _id)| { - self.read_ttl - .checked_sub(read.elapsed().expect("system clock moved back")) - .unwrap_or(self.read_ttl) - }) - .unwrap_or_else(|| self.read_ttl); - - let earliest_unread_prune_opportunity = self - .insert_order + self.insert_order .front() .map(|(created, _id)| { - self.unread_ttl_at_capacity + self.ttl .checked_sub(created.elapsed().expect("system clock moved back")) - .unwrap_or(self.unread_ttl_at_capacity) + .unwrap_or(self.ttl) }) - .unwrap_or_else(|| self.unread_ttl_at_capacity); - - std::cmp::min(earliest_read_prune_opportunity, earliest_unread_prune_opportunity) + .unwrap_or(self.ttl) } } @@ -754,9 +675,13 @@ async fn test_disk_storage_mailboxes() -> std::io::Result<()> { async fn test_mailbox_storage() -> std::io::Result<()> { let dir = tempfile::tempdir()?; - let db = FilesDb::init(Duration::from_millis(10), dir.path().to_owned()) - .await - .expect("initializing mailbox database should succeed"); + let db = FilesDb::init( + Duration::from_millis(10), + dir.path().to_owned(), + Duration::from_secs(60 * 60 * 24 * 7), + ) + .await + .expect("initializing mailbox database should succeed"); let id = ShortId([0u8; 8]); let contents = b"foo bar"; @@ -775,9 +700,13 @@ async fn test_mailbox_storage() -> std::io::Result<()> { async fn test_v2_wait() -> std::io::Result<()> { let dir = tempfile::tempdir()?; - let db = FilesDb::init(Duration::from_millis(1), dir.path().to_owned()) - .await - .expect("initializing mailbox database should succeed"); + let db = FilesDb::init( + Duration::from_millis(1), + dir.path().to_owned(), + Duration::from_secs(60 * 60 * 24 * 7), + ) + .await + .expect("initializing mailbox database should succeed"); let id = ShortId([0u8; 8]); let contents = b"foo bar"; @@ -832,9 +761,13 @@ async fn test_v1_wait() -> std::io::Result<()> { let dir = tempfile::tempdir()?; let db = Arc::new( - FilesDb::init(Duration::from_millis(1), dir.path().to_owned()) - .await - .expect("initializing mailbox database should succeed"), + FilesDb::init( + Duration::from_millis(1), + dir.path().to_owned(), + Duration::from_secs(60 * 60 * 24 * 7), + ) + .await + .expect("initializing mailbox database should succeed"), ); let id = ShortId([0u8; 8]); @@ -879,9 +812,13 @@ async fn test_v1_data_minimization() -> std::io::Result<()> { let dir = tempfile::tempdir()?; let db = Arc::new( - FilesDb::init(Duration::from_millis(500), dir.path().to_owned()) - .await - .expect("initializing mailbox database should succeed"), + FilesDb::init( + Duration::from_millis(500), + dir.path().to_owned(), + Duration::from_secs(60 * 60 * 24 * 7), + ) + .await + .expect("initializing mailbox database should succeed"), ); let id = ShortId([0u8; 8]); @@ -934,20 +871,20 @@ async fn test_v1_data_minimization() -> std::io::Result<()> { async fn test_prune() -> std::io::Result<()> { let dir = tempfile::tempdir()?; - let db = FilesDb::init(Duration::from_millis(2), dir.path().to_owned()) - .await - .expect("initializing mailbox database should succeed"); + let db = FilesDb::init( + Duration::from_millis(2), + dir.path().to_owned(), + Duration::from_secs(60 * 60 * 24 * 7), + ) + .await + .expect("initializing mailbox database should succeed"); - let read_ttl = Duration::from_secs(60); - let unread_ttl_at_capacity = Duration::from_secs(600); - let unread_ttl_below_capacity = Duration::from_secs(3600); + let ttl = Duration::from_secs(600); { let mut guard = db.mailboxes.lock().await; guard.capacity = 2; - guard.read_ttl = read_ttl; - guard.unread_ttl_at_capacity = unread_ttl_at_capacity; - guard.unread_ttl_below_capacity = unread_ttl_below_capacity; + guard.ttl = ttl; } assert_eq!(db.mailboxes.lock().await.len(), 0); @@ -984,38 +921,11 @@ async fn test_prune() -> std::io::Result<()> { db.prune().await.expect("pruning should not fail"); assert_eq!(db.mailboxes.lock().await.len(), 1); - // Shift insert timestamps past unread_ttl_below_capacity + // Shift insert timestamps past ttl { let mut guard = db.mailboxes.lock().await; for (ts, _) in guard.insert_order.iter_mut() { - *ts -= unread_ttl_below_capacity + Duration::from_secs(1); - } - } - - assert_eq!(db.mailboxes.lock().await.len(), 1); - db.prune().await.expect("pruning should not fail"); - assert_eq!(db.mailboxes.lock().await.len(), 0); - - // Post again, read it, then verify read TTL pruning - db.post_v2_payload(&id, contents.to_vec()) - .await - .expect("posting payload should succeed") - .expect("contents should be accepted"); - - assert_eq!(db.mailboxes.lock().await.len(), 1); - - // Mark the mailbox as read - _ = db.wait_for_v2_payload(&id).await.expect("waiting for payload should succeed"); - - assert_eq!(db.mailboxes.lock().await.len(), 1); - db.prune().await.expect("pruning should not fail"); - assert_eq!(db.mailboxes.lock().await.len(), 1); - - // Shift read timestamps past read_ttl - { - let mut guard = db.mailboxes.lock().await; - for (ts, _) in guard.read_order.iter_mut() { - *ts -= read_ttl + Duration::from_secs(1); + *ts -= ttl + Duration::from_secs(1); } } diff --git a/payjoin-mailroom/src/directory.rs b/payjoin-mailroom/src/directory.rs index 0cd51ed0a..ff55e7518 100644 --- a/payjoin-mailroom/src/directory.rs +++ b/payjoin-mailroom/src/directory.rs @@ -591,7 +591,13 @@ mod tests { async fn test_service(v1: Option) -> Service { let dir = tempfile::tempdir().expect("tempdir"); - let db = FilesDb::init(Duration::from_millis(100), dir.keep()).await.expect("db init"); + let db = FilesDb::init( + Duration::from_millis(100), + dir.keep(), + Duration::from_secs(60 * 60 * 24 * 7), + ) + .await + .expect("db init"); let ohttp: ohttp::Server = crate::key_config::gen_ohttp_server_config().expect("ohttp config").into(); Service::new(db, ohttp, SentinelTag::new([0u8; 32]), v1) @@ -824,7 +830,7 @@ mod tests { let metrics = MetricsService::new(Some(provider.clone())); let dir = tempfile::tempdir().expect("tempdir"); - let db = FilesDb::init(Duration::from_millis(100), dir.keep()).await.expect("db init"); + let db = FilesDb::init(Duration::from_millis(100), dir.keep(), Duration::from_secs(60 * 60 * 24 * 7)).await.expect("db init"); let db = MetricsDb::new(db, metrics); let ohttp: ohttp::Server = crate::key_config::gen_ohttp_server_config().expect("ohttp config").into(); diff --git a/payjoin-mailroom/src/lib.rs b/payjoin-mailroom/src/lib.rs index ca91cc712..ccd075f7b 100644 --- a/payjoin-mailroom/src/lib.rs +++ b/payjoin-mailroom/src/lib.rs @@ -228,7 +228,9 @@ async fn init_directory( sentinel_tag: SentinelTag, metrics: &MetricsService, ) -> anyhow::Result { - let files_db = crate::db::FilesDb::init(config.timeout, config.storage_dir.clone()).await?; + let files_db = + crate::db::FilesDb::init(config.timeout, config.storage_dir.clone(), config.mailbox_ttl) + .await?; files_db.spawn_background_prune().await; let db = crate::db::MetricsDb::new(crate::db::DbServiceAdapter::new(files_db), metrics.clone());