diff --git a/Cargo.lock b/Cargo.lock index 34254722..d11510d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4443,6 +4443,22 @@ dependencies = [ "zstd", ] +[[package]] +name = "synddb-wal-sequencer" +version = "0.1.0" +dependencies = [ + "clap", + "humantime", + "rusqlite", + "serde", + "synddb-shared", + "thiserror 1.0.69", + "tracing", + "tracing-subscriber", + "tracing-test", + "zerocopy", +] + [[package]] name = "synstructure" version = "0.13.2" @@ -4857,6 +4873,27 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-test" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" +dependencies = [ + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" +dependencies = [ + "quote", + "syn 2.0.108", +] + [[package]] name = "try-lock" version = "0.2.5" @@ -5487,18 +5524,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.27" +version = "0.8.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0894878a5fa3edfd6da3f88c4805f4c8558e2b996227a3d864f47fe11e38282c" +checksum = "fd74ec98b9250adb3ca554bdde269adf631549f51d8a8f8f0a10b50f1cb298c3" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.27" +version = "0.8.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831" +checksum = "d8a8d209fdf45cf5138cbb5a506f6b52522a25afccc534d1475dad8e31105c6a" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 058e29e6..0601686a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "crates/synddb-client", "crates/synddb-chain-monitor", "crates/synddb-shared", + "crates/synddb-wal-sequencer", "tests/e2e/runner", "tests/e2e-gcs/runner", ] @@ -66,6 +67,7 @@ base64 = "0.22" zstd = "0.13" sha2 = "0.10" sha3 = "0.10" +zerocopy = "0.8.31" # CBOR/COSE serialization ciborium = "0.2" diff --git a/crates/synddb-shared/src/lib.rs b/crates/synddb-shared/src/lib.rs index 1c65213d..1f31bd69 100644 --- a/crates/synddb-shared/src/lib.rs +++ b/crates/synddb-shared/src/lib.rs @@ -3,3 +3,4 @@ pub mod parse; pub mod runtime; pub mod types; +pub mod utils; diff --git a/crates/synddb-shared/src/utils.rs b/crates/synddb-shared/src/utils.rs new file mode 100644 index 00000000..70a6781c --- /dev/null +++ b/crates/synddb-shared/src/utils.rs @@ -0,0 +1,61 @@ +use alloy::primitives::keccak256; +use std::{ + fs::{self}, + ops::Deref, + panic, + path::{Path, PathBuf}, + thread, + time::{SystemTime, UNIX_EPOCH}, +}; + +#[derive(Debug)] +pub struct TmpDir(PathBuf); + +impl Deref for TmpDir { + type Target = Path; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Drop for TmpDir { + fn drop(&mut self) { + let _ = fs::remove_dir_all(&self.0); + } +} + +impl From<&TmpDir> for PathBuf { + fn from(tmp: &TmpDir) -> Self { + tmp.0.clone() + } +} + +/// Returns a unique temporary path for tests. +/// +/// The path is constructed by: +/// 1. Getting the caller's source location (file and line) +/// 2. Appending the current timestamp in nanoseconds, process ID, and thread ID +/// 3. Hashing the combined string +/// 4. Creating a path in the system temp directory with format `"{prefix}_{hash}"` +/// +/// This ensures unique paths for concurrent tests by including both the test location, +/// process ID, and thread ID for debugging. +pub fn tmp_dir(prefix: &str, dir: Option) -> TmpDir { + let location = panic::Location::caller(); + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos(); + let thread_id = thread::current().id(); + let process_id = std::process::id(); + + let input = format!("{location}:{timestamp}:{process_id}:{thread_id:?}"); + let hash = keccak256(input.as_bytes()); + let hash_hex = alloy::hex::encode(hash); + + let dir = dir + .unwrap_or_else(std::env::temp_dir) + .join(format!("{prefix}_{hash_hex}")); + fs::create_dir_all(&dir).unwrap(); + TmpDir(dir) +} diff --git a/crates/synddb-wal-sequencer/Cargo.toml b/crates/synddb-wal-sequencer/Cargo.toml new file mode 100644 index 00000000..8185d577 --- /dev/null +++ b/crates/synddb-wal-sequencer/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "synddb-wal-sequencer" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +repository.workspace = true + +[[bin]] +name = "synddb-wal-sequencer" +path = "src/main.rs" + +[lints] +workspace = true + +[dependencies] +clap = { workspace = true, features = ["env"] } +humantime = { workspace = true } +rusqlite = { workspace = true, features = ["bundled", "session", "hooks"] } +serde = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +zerocopy = { workspace = true, features = ["derive"] } +synddb-shared = { path = "../synddb-shared/" } + +[dev-dependencies] +tracing-test = "0.2" diff --git a/crates/synddb-wal-sequencer/src/lib.rs b/crates/synddb-wal-sequencer/src/lib.rs new file mode 100644 index 00000000..eebdc5f0 --- /dev/null +++ b/crates/synddb-wal-sequencer/src/lib.rs @@ -0,0 +1,3 @@ +pub mod storage_layer; +pub mod wal_monitor; +pub mod wal_reader; diff --git a/crates/synddb-wal-sequencer/src/main.rs b/crates/synddb-wal-sequencer/src/main.rs new file mode 100644 index 00000000..6dd39a37 --- /dev/null +++ b/crates/synddb-wal-sequencer/src/main.rs @@ -0,0 +1,185 @@ +use std::{path::PathBuf, thread, time::Duration}; + +use clap::Parser; +use synddb_wal_sequencer::{ + storage_layer::{watch_and_sync_to_storage, StorageLayer}, + wal_monitor::monitor_wal, +}; +use tracing::info; + +fn parse_duration(s: &str) -> Result { + humantime::parse_duration(s) +} + +#[derive(Debug, Clone, Parser)] +#[command(name = "synddb-wal-sequencer")] +#[command(about = "WAL sequencer for SQLite replication")] +pub struct Config { + #[arg(long, env = "DB_PATH")] + pub db_path: PathBuf, + + #[arg(long, env = "WAL_BACKUPS_DIR")] + pub wal_backups_dir: PathBuf, + + #[allow(clippy::doc_markdown)] + /// the storage layer to be used. examples: + /// - filesystem:/path/to/dir + #[arg(long, env = "STORAGE_LAYER")] + pub storage_layer: StorageLayer, + + /// interval between WAL checkpoints (e.g., "1s", "500ms") + #[arg(long, env = "CHECKPOINT_INTERVAL", default_value = "1s", value_parser = parse_duration)] + pub checkpoint_interval: Duration, + + /// interval between storage sync checks (e.g., "1s", "500ms") + #[arg(long, env = "STORAGE_SYNC_INTERVAL", default_value = "1s", value_parser = parse_duration)] + pub storage_sync_interval: Duration, +} + +fn main() { + tracing_subscriber::fmt::init(); + let config = Box::leak(Box::new(Config::parse())); + start(config); +} + +fn start(config: &'static Config) { + let wal_monitor_handle = thread::Builder::new() + .name("wal_monitor".into()) + .spawn(move || { + info!("starting WAL monitor"); + monitor_wal( + &config.db_path, + &config.wal_backups_dir, + config.checkpoint_interval, + ); + }) + .unwrap(); + + let storage_handle = thread::Builder::new() + .name("storage_service".into()) + .spawn(|| { + info!("starting Storage service"); + watch_and_sync_to_storage( + &config.wal_backups_dir, + &config.storage_layer, + config.storage_sync_interval, + ); + }) + .unwrap(); + + wal_monitor_handle.join().expect("monitor thread panicked"); + storage_handle.join().expect("uploader thread panicked"); +} + +#[cfg(test)] +mod tests { + use super::*; + use std::{fs, path::Path, thread, time::Duration}; + + use rusqlite::Connection; + use synddb_shared::utils::tmp_dir; + use synddb_wal_sequencer::{ + storage_layer::StorageLayer, + wal_monitor::{JOURNAL_MODE, WAL, WAL_AUTOCHECKPOINT}, + }; + use tracing_test::traced_test; + + use crate::Config; + + fn apply_wal>(wal_path: P, db_path: P) { + let db_path = db_path.as_ref(); + let wal_dest = db_path.with_extension("db-wal"); + + fs::copy(&wal_path, &wal_dest).expect("failed to copy WAL file"); + + let conn = Connection::open(db_path).expect("failed to open db"); + conn.pragma_update(None, "wal_checkpoint", "TRUNCATE") + .expect("failed to checkpoint"); + } + + #[test] + #[traced_test] + fn basic_functionality() { + let db_dir = tmp_dir("db", None); + let wal_backups_dir = tmp_dir("db_backup", None); + let storage_dir = tmp_dir("storage", None); + let new_db_dir = tmp_dir("new_db", None); + + let db_path = db_dir.join("test.db"); + + // start monitor and storage sync in background threads + let config = Box::leak(Box::new(Config { + db_path: db_path.clone(), + wal_backups_dir: (&wal_backups_dir).into(), + storage_layer: StorageLayer::Filesystem((&storage_dir).into()), + checkpoint_interval: Duration::from_secs(1), + storage_sync_interval: Duration::from_secs(1), + })); + let _handle = thread::spawn(move || start(config)); + + thread::sleep(Duration::from_millis(20)); + + // simulate an application writing to the DB + { + let conn = Connection::open(&db_path).unwrap(); + conn.pragma_update(None, WAL_AUTOCHECKPOINT, 0).unwrap(); + + conn.execute( + "CREATE TABLE syndicate (id INTEGER PRIMARY KEY, value TEXT)", + [], + ) + .unwrap(); + + for i in 0..10 { + conn.execute( + "INSERT INTO syndicate (value) VALUES (?1)", + [format!("value_{i}")], + ) + .unwrap(); + } + } + + // collect and sort WAL files from storage + let mut attempts = 0u64; + let mut wal_files = vec![]; + loop { + wal_files = fs::read_dir(&*storage_dir) + .unwrap() + .filter_map(|e| e.ok()) + .map(|e| e.path()) + .collect(); + if !wal_files.is_empty() { + break; + }; + attempts += 1; + assert!(attempts <= 50, "timeout waiting for WAL files in storage"); + thread::sleep(Duration::from_millis(100)); + } + + wal_files.sort(); + + // apply WAL files to new database + let new_db_path = new_db_dir.join("test.db"); + let new_conn = Connection::open(&new_db_path).unwrap(); + new_conn.pragma_update(None, JOURNAL_MODE, WAL).unwrap(); + for wal_file in &wal_files { + apply_wal(wal_file, &new_db_path); + } + + // verify data consistency + let mut stmt = new_conn + .prepare("SELECT id, value FROM syndicate ORDER BY id") + .unwrap(); + let rows: Vec<(i64, String)> = stmt + .query_map([], |row| Ok((row.get(0)?, row.get(1)?))) + .unwrap() + .map(|r| r.unwrap()) + .collect(); + + assert_eq!(rows.len(), 10); + for (i, (id, value)) in rows.iter().enumerate() { + assert_eq!(*id, (i + 1) as i64); + assert_eq!(*value, format!("value_{i}")); + } + } +} diff --git a/crates/synddb-wal-sequencer/src/storage_layer.rs b/crates/synddb-wal-sequencer/src/storage_layer.rs new file mode 100644 index 00000000..e8946534 --- /dev/null +++ b/crates/synddb-wal-sequencer/src/storage_layer.rs @@ -0,0 +1,130 @@ +use std::{ + fs::{self, DirEntry}, + io::Error, + path::{Path, PathBuf}, + time::{Duration, Instant}, +}; + +use tracing::trace; + +#[derive(Debug, Clone)] +pub enum StorageLayer { + Noop, + Filesystem(PathBuf), +} + +impl std::str::FromStr for StorageLayer { + type Err = String; + + fn from_str(s: &str) -> Result { + if s == "noop" { + Ok(Self::Noop) + } else if let Some(path) = s.strip_prefix("filesystem:") { + Ok(Self::Filesystem(PathBuf::from(path))) + } else { + Err(format!( + "invalid storage layer: {s}. Expected 'noop' or 'filesystem:/path'" + )) + } + } +} + +/// watches a wal `backups_dir` and uploads files to a `storage_layer` +pub fn watch_and_sync_to_storage>( + backups_dir: P, + storage_layer: &StorageLayer, + sync_interval: Duration, +) { + match storage_layer { + StorageLayer::Filesystem(target_dir) => { + do_watch_and_sync(backups_dir, FsStorage::new(target_dir), sync_interval); + } + StorageLayer::Noop => { + do_watch_and_sync(backups_dir, NoopStorage {}, sync_interval); + } + }; +} + +fn do_watch_and_sync(dir: P, storage: S, sync_interval: Duration) +where + P: AsRef, + S: StorageAdapter, +{ + trace!(dir = ?dir.as_ref(), "starting watch_and_sync"); + loop { + let wal_files = wal_files_in_dir(&dir) + .unwrap_or_else(|e| panic!("failed to obtain wal_files in dir: {e}")); + + for wal in wal_files { + let filename = wal.file_name().to_string_lossy().into_owned(); + if !storage.has_file(&filename) { + // TODO this is a naive implementation that reads the entire file to RAM ,can be + // improved + let start = Instant::now(); + let contents = fs::read(wal.path()) + .unwrap_or_else(|e| panic!("failed to read file {filename}: {e}")); + storage + .upload(&filename, &contents) + .unwrap_or_else(|e| panic!("failed to upload file {filename}: {e}")); + trace!(%filename, "uploaded, took {} ns", start.elapsed().as_nanos()); + } + fs::remove_file(wal.path()) + .unwrap_or_else(|e| panic!("failed to remove file: {wal:?}, error: {e}")); + } + + std::thread::sleep(sync_interval); + } +} + +/// returns the list of files in the dir sorted alphabetically +fn wal_files_in_dir>(dir: P) -> Result, Error> { + let mut entries: Vec<_> = fs::read_dir(dir)?.filter_map(|e| e.ok()).collect(); + entries.sort_by_key(|e| e.path()); + Ok(entries) +} + +pub trait StorageAdapter { + fn has_file(&self, filename: &str) -> bool; + fn upload(&self, filename: &str, contents: &[u8]) -> Result<(), String>; +} + +// ------------------- Storage Adapter implementations + +/// TODO remove +struct NoopStorage {} +impl StorageAdapter for NoopStorage { + fn has_file(&self, _filename: &str) -> bool { + false + } + + fn upload(&self, _filename: &str, _contents: &[u8]) -> Result<(), String> { + // noop + Ok(()) + } +} + +/// Simple filesystem storage (moves files to another dir), useful for tests +#[derive(Debug)] +pub struct FsStorage { + target_dir: PathBuf, +} + +impl FsStorage { + pub fn new>(target_dir: P) -> Self { + let target_dir = target_dir.as_ref().to_path_buf(); + fs::create_dir_all(&target_dir) + .unwrap_or_else(|e| panic!("unable to create dir {target_dir:?}: {e}")); + Self { target_dir } + } +} + +impl StorageAdapter for FsStorage { + fn has_file(&self, filename: &str) -> bool { + self.target_dir.join(filename).exists() + } + + fn upload(&self, filename: &str, contents: &[u8]) -> Result<(), String> { + let path = self.target_dir.join(filename); + fs::write(&path, contents).map_err(|e| format!("failed to write {}: {e}", path.display())) + } +} diff --git a/crates/synddb-wal-sequencer/src/wal_monitor.rs b/crates/synddb-wal-sequencer/src/wal_monitor.rs new file mode 100644 index 00000000..5774abdf --- /dev/null +++ b/crates/synddb-wal-sequencer/src/wal_monitor.rs @@ -0,0 +1,126 @@ +use core::panic; +use std::{ + fs, + path::Path, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, +}; + +use rusqlite::{Connection, Transaction}; +use tracing::trace; + +pub const JOURNAL_MODE: &str = "journal_mode"; +pub const WAL_AUTOCHECKPOINT: &str = "wal_autocheckpoint"; +pub const WAL_CHECKPOINT: &str = "wal_checkpoint"; +pub const TRUNCATE: &str = "TRUNCATE"; +pub const WAL: &str = "WAL"; + +pub fn monitor_wal>(db_path: P, wal_backups_dir: P, checkpoint_interval: Duration) { + let wal_path = db_path.as_ref().with_extension("db-wal"); + // reconcile storage layer state with DB / WAL /backup files + + // assert correct DB configuration is in place: + // jornal_mode = WAL + // wal_checkpoint = 0 + let mut conn = if db_path.as_ref().exists() { + // existing DB + let conn = Connection::open(db_path).unwrap(); + let journal_mode: String = conn + .pragma_query_value(None, JOURNAL_MODE, |row| row.get(0)) + .unwrap(); + assert_eq!( + journal_mode.to_uppercase(), + WAL, + "Database must be in WAL mode" + ); + conn + } else { + // new DB + let conn = Connection::open(db_path).unwrap(); + conn.pragma_update(None, JOURNAL_MODE, WAL).unwrap(); + conn + }; + + // wal_checkpoint is per-connection + conn.pragma_update(None, WAL_AUTOCHECKPOINT, 0).unwrap(); + + trace!(?wal_path, "monitor_wal started"); + + // NOTE: this read lock is necessary to prevent other applications from checkpointing + #[allow(clippy::collection_is_never_read)] + let mut _read_lock = acquire_read_lock(&mut conn); + + loop { + std::thread::sleep(checkpoint_interval); + + let wal_size = fs::metadata(&wal_path).map(|m| m.len()).unwrap_or(0); + if wal_size == 0 { + trace!("WAL is empty."); + continue; + } + trace!(wal_size, "WAL contents found."); + + _read_lock = None; // rollbacks the read_tx and releases the lock + + // NOTE: to avoid changes while the WAL data is collected, we temporarily lock db access with this tx (notice the locking mode) + // https://www.sqlite.org/lockingv3.html#locking + // this is how litestream does it for reference: https://github.com/benbjohnson/litestream/blob/e1d5aad75bc67735732b54a252d98685c502288b/db.go#L544 + // TODO I think Immediate is enough here, but maybe Exclusive is warranted? + // TODO make some tests that assert this actually properly locks the DB as expected + let lock_tx = conn + .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate) + .unwrap(); + + // TODO using timestamp for simplicity, but a sequence number would be better (should come + // from the storage layer) + + let start = Instant::now(); + let ts = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis(); + let wal_backup = wal_backups_dir.as_ref().join(format!("{ts}.db-wal")); + fs::copy(&wal_path, &wal_backup) + .unwrap_or_else(|e| panic!("failed to backup WAL file: {e}")); + + trace!( + ?wal_backup, + "WAL backup complete, took {} ns", + start.elapsed().as_nanos() + ); + + // release the lock + lock_tx.rollback().expect("failed to rollback lock_tx"); + + // TODO address this: + // NOTE: there is a race condition where the application could write to the db + // between the release of the read lock and the WAL_CHECKPOINT being applied + // litestream protects against this by allowing the inconsistent state to occur, then + // applying a snapshot to catch up: https://github.com/benbjohnson/litestream/blob/92fc139923d2b13909ba8b0e5df8b63d45a91648/db.go#L940-L942 + + // checkpoint + // https://www.sqlite.org/pragma.html#pragma_wal_checkpoint + conn.pragma_update(None, WAL_CHECKPOINT, TRUNCATE) + .unwrap_or_else(|e| panic!("failed to initiate WAL checkpoint: {e}")); + + // re-aquire the read-lock immediately + _read_lock = acquire_read_lock(&mut conn); + } +} + +fn acquire_read_lock(conn: &mut Connection) -> Option> { + let read_lock = conn + .transaction_with_behavior(rusqlite::TransactionBehavior::Deferred) + .unwrap(); + let _: i32 = read_lock + .query_row("SELECT COUNT(1) FROM sqlite_schema", [], |row| row.get(0)) + .unwrap_or_else(|e| panic!("failed to get read_lock tx: {e}")); + Some(read_lock) +} + +#[cfg(test)] +mod tests { + // use super::*; + + #[test] + fn todo_test() {} +} diff --git a/crates/synddb-wal-sequencer/src/wal_reader.rs b/crates/synddb-wal-sequencer/src/wal_reader.rs new file mode 100644 index 00000000..66150b25 --- /dev/null +++ b/crates/synddb-wal-sequencer/src/wal_reader.rs @@ -0,0 +1,244 @@ +use std::{ + fs::File, + io::{self, Read}, + path::Path, +}; +use thiserror::Error; +use zerocopy::{byteorder::Order, FromBytes, Immutable, KnownLayout, BE, LE, U32}; + +#[derive(Debug, Error)] +pub enum WalError { + #[error("IO error: {0}")] + Io(#[from] io::Error), + #[error("invalid WAL header bytes: {0}")] + InvalidHeaderBytes(String), + #[error("invalid WAL frame header bytes: {0}")] + InvalidFrameHeaderBytes(String), +} + +// https://www.sqlite.org/fileformat.html#the_write_ahead_log +#[derive(Debug)] +pub struct WalState { + pub header: WalHeader, + pub frames: Vec, +} + +const WAL_HEADER_SIZE: usize = 32; +const WAL_FRAME_HEADER_SIZE: usize = 24; +const FILE_FORMAT_VERSION: u32 = 3007000; + +pub fn read_wal_file>(path: P) -> Result { + let mut file = File::open(path)?; + + let mut header_bytes = [0u8; WAL_HEADER_SIZE]; + file.read_exact(&mut header_bytes)?; + + let header = WalHeader::read_from_bytes(&header_bytes) + .map_err(|e| WalError::InvalidHeaderBytes(e.to_string()))?; + + let checksum_byte_order = header.byte_order(); + + let (mut checksum_1, mut checksum_2) = + wal_checksum(&checksum_byte_order, 0, 0, &header_bytes[..24]); + + assert_eq!( + checksum_1, + header.checksum_1.get(), + "WAL header checksum 1 mismatch" + ); + assert_eq!( + checksum_2, + header.checksum_2.get(), + "WAL header checksum 2 mismatch" + ); + + assert_eq!( + header.file_format_version, FILE_FORMAT_VERSION, + "invalid file format version on WAL header" + ); + + let page_size: usize = header.page_size.get().try_into().unwrap(); + let mut frames = vec![]; + loop { + let mut frame_header_bytes = [0u8; WAL_FRAME_HEADER_SIZE]; + match file.read_exact(&mut frame_header_bytes) { + Ok(_) => {} + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break, + Err(e) => return Err(e.into()), + } + let frame_header = WalFrameHeader::read_from_bytes(&frame_header_bytes) + .map_err(|e| WalError::InvalidFrameHeaderBytes(e.to_string()))?; + + let mut data = vec![0u8; page_size]; + file.read_exact(&mut data)?; + + (checksum_1, checksum_2) = wal_checksum( + &checksum_byte_order, + checksum_1, + checksum_2, + &frame_header_bytes[..8], + ); + (checksum_1, checksum_2) = + wal_checksum(&checksum_byte_order, checksum_1, checksum_2, &data); + assert_eq!( + checksum_1, + frame_header.checksum_1.get(), + "WAL frame checksum 1 mismatch" + ); + assert_eq!( + checksum_2, + frame_header.checksum_2.get(), + "WAL frame checksum 2 mismatch" + ); + + frames.push(WalFrame { + header: frame_header, + data, + }); + } + + Ok(WalState { header, frames }) +} + +fn wal_checksum(byte_order: &Order, mut s0: u32, mut s1: u32, data: &[u8]) -> (u32, u32) { + assert!(data.len().is_multiple_of(8), "checksum bad data length"); + + for chunk in data.chunks_exact(8) { + let (d0, d1) = match byte_order { + Order::BigEndian => { + let vals = <[U32; 2]>::read_from_bytes(chunk).unwrap(); + (vals[0].get(), vals[1].get()) + } + Order::LittleEndian => { + let vals = <[U32; 2]>::read_from_bytes(chunk).unwrap(); + (vals[0].get(), vals[1].get()) + } + }; + s0 = s0.wrapping_add(d0).wrapping_add(s1); + s1 = s1.wrapping_add(d1).wrapping_add(s0); + } + + (s0, s1) +} + +#[derive(Debug, FromBytes, Immutable, KnownLayout)] +#[repr(C)] +pub struct WalHeader { + pub magic_number: U32, + pub file_format_version: U32, + pub page_size: U32, + pub checkpoint_seq: U32, + pub salt_1: U32, + pub salt_2: U32, + pub checksum_1: U32, + pub checksum_2: U32, +} + +impl WalHeader { + fn byte_order(&self) -> Order { + match self.magic_number.get() { + 0x377f0682 => Order::LittleEndian, + 0x377f0683 => Order::BigEndian, + magic => panic!("invalid WAL magic number: {:#010x}", magic), + } + } +} + +#[derive(Debug, FromBytes, Immutable, KnownLayout)] +#[repr(C)] +struct WalFrameHeader { + page_number: U32, + /// expressed in pages after commit (0 for non-commit frames) + db_size_after_commit: U32, + salt_1: U32, + salt_2: U32, + checksum_1: U32, + checksum_2: U32, +} + +#[derive(Debug)] +pub struct WalFrame { + header: WalFrameHeader, + data: Vec, +} + +#[cfg(test)] +mod tests { + use super::*; + use rusqlite::Connection; + use synddb_shared::utils::tmp_dir; + + #[test] + fn test_wal_checksum_little_endian() { + let data = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]; + let (s0, s1) = wal_checksum(&Order::LittleEndian, 0, 0, &data); + + // d0 = 0x04030201, d1 = 0x08070605 + // s0 = 0 + 0x04030201 + 0 = 0x04030201 + // s1 = 0 + 0x08070605 + 0x04030201 = 0x0c0a0806 + assert_eq!(s0, 0x04030201); + assert_eq!(s1, 0x0c0a0806); + } + + #[test] + fn test_wal_checksum_big_endian() { + let data = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]; + let (s0, s1) = wal_checksum(&Order::BigEndian, 0, 0, &data); + + // d0 = 0x01020304, d1 = 0x05060708 + // s0 = 0 + 0x01020304 + 0 = 0x01020304 + // s1 = 0 + 0x05060708 + 0x01020304 = 0x06080a0c + assert_eq!(s0, 0x01020304); + assert_eq!(s1, 0x06080a0c); + } + + #[test] + fn test_wal_checksum_chained() { + let data1 = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]; + let data2 = [0x10, 0x20, 0x30, 0x40, 0x50, 0x60, 0x70, 0x80]; + + let (s0, s1) = wal_checksum(&Order::LittleEndian, 0, 0, &data1); + let (s0, s1) = wal_checksum(&Order::LittleEndian, s0, s1, &data2); + + // Verify chaining works by computing the same thing in one call + let mut combined = [0u8; 16]; + combined[..8].copy_from_slice(&data1); + combined[8..].copy_from_slice(&data2); + let (expected_s0, expected_s1) = wal_checksum(&Order::LittleEndian, 0, 0, &combined); + + assert_eq!(s0, expected_s0); + assert_eq!(s1, expected_s1); + } + + #[test] + #[should_panic(expected = "checksum bad data length")] + fn test_wal_checksum_bad_length() { + let data = [0x01, 0x02, 0x03, 0x04, 0x05]; + wal_checksum(&Order::LittleEndian, 0, 0, &data); + } + + #[test] + fn test_read_wal_file_with_sqlite() { + let dir = tmp_dir("read_wal_file_with_sqlite", None); + let db_path = dir.join("test.db"); + let wal_path = dir.join("test.db-wal"); + + let conn = Connection::open(&db_path).unwrap(); + + conn.pragma_update(None, "journal_mode", "WAL").unwrap(); + + conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)", []) + .unwrap(); + + for i in 0..10 { + conn.execute( + "INSERT INTO test (value) VALUES (?1)", + [format!("value_{}", i)], + ) + .unwrap(); + } + + let wal_state = read_wal_file(&wal_path).unwrap(); + assert!(!wal_state.frames.is_empty()) + } +}