diff --git a/Cargo.toml b/Cargo.toml index 31693d8e..83d7e630 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ path = "tests/functional_tests.rs" [workspace] members = [ "libazureinit", + "libazureinit-kvp", ] [features] diff --git a/doc/kvp_pr_description.md b/doc/kvp_pr_description.md new file mode 100644 index 00000000..b008dfb4 --- /dev/null +++ b/doc/kvp_pr_description.md @@ -0,0 +1,74 @@ +# PR: Add `libazureinit-kvp` with unified KVP pool store + +## Summary + +- Adds workspace member `libazureinit-kvp` in root `Cargo.toml` +- Adds `libazureinit-kvp/Cargo.toml` +- Adds `libazureinit-kvp/src/lib.rs` and `libazureinit-kvp/src/kvp_pool.rs` + +## Crate Design + +- One trait: `KvpStore` +- One implementation: `KvpPoolStore` + +`KvpStore` splits each operation into a `backend_*` method (raw I/O, +provided by the implementor) and a public method (`write`, `read`, +`clear`) that validates inputs then delegates to the backend. + +Public API: + +- `write`, `read` (validate then delegate to `backend_write`/`backend_read`) +- `entries`, `entries_raw` +- `delete`, `clear` +- `is_stale` + +Validation is centralized in trait default methods and policy-aware via: + +- `max_key_size(&self)` +- `max_value_size(&self)` + +`KvpPoolStore` is file-backed using Hyper-V KVP wire format +(fixed-size 512-byte key + 2048-byte value records), with lock-based concurrency. + +## Policy and Limits + +Constructor: + +- `new(path: Option, mode: PoolMode, truncate_on_stale: bool)` + +`PoolMode`: + +- `Restricted` (default): key <= 254 bytes, value <= 1022 bytes +- `Full`: key <= 512 bytes, value <= 2048 bytes + +Behavior: + +- Default path when `None`: `/var/lib/hyperv/.kvp_pool_1` +- Unique key cap: 1024 + - new key beyond cap is rejected + - overwrite of existing key at cap is allowed +- `clear()` truncates the store +- `truncate_on_stale` keeps truncation caller-controlled + +## Errors + +`KvpError` includes explicit variants: + +- `EmptyKey` +- `KeyContainsNull` +- `KeyTooLarge { max, actual }` +- `ValueTooLarge { max, actual }` +- `MaxUniqueKeysExceeded { max }` +- `Io(io::Error)` + +## Testing + +17 tests covering: + +- restricted/full key/value boundary checks +- default and explicit path behavior +- mode getter +- unique-key cap behavior (including overwrite-at-cap and add-after-delete) +- `entries` last-write-wins and `entries_raw` duplicate preservation +- `delete`, `clear`, and stale checks +- key validation (empty and null-byte) diff --git a/doc/libazurekvp.md b/doc/libazurekvp.md deleted file mode 100644 index 476bdcb1..00000000 --- a/doc/libazurekvp.md +++ /dev/null @@ -1,126 +0,0 @@ -# Azure-init Tracing System - -## Overview - -Azure-init implements a comprehensive tracing system that captures detailed information about the provisioning process. -This information is crucial for monitoring, debugging, and troubleshooting VM provisioning issues in Azure environments. -The tracing system is built on a multi-layered architecture that provides flexibility and robustness. - -## Architecture - -The tracing architecture consists of four specialized layers, each handling a specific aspect of the tracing process: - -### 1. EmitKVPLayer - -**Purpose**: Processes spans and events by capturing metadata, generating key-value pairs (KVPs), and writing to Hyper-V's data exchange file. - -**Key Functions**: -- Captures span lifecycle events (creation, entry, exit, closing) -- Processes emitted events within spans -- Formats data as KVPs for Hyper-V consumption -- Writes encoded data to `/var/lib/hyperv/.kvp_pool_1` - -Additionally, events emitted with a `health_report` field are written as special provisioning reports using the key `PROVISIONING_REPORT`. - -**Integration with Azure**: -- The `/var/lib/hyperv/.kvp_pool_1` file is monitored by the Hyper-V `hv_kvp_daemon` service -- This enables key metrics and logs to be transferred from the VM to the Azure platform -- Administrators can access this data through the Azure portal or API - -### 2. OpenTelemetryLayer - -**Purpose**: Propagates tracing context and prepares span data for export. - -**Key Functions**: -- Maintains distributed tracing context across service boundaries -- Exports standardized trace data to compatible backends -- Enables integration with broader monitoring ecosystems - -### 3. Stderr Layer - -**Purpose**: Formats and logs trace data to stderr. - -**Key Functions**: -- Provides human-readable logging for immediate inspection -- Supports debugging during development -- Captures trace events even when other layers might fail - -### 4. File Layer - -**Purpose**: Writes formatted logs to a file (default path: `/var/log/azure-init.log`). - -**Key Functions**: -- Provides a persistent log for post-provisioning inspection -- Uses file permissions `0600` when possible -- Log level controlled by `AZURE_INIT_LOG` (defaults to `info` for the file layer) - -## How the Layers Work Together - -Despite operating independently, these layers collaborate to provide comprehensive tracing: - -1. **Independent Processing**: Each layer processes spans and events without dependencies on other layers -2. **Ordered Execution**: Layers are executed in the order they are registered in `setup_layers` (stderr, OpenTelemetry, KVP if enabled, file if available) -3. **Complementary Functions**: Each layer serves a specific purpose in the tracing ecosystem: - - `EmitKVPLayer` focuses on Azure Hyper-V integration - - `OpenTelemetryLayer` handles standardized tracing and exports - - `Stderr Layer` provides immediate visibility for debugging - -### Configuration - -The tracing system's behavior is controlled through configuration files and environment variables, allowing more control over what data is captured and where it's sent: - -- `telemetry.kvp_diagnostics` (config): Enables/disables KVP emission. Default: `true`. -- `telemetry.kvp_filter` (config): Optional `EnvFilter`-style directives to select which spans/events go to KVP. -- `azure_init_log_path.path` (config): Target path for the file layer. Default: `/var/log/azure-init.log`. -- `AZURE_INIT_KVP_FILTER` (env): Overrides `telemetry.kvp_filter`. Precedence: env > config > default. -- `AZURE_INIT_LOG` (env): Controls stderr and file fmt layers’ levels (defaults: stderr=`error`, file=`info`). - -The KVP layer uses a conservative default filter aimed at essential provisioning signals; adjust that via the settings above as needed. -For more on how to use these configuration variables, see the [configuration documentation](./configuration.md#complete-configuration-example). - -## Practical Usage - -### Instrumenting Functions - -To instrument code with tracing, use the `#[instrument]` attribute on functions: - -```rust -use tracing::{instrument, Level, event}; - -#[instrument(fields(user_id = ?user.id))] -async fn provision_user(user: User) -> Result<(), Error> { - event!(Level::INFO, "Starting user provisioning"); - - // Function logic - - event!(Level::INFO, "User provisioning completed successfully"); - Ok(()) -} -``` - -### Emitting Events - -To record specific points within a span: - -```rust -use tracing::{event, Level}; - -fn configure_ssh_keys(user: &str, keys: &[String]) { - event!(Level::INFO, user = user, key_count = keys.len(), "Configuring SSH keys"); - - for (i, key) in keys.iter().enumerate() { - event!(Level::DEBUG, user = user, key_index = i, "Processing SSH key"); - // Process each key - } - - event!(Level::INFO, user = user, "SSH keys configured successfully"); -} -``` - -## Reference Documentation - -For more details on how the Hyper-V Data Exchange Service works, refer to the official documentation: -[Hyper-V Data Exchange Service (KVP)](https://learn.microsoft.com/en-us/virtualization/hyper-v-on-windows/reference/integration-services#hyper-v-data-exchange-service-kvp) - -For OpenTelemetry integration details: -[OpenTelemetry for Rust](https://opentelemetry.io/docs/instrumentation/rust/) \ No newline at end of file diff --git a/libazureinit-kvp/Cargo.toml b/libazureinit-kvp/Cargo.toml new file mode 100644 index 00000000..0ace7528 --- /dev/null +++ b/libazureinit-kvp/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "libazureinit-kvp" +version = "0.1.0" +edition = "2021" +rust-version = "1.88" +repository = "https://github.com/Azure/azure-init/" +homepage = "https://github.com/Azure/azure-init/" +license = "MIT" +description = "Hyper-V KVP (Key-Value Pair) storage library for azure-init." + +[dependencies] +libc = "0.2" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +sysinfo = "0.38" + +[dev-dependencies] +tempfile = "3" + +[lib] +name = "libazureinit_kvp" +path = "src/lib.rs" diff --git a/libazureinit-kvp/src/kvp_pool.rs b/libazureinit-kvp/src/kvp_pool.rs new file mode 100644 index 00000000..c68accb0 --- /dev/null +++ b/libazureinit-kvp/src/kvp_pool.rs @@ -0,0 +1,1498 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Unified KVP pool file backend for Hyper-V and Azure guests. +//! +//! The on-disk pool format is fixed-width and identical across +//! environments: +//! - key field: 512 bytes +//! - value field: 2048 bytes +//! - record size: 2560 bytes +//! +//! [`PoolMode`] selects which size limits are enforced on writes: +//! - [`Restricted`](PoolMode::Restricted) (default): key <= 254 bytes, +//! value <= 1022 bytes +//! - [`Full`](PoolMode::Full): key <= 512 bytes, value <= 2048 bytes +//! +//! ## Reference +//! - [Hyper-V Data Exchange Service (KVP)](https://learn.microsoft.com/en-us/virtualization/hyper-v-on-windows/reference/integration-services#hyper-v-data-exchange-service-kvp) + +use std::collections::{HashMap, HashSet}; +use std::fs::{File, OpenOptions}; +use std::io::{self, ErrorKind, Read, Seek, Write}; +use std::os::unix::fs::MetadataExt; +use std::os::unix::io::AsRawFd; +use std::path::{Path, PathBuf}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use sysinfo::System; + +use crate::{KvpError, KvpStore}; + +/// Default KVP pool path when none is provided. +pub const DEFAULT_KVP_POOL_PATH: &str = "/var/lib/hyperv/.kvp_pool_1"; + +const WIRE_MAX_KEY_BYTES: usize = 512; +const WIRE_MAX_VALUE_BYTES: usize = 2048; +const SAFE_MAX_KEY_BYTES: usize = 254; +const SAFE_MAX_VALUE_BYTES: usize = 1022; + +/// Maximum number of unique keys allowed in the pool. +const MAX_UNIQUE_KEYS: usize = 1024; + +const RECORD_SIZE: usize = WIRE_MAX_KEY_BYTES + WIRE_MAX_VALUE_BYTES; + +/// Acquire an exclusive (write) lock on the entire file. +/// +/// Uses `fcntl(F_OFD_SETLKW)` — open file description locks that are +/// per-FD (safe for multi-threaded use) yet conflict with traditional +/// `fcntl` record locks used by `hv_kvp_daemon.c` and cloud-init. +fn fcntl_lock_exclusive(file: &File) -> io::Result<()> { + let fl = libc::flock { + l_type: libc::F_WRLCK as libc::c_short, + l_whence: libc::SEEK_SET as libc::c_short, + l_start: 0, + l_len: 0, + l_pid: 0, + }; + if unsafe { libc::fcntl(file.as_raw_fd(), libc::F_OFD_SETLKW, &fl) } == -1 { + return Err(io::Error::last_os_error()); + } + Ok(()) +} + +/// Acquire a shared (read) lock on the entire file. +fn fcntl_lock_shared(file: &File) -> io::Result<()> { + let fl = libc::flock { + l_type: libc::F_RDLCK as libc::c_short, + l_whence: libc::SEEK_SET as libc::c_short, + l_start: 0, + l_len: 0, + l_pid: 0, + }; + if unsafe { libc::fcntl(file.as_raw_fd(), libc::F_OFD_SETLKW, &fl) } == -1 { + return Err(io::Error::last_os_error()); + } + Ok(()) +} + +/// Release a lock on the entire file. +fn fcntl_unlock(file: &File) -> io::Result<()> { + let fl = libc::flock { + l_type: libc::F_UNLCK as libc::c_short, + l_whence: libc::SEEK_SET as libc::c_short, + l_start: 0, + l_len: 0, + l_pid: 0, + }; + if unsafe { libc::fcntl(file.as_raw_fd(), libc::F_OFD_SETLK, &fl) } == -1 { + return Err(io::Error::last_os_error()); + } + Ok(()) +} + +/// Policy mode controlling key/value size limits for writes. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum PoolMode { + /// Conservative limits for Azure compatibility + /// (key <= 254 bytes, value <= 1022 bytes). + Restricted, + /// Full Hyper-V wire-format limits + /// (key <= 512 bytes, value <= 2048 bytes). + Full, +} + +impl PoolMode { + fn max_key_size(self) -> usize { + match self { + Self::Restricted => SAFE_MAX_KEY_BYTES, + Self::Full => WIRE_MAX_KEY_BYTES, + } + } + + fn max_value_size(self) -> usize { + match self { + Self::Restricted => SAFE_MAX_VALUE_BYTES, + Self::Full => WIRE_MAX_VALUE_BYTES, + } + } +} + +/// Unified KVP pool file store. +#[derive(Clone, Debug)] +pub struct KvpPoolStore { + path: PathBuf, + mode: PoolMode, +} + +impl KvpPoolStore { + /// Create a new KVP pool store. + /// + /// - `path`: pool file path, defaults to [`DEFAULT_KVP_POOL_PATH`]. + /// - `mode`: size-limit policy (see [`PoolMode`]). + /// - `truncate_on_stale`: if `true`, clears stale data from a + /// previous boot. + pub fn new( + path: Option, + mode: PoolMode, + truncate_on_stale: bool, + ) -> Result { + let store = Self { + path: path.unwrap_or_else(|| PathBuf::from(DEFAULT_KVP_POOL_PATH)), + mode, + }; + if truncate_on_stale && store.pool_is_stale()? { + store.truncate_pool()?; + } + Ok(store) + } + + /// Return a reference to the pool path. + pub fn path(&self) -> &Path { + &self.path + } + + /// The policy mode this store was created with. + pub fn mode(&self) -> PoolMode { + self.mode + } + + fn validate_key(&self, key: &str) -> Result<(), KvpError> { + if key.is_empty() { + return Err(KvpError::EmptyKey); + } + if key.as_bytes().contains(&0) { + return Err(KvpError::KeyContainsNull); + } + let actual = key.len(); + let max = self.mode.max_key_size(); + if actual > max { + return Err(KvpError::KeyTooLarge { max, actual }); + } + Ok(()) + } + + /// Looser validation for reads: accepts keys up to the full + /// wire-format maximum regardless of [`PoolMode`]. + fn validate_key_for_read(key: &str) -> Result<(), KvpError> { + if key.is_empty() { + return Err(KvpError::EmptyKey); + } + if key.as_bytes().contains(&0) { + return Err(KvpError::KeyContainsNull); + } + let actual = key.len(); + if actual > WIRE_MAX_KEY_BYTES { + return Err(KvpError::KeyTooLarge { + max: WIRE_MAX_KEY_BYTES, + actual, + }); + } + Ok(()) + } + + fn validate_value(&self, value: &str) -> Result<(), KvpError> { + let actual = value.len(); + let max = self.mode.max_value_size(); + if actual > max { + return Err(KvpError::ValueTooLarge { max, actual }); + } + Ok(()) + } + + fn boot_time() -> Result { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|e| io::Error::other(format!("clock error: {e}")))? + .as_secs(); + Ok(now.saturating_sub(get_uptime().as_secs()) as i64) + } + + fn pool_is_stale(&self) -> Result { + let metadata = match std::fs::metadata(&self.path) { + Ok(m) => m, + Err(ref e) if e.kind() == ErrorKind::NotFound => return Ok(false), + Err(e) => return Err(e.into()), + }; + let boot = Self::boot_time()?; + Ok(metadata.mtime() < boot) + } + + #[cfg(test)] + fn pool_is_stale_at_boot(&self, boot_time: i64) -> Result { + let metadata = match std::fs::metadata(&self.path) { + Ok(m) => m, + Err(ref e) if e.kind() == ErrorKind::NotFound => return Ok(false), + Err(e) => return Err(e.into()), + }; + Ok(metadata.mtime() < boot_time) + } + + fn truncate_pool(&self) -> Result<(), KvpError> { + let file = + match OpenOptions::new().read(true).write(true).open(&self.path) { + Ok(f) => f, + Err(ref e) if e.kind() == ErrorKind::NotFound => return Ok(()), + Err(e) => return Err(e.into()), + }; + + fcntl_lock_exclusive(&file).map_err(|e| { + io::Error::other(format!("failed to lock KVP file: {e}")) + })?; + let result = file.set_len(0).map_err(KvpError::from); + let _ = fcntl_unlock(&file); + result + } + + fn open_for_read(&self) -> io::Result { + OpenOptions::new().read(true).open(&self.path) + } + + fn open_for_read_write(&self) -> io::Result { + OpenOptions::new().read(true).write(true).open(&self.path) + } + + fn open_for_read_write_create(&self) -> io::Result { + OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(false) + .open(&self.path) + } + + /// Create a read-only iterator over all records. + fn iter(&self) -> Result { + let file = self.open_for_read()?; + KvpPoolIter::new(file, false) + } + + /// Create a read-write iterator over all records. + fn iter_mut(&self) -> Result { + let file = self.open_for_read_write_create()?; + KvpPoolIter::new(file, true) + } +} + +pub(crate) fn encode_record(key: &str, value: &str) -> Vec { + let mut buf = vec![0u8; RECORD_SIZE]; + + let key_bytes = key.as_bytes(); + let key_len = key_bytes.len().min(WIRE_MAX_KEY_BYTES); + buf[..key_len].copy_from_slice(&key_bytes[..key_len]); + + let value_bytes = value.as_bytes(); + let value_len = value_bytes.len().min(WIRE_MAX_VALUE_BYTES); + buf[WIRE_MAX_KEY_BYTES..WIRE_MAX_KEY_BYTES + value_len] + .copy_from_slice(&value_bytes[..value_len]); + + buf +} + +pub(crate) fn decode_record(data: &[u8]) -> io::Result<(String, String)> { + if data.len() != RECORD_SIZE { + return Err(io::Error::other(format!( + "record size mismatch: expected {RECORD_SIZE}, got {}", + data.len() + ))); + } + + let (key_bytes, value_bytes) = data.split_at(WIRE_MAX_KEY_BYTES); + + let key = std::str::from_utf8(key_bytes) + .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))? + .trim_end_matches('\0') + .to_string(); + let value = std::str::from_utf8(value_bytes) + .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))? + .trim_end_matches('\0') + .to_string(); + + Ok((key, value)) +} + +/// A record-at-a-time iterator over a KVP pool file. +/// +/// Uses file I/O with seek semantics for memory-efficient iteration +/// without loading all records into memory. Supports in-place value +/// overwrites and record removal for the last-yielded record. +/// +/// The underlying file is locked for the lifetime of the iterator; +/// the lock is released on drop. +#[derive(Debug)] +struct KvpPoolIter { + file: File, + record_count: usize, + current_index: usize, +} + +impl KvpPoolIter { + fn new(mut file: File, lock_exclusive: bool) -> Result { + let lock_result = if lock_exclusive { + fcntl_lock_exclusive(&file) + } else { + fcntl_lock_shared(&file) + }; + lock_result.map_err(|e| { + io::Error::other(format!("failed to lock KVP file: {e}")) + })?; + + let len = file.metadata()?.len() as usize; + if len > 0 && !len.is_multiple_of(RECORD_SIZE) { + let _ = fcntl_unlock(&file); + return Err(io::Error::other(format!( + "file size ({len}) is not a multiple of record size \ + ({RECORD_SIZE})" + )) + .into()); + } + + let record_count = len / RECORD_SIZE; + file.seek(io::SeekFrom::Start(0))?; + + Ok(Self { + file, + record_count, + current_index: 0, + }) + } + + /// The number of records in the file (updated by + /// [`append`](Self::append) and + /// [`remove_current`](Self::remove_current)). + fn record_count(&self) -> usize { + self.record_count + } + + /// Overwrite the value field of the record last returned by + /// [`next()`](Iterator::next), zero-padding to fill the + /// fixed-width 2048-byte field. + /// + /// After the write the file position is at the start of the next + /// record, so iteration can continue normally. + /// + /// Returns `InvalidInput` if called before the first `next()`. + fn overwrite_current_value(&mut self, value: &str) -> io::Result<()> { + if self.current_index == 0 { + return Err(io::Error::new( + ErrorKind::InvalidInput, + "no record has been read yet", + )); + } + + let record_start = (self.current_index - 1) as u64 * RECORD_SIZE as u64; + let value_offset = record_start + WIRE_MAX_KEY_BYTES as u64; + + self.file.seek(io::SeekFrom::Start(value_offset))?; + + let mut buf = [0u8; WIRE_MAX_VALUE_BYTES]; + let bytes = value.as_bytes(); + let len = bytes.len().min(WIRE_MAX_VALUE_BYTES); + buf[..len].copy_from_slice(&bytes[..len]); + + self.file.write_all(&buf)?; + Ok(()) + } + + /// Append a new record at the end of the file, zero-padded to the + /// fixed-width wire format. + fn append(&mut self, key: &str, value: &str) -> io::Result<()> { + self.file.seek(io::SeekFrom::End(0))?; + self.file.write_all(&encode_record(key, value))?; + self.record_count += 1; + Ok(()) + } + + /// Remove the record last returned by [`next()`](Iterator::next) + /// by swapping it with the final record and truncating the file. + /// + /// After removal the file position rewinds so that the next call + /// to `next()` reads the record that was swapped into this slot + /// (unless the removed record was already the last one). + /// + /// Returns `InvalidInput` if called before the first `next()`. + fn remove_current(&mut self) -> io::Result<()> { + if self.current_index == 0 { + return Err(io::Error::new( + ErrorKind::InvalidInput, + "no record has been read yet", + )); + } + + let delete_index = self.current_index - 1; + let last_index = self.record_count - 1; + + if delete_index != last_index { + self.file.seek(io::SeekFrom::Start( + last_index as u64 * RECORD_SIZE as u64, + ))?; + let mut buf = [0u8; RECORD_SIZE]; + self.file.read_exact(&mut buf)?; + + self.file.seek(io::SeekFrom::Start( + delete_index as u64 * RECORD_SIZE as u64, + ))?; + self.file.write_all(&buf)?; + + self.file.seek(io::SeekFrom::Start( + delete_index as u64 * RECORD_SIZE as u64, + ))?; + self.current_index = delete_index; + } + + self.file.set_len(last_index as u64 * RECORD_SIZE as u64)?; + self.record_count -= 1; + Ok(()) + } + + /// Flush any buffered writes to the underlying file. + fn flush(&mut self) -> io::Result<()> { + self.file.flush() + } +} + +impl Drop for KvpPoolIter { + fn drop(&mut self) { + let _ = fcntl_unlock(&self.file); + } +} + +impl Iterator for KvpPoolIter { + type Item = io::Result<(String, String)>; + + fn next(&mut self) -> Option { + if self.current_index >= self.record_count { + return None; + } + let mut buf = [0u8; RECORD_SIZE]; + match self.file.read_exact(&mut buf) { + Ok(()) => { + self.current_index += 1; + Some(decode_record(&buf)) + } + Err(e) => Some(Err(e)), + } + } + + fn size_hint(&self) -> (usize, Option) { + let remaining = self.record_count - self.current_index; + (remaining, Some(remaining)) + } +} + +impl KvpStore for KvpPoolStore { + fn max_key_size(&self) -> usize { + self.mode.max_key_size() + } + + fn max_value_size(&self) -> usize { + self.mode.max_value_size() + } + + fn insert(&self, key: &str, value: &str) -> Result<(), KvpError> { + self.validate_key(key)?; + self.validate_value(value)?; + + let mut iter = self.iter_mut()?; + let mut found = false; + let mut unique_keys = HashSet::new(); + + while let Some(record) = iter.next() { + let (k, _) = record?; + let is_target = k == key; + unique_keys.insert(k); + if is_target { + iter.overwrite_current_value(value)?; + found = true; + break; + } + } + + if !found { + if unique_keys.len() >= MAX_UNIQUE_KEYS { + return Err(KvpError::MaxUniqueKeysExceeded { + max: MAX_UNIQUE_KEYS, + }); + } + iter.append(key, value)?; + } + + iter.flush()?; + Ok(()) + } + + fn append(&self, key: &str, value: &str) -> Result<(), KvpError> { + self.validate_key(key)?; + self.validate_value(value)?; + + let mut iter = self.iter_mut()?; + iter.append(key, value)?; + iter.flush()?; + Ok(()) + } + + fn read(&self, key: &str) -> Result, KvpError> { + Self::validate_key_for_read(key)?; + + let iter = match self.iter() { + Ok(it) => it, + Err(KvpError::Io(e)) if e.kind() == ErrorKind::NotFound => { + return Ok(None); + } + Err(e) => return Err(e), + }; + + for record in iter { + let (k, v) = record?; + if k == key { + return Ok(Some(v)); + } + } + + Ok(None) + } + + fn entries(&self) -> Result, KvpError> { + let iter = match self.iter() { + Ok(it) => it, + Err(KvpError::Io(e)) if e.kind() == ErrorKind::NotFound => { + return Ok(HashMap::new()); + } + Err(e) => return Err(e), + }; + + let mut map = HashMap::with_capacity(iter.record_count()); + for record in iter { + let (k, v) = record?; + map.entry(k).or_insert(v); + } + Ok(map) + } + + fn delete(&self, key: &str) -> Result { + Self::validate_key_for_read(key)?; + + let file = match self.open_for_read_write() { + Ok(f) => f, + Err(ref e) if e.kind() == ErrorKind::NotFound => { + return Ok(false); + } + Err(e) => return Err(e.into()), + }; + + let mut iter = KvpPoolIter::new(file, true)?; + + while let Some(record) = iter.next() { + let (k, _) = record?; + if k == key { + iter.remove_current()?; + iter.flush()?; + return Ok(true); + } + } + + Ok(false) + } + + fn clear(&self) -> Result<(), KvpError> { + self.truncate_pool() + } + + fn len(&self) -> Result { + match std::fs::metadata(&self.path) { + Ok(m) => Ok(m.len() as usize / RECORD_SIZE), + Err(ref e) if e.kind() == ErrorKind::NotFound => Ok(0), + Err(e) => Err(e.into()), + } + } + + fn is_empty(&self) -> Result { + match std::fs::metadata(&self.path) { + Ok(m) => Ok(m.len() == 0), + Err(ref e) if e.kind() == ErrorKind::NotFound => Ok(true), + Err(e) => Err(e.into()), + } + } + + fn is_stale(&self) -> Result { + self.pool_is_stale() + } + + fn dump(&self, path: &Path) -> Result<(), KvpError> { + let entries = self.entries()?; + let json = serde_json::to_string_pretty(&entries).map_err(|e| { + io::Error::other(format!("JSON serialization failed: {e}")) + })?; + std::fs::write(path, json)?; + Ok(()) + } +} + +fn get_uptime() -> Duration { + Duration::from_secs(System::uptime()) +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::NamedTempFile; + + fn restricted_store(path: &Path) -> KvpPoolStore { + KvpPoolStore::new(Some(path.to_path_buf()), PoolMode::Restricted, false) + .unwrap() + } + + fn full_store(path: &Path) -> KvpPoolStore { + KvpPoolStore::new(Some(path.to_path_buf()), PoolMode::Full, false) + .unwrap() + } + + #[test] + fn test_insert_rejects_empty_key() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + let err = store.insert("", "value").unwrap_err(); + assert!(matches!(err, KvpError::EmptyKey)); + } + + #[test] + fn test_insert_rejects_null_in_key() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + let err = store.insert("bad\0key", "value").unwrap_err(); + assert!(matches!(err, KvpError::KeyContainsNull)); + } + + #[test] + fn test_default_path_is_used() { + let store = + KvpPoolStore::new(None, PoolMode::Restricted, false).unwrap(); + assert_eq!(store.path(), Path::new(DEFAULT_KVP_POOL_PATH)); + } + + #[test] + fn test_explicit_path_is_used() { + let tmp = NamedTempFile::new().unwrap(); + let store = KvpPoolStore::new( + Some(tmp.path().to_path_buf()), + PoolMode::Restricted, + false, + ) + .unwrap(); + assert_eq!(store.path(), tmp.path()); + } + + #[test] + fn test_restricted_limits_key_254_pass_255_fail() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + let ok_key = "k".repeat(254); + store.insert(&ok_key, "v").unwrap(); + + let bad_key = "k".repeat(255); + let err = store.insert(&bad_key, "v").unwrap_err(); + assert!(matches!(err, KvpError::KeyTooLarge { .. })); + } + + #[test] + fn test_restricted_limits_value_1022_pass_1023_fail() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + let ok_val = "v".repeat(1022); + store.insert("k", &ok_val).unwrap(); + + let bad_val = "v".repeat(1023); + let err = store.insert("k2", &bad_val).unwrap_err(); + assert!(matches!(err, KvpError::ValueTooLarge { .. })); + } + + #[test] + fn test_full_limits_key_512_pass_513_fail() { + let tmp = NamedTempFile::new().unwrap(); + let store = full_store(tmp.path()); + + let ok_key = "k".repeat(512); + store.insert(&ok_key, "v").unwrap(); + + let bad_key = "k".repeat(513); + let err = store.insert(&bad_key, "v").unwrap_err(); + assert!(matches!(err, KvpError::KeyTooLarge { .. })); + } + + #[test] + fn test_full_limits_value_2048_pass_2049_fail() { + let tmp = NamedTempFile::new().unwrap(); + let store = full_store(tmp.path()); + + let ok_val = "v".repeat(2048); + store.insert("k", &ok_val).unwrap(); + + let bad_val = "v".repeat(2049); + let err = store.insert("k2", &bad_val).unwrap_err(); + assert!(matches!(err, KvpError::ValueTooLarge { .. })); + } + + #[test] + fn test_insert_overwrites_existing_key() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + store.insert("key", "v1").unwrap(); + store.insert("key", "v2").unwrap(); + store.insert("other", "v3").unwrap(); + + let entries = store.entries().unwrap(); + assert_eq!(entries.len(), 2); + assert_eq!(entries.get("key"), Some(&"v2".to_string())); + assert_eq!(entries.get("other"), Some(&"v3".to_string())); + } + + #[test] + fn test_unique_key_cap_allows_1024_then_rejects_1025th() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + for i in 0..MAX_UNIQUE_KEYS { + store.insert(&format!("k{i}"), "v").unwrap(); + } + let err = store.insert("overflow", "v").unwrap_err(); + assert!(matches!(err, KvpError::MaxUniqueKeysExceeded { .. })); + } + + #[test] + fn test_unique_key_cap_allows_overwrite_at_limit() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + for i in 0..MAX_UNIQUE_KEYS { + store.insert(&format!("k{i}"), "v").unwrap(); + } + store.insert("k0", "updated").unwrap(); + assert_eq!(store.read("k0").unwrap(), Some("updated".to_string())); + } + + #[test] + fn test_insert_cap_uses_unique_keys_not_record_count() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + for i in 0..MAX_UNIQUE_KEYS - 1 { + store.insert(&format!("k{i}"), "v").unwrap(); + } + + store.append("k0", "dup").unwrap(); + assert_eq!(store.len().unwrap(), MAX_UNIQUE_KEYS); + + store.insert("new_unique", "v").unwrap(); + assert_eq!(store.read("new_unique").unwrap(), Some("v".to_string())); + } + + #[test] + fn test_unique_key_cap_allows_new_key_after_delete() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + for i in 0..MAX_UNIQUE_KEYS { + store.insert(&format!("k{i}"), "v").unwrap(); + } + assert!(store.delete("k0").unwrap()); + store.insert("new-key", "v").unwrap(); + } + + #[test] + fn test_clear_empties_store() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + store.insert("key", "value").unwrap(); + store.clear().unwrap(); + assert_eq!(store.read("key").unwrap(), None); + } + + #[test] + fn test_delete_removes_key() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + store.insert("key", "v1").unwrap(); + store.insert("other", "v2").unwrap(); + + assert!(store.delete("key").unwrap()); + assert_eq!(store.read("key").unwrap(), None); + assert_eq!(store.read("other").unwrap(), Some("v2".to_string())); + } + + #[test] + fn test_is_stale_and_pool_is_stale_at_boot_helpers() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + store.insert("key", "value").unwrap(); + + assert!(!store.is_stale().unwrap()); + assert!(store.pool_is_stale_at_boot(i64::MAX).unwrap()); + assert!(!store.pool_is_stale_at_boot(0).unwrap()); + } + + #[test] + fn test_mode_getter() { + let tmp = NamedTempFile::new().unwrap(); + let restricted = restricted_store(tmp.path()); + assert_eq!(restricted.mode(), PoolMode::Restricted); + + let tmp2 = NamedTempFile::new().unwrap(); + let full = full_store(tmp2.path()); + assert_eq!(full.mode(), PoolMode::Full); + } + + #[test] + fn test_len_and_is_empty() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + assert!(store.is_empty().unwrap()); + assert_eq!(store.len().unwrap(), 0); + + store.insert("key", "value").unwrap(); + assert!(!store.is_empty().unwrap()); + assert_eq!(store.len().unwrap(), 1); + + store.insert("key2", "value2").unwrap(); + assert_eq!(store.len().unwrap(), 2); + + store.insert("key", "updated").unwrap(); + assert_eq!(store.len().unwrap(), 2); + } + + #[test] + fn test_read_accepts_wire_max_key_in_restricted_mode() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + // Write a record using Full mode so a 300-byte key is accepted + let full = full_store(tmp.path()); + let long_key = "k".repeat(300); + full.insert(&long_key, "val").unwrap(); + + // Restricted store can still read the 300-byte key (> 254 safe limit) + assert_eq!(store.read(&long_key).unwrap(), Some("val".to_string())); + + // But a key beyond the wire max (512) is rejected even for reads + let too_long = "k".repeat(513); + let err = store.read(&too_long).unwrap_err(); + assert!(matches!(err, KvpError::KeyTooLarge { .. })); + } + + #[test] + fn test_dump_writes_json() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + store.insert("key1", "value1").unwrap(); + store.insert("key2", "value2").unwrap(); + + let dump_file = NamedTempFile::new().unwrap(); + store.dump(dump_file.path()).unwrap(); + + let contents = std::fs::read_to_string(dump_file.path()).unwrap(); + let parsed: HashMap = + serde_json::from_str(&contents).unwrap(); + assert_eq!(parsed.len(), 2); + assert_eq!(parsed.get("key1"), Some(&"value1".to_string())); + assert_eq!(parsed.get("key2"), Some(&"value2".to_string())); + } + + #[test] + fn test_concurrent_inserts_to_different_keys() { + let tmp = NamedTempFile::new().unwrap(); + let path = tmp.path().to_path_buf(); + let threads: Vec<_> = (0..8) + .map(|t| { + let p = path.clone(); + std::thread::spawn(move || { + let store = + KvpPoolStore::new(Some(p), PoolMode::Restricted, false) + .unwrap(); + for i in 0..10 { + let key = format!("t{t}_k{i}"); + store.insert(&key, &format!("val_{t}_{i}")).unwrap(); + } + }) + }) + .collect(); + + for t in threads { + t.join().unwrap(); + } + + let store = restricted_store(tmp.path()); + let entries = store.entries().unwrap(); + assert_eq!(entries.len(), 80); + for t in 0..8 { + for i in 0..10 { + let key = format!("t{t}_k{i}"); + assert_eq!( + entries.get(&key), + Some(&format!("val_{t}_{i}")), + "missing or wrong value for {key}" + ); + } + } + } + + #[test] + fn test_concurrent_inserts_to_same_key() { + let tmp = NamedTempFile::new().unwrap(); + let path = tmp.path().to_path_buf(); + let threads: Vec<_> = (0..8) + .map(|t| { + let p = path.clone(); + std::thread::spawn(move || { + let store = + KvpPoolStore::new(Some(p), PoolMode::Restricted, false) + .unwrap(); + for i in 0..10 { + store + .insert("shared_key", &format!("t{t}_v{i}")) + .unwrap(); + } + }) + }) + .collect(); + + for t in threads { + t.join().unwrap(); + } + + let store = restricted_store(tmp.path()); + assert_eq!(store.len().unwrap(), 1); + let val = store.read("shared_key").unwrap().unwrap(); + assert!( + val.starts_with('t') && val.contains("_v"), + "unexpected value format: {val}" + ); + + let file_len = std::fs::metadata(tmp.path()).unwrap().len() as usize; + assert_eq!(file_len, RECORD_SIZE); + } + + #[test] + fn test_concurrent_readers_and_writers() { + let tmp = NamedTempFile::new().unwrap(); + let path = tmp.path().to_path_buf(); + + let store = restricted_store(tmp.path()); + for i in 0..10 { + store.insert(&format!("k{i}"), &format!("v{i}")).unwrap(); + } + + let writer_threads: Vec<_> = (0..4) + .map(|t| { + let p = path.clone(); + std::thread::spawn(move || { + let store = + KvpPoolStore::new(Some(p), PoolMode::Restricted, false) + .unwrap(); + for round in 0..5 { + let key = format!("k{t}"); + store.insert(&key, &format!("w{t}_r{round}")).unwrap(); + } + }) + }) + .collect(); + + let reader_threads: Vec<_> = (0..4) + .map(|_| { + let p = path.clone(); + std::thread::spawn(move || { + let store = + KvpPoolStore::new(Some(p), PoolMode::Restricted, false) + .unwrap(); + for _ in 0..20 { + let entries = store.entries().unwrap(); + assert!(entries.len() <= 10); + for (k, v) in &entries { + assert!(!k.is_empty()); + assert!(!v.is_empty()); + } + } + }) + }) + .collect(); + + for t in writer_threads { + t.join().unwrap(); + } + for t in reader_threads { + t.join().unwrap(); + } + + let entries = store.entries().unwrap(); + assert_eq!(entries.len(), 10); + } + + #[test] + fn test_concurrent_writers_at_key_cap() { + let tmp = NamedTempFile::new().unwrap(); + let path = tmp.path().to_path_buf(); + + let store = restricted_store(tmp.path()); + for i in 0..MAX_UNIQUE_KEYS - 4 { + store.insert(&format!("pre{i}"), "v").unwrap(); + } + + let threads: Vec<_> = (0..4) + .map(|t| { + let p = path.clone(); + std::thread::spawn(move || { + let store = + KvpPoolStore::new(Some(p), PoolMode::Restricted, false) + .unwrap(); + store.insert(&format!("new{t}"), "v") + }) + }) + .collect(); + + let results: Vec<_> = + threads.into_iter().map(|t| t.join().unwrap()).collect(); + + let successes = results.iter().filter(|r| r.is_ok()).count(); + let cap_errors = results + .iter() + .filter(|r| { + matches!(r, Err(KvpError::MaxUniqueKeysExceeded { .. })) + }) + .count(); + + assert_eq!(successes, 4); + assert_eq!(cap_errors, 0); + assert_eq!(store.len().unwrap(), MAX_UNIQUE_KEYS); + + let err = store.insert("one_too_many", "v").unwrap_err(); + assert!(matches!(err, KvpError::MaxUniqueKeysExceeded { .. })); + } + + #[test] + fn test_iter_yields_all_records_in_order() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + let expected: Vec<(&str, &str)> = + vec![("a", "1"), ("b", "2"), ("c", "3"), ("d", "4"), ("e", "5")]; + for (k, v) in &expected { + store.append(k, v).unwrap(); + } + + let iter = store.iter().unwrap(); + assert_eq!(iter.record_count(), 5); + + let records: Vec<(String, String)> = iter.map(|r| r.unwrap()).collect(); + for (i, (k, v)) in records.iter().enumerate() { + assert_eq!(k, expected[i].0); + assert_eq!(v, expected[i].1); + } + } + + #[test] + fn test_iter_empty_file() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + let iter = store.iter().unwrap(); + assert_eq!(iter.record_count(), 0); + assert_eq!(iter.count(), 0); + } + + #[test] + fn test_iter_malformed_file_error() { + let tmp = NamedTempFile::new().unwrap(); + std::fs::write(tmp.path(), [0u8; RECORD_SIZE + 1]).unwrap(); + + let store = restricted_store(tmp.path()); + let err = store.iter().unwrap_err(); + assert!(matches!(err, KvpError::Io(_))); + } + + #[test] + fn test_overwrite_adjacent_records_untouched() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + store.append("first", "val_first").unwrap(); + store.append("middle", "val_middle").unwrap(); + store.append("last", "val_last").unwrap(); + + let raw_before = std::fs::read(tmp.path()).unwrap(); + let first_before = &raw_before[..RECORD_SIZE]; + let last_before = &raw_before[2 * RECORD_SIZE..3 * RECORD_SIZE]; + + { + let mut iter = store.iter_mut().unwrap(); + iter.next().unwrap().unwrap(); // first + let (k, _) = iter.next().unwrap().unwrap(); // middle + assert_eq!(k, "middle"); + iter.overwrite_current_value("UPDATED").unwrap(); + iter.flush().unwrap(); + } + + let raw_after = std::fs::read(tmp.path()).unwrap(); + assert_eq!(&raw_after[..RECORD_SIZE], first_before); + assert_eq!(&raw_after[2 * RECORD_SIZE..3 * RECORD_SIZE], last_before); + + assert_eq!(store.read("middle").unwrap(), Some("UPDATED".to_string())); + } + + #[test] + fn test_overwrite_multiple_in_one_pass() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + store.append("a", "old_a").unwrap(); + store.append("b", "old_b").unwrap(); + store.append("c", "old_c").unwrap(); + + { + let mut iter = store.iter_mut().unwrap(); + let mut idx = 0; + while let Some(record) = iter.next() { + record.unwrap(); + if idx == 0 || idx == 2 { + iter.overwrite_current_value("new").unwrap(); + } + idx += 1; + } + iter.flush().unwrap(); + } + + assert_eq!(store.read("a").unwrap(), Some("new".to_string())); + assert_eq!(store.read("b").unwrap(), Some("old_b".to_string())); + assert_eq!(store.read("c").unwrap(), Some("new".to_string())); + } + + #[test] + fn test_remove_swap_and_continue() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + store.append("A", "1").unwrap(); + store.append("B", "2").unwrap(); + store.append("C", "3").unwrap(); + store.append("D", "4").unwrap(); + + let mut collected = Vec::new(); + { + let mut iter = store.iter_mut().unwrap(); + while let Some(record) = iter.next() { + let (k, v) = record.unwrap(); + if k == "B" { + iter.remove_current().unwrap(); + } else { + collected.push((k, v)); + } + } + iter.flush().unwrap(); + } + + assert_eq!(collected.len(), 3); + assert!(collected.iter().any(|(k, _)| k == "A")); + assert!(collected.iter().any(|(k, _)| k == "C")); + assert!(collected.iter().any(|(k, _)| k == "D")); + + assert_eq!(store.len().unwrap(), 3); + assert_eq!(store.read("B").unwrap(), None); + } + + #[test] + fn test_remove_last_record_no_swap() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + store.append("a", "1").unwrap(); + store.append("b", "2").unwrap(); + store.append("c", "3").unwrap(); + + { + let mut iter = store.iter_mut().unwrap(); + iter.next().unwrap().unwrap(); + iter.next().unwrap().unwrap(); + iter.next().unwrap().unwrap(); + iter.remove_current().unwrap(); + iter.flush().unwrap(); + } + + assert_eq!(store.len().unwrap(), 2); + assert_eq!(store.read("c").unwrap(), None); + assert_eq!(store.read("a").unwrap(), Some("1".to_string())); + assert_eq!(store.read("b").unwrap(), Some("2".to_string())); + } + + #[test] + fn test_remove_only_record() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + store.append("solo", "val").unwrap(); + + { + let mut iter = store.iter_mut().unwrap(); + iter.next().unwrap().unwrap(); + iter.remove_current().unwrap(); + iter.flush().unwrap(); + } + + assert!(store.is_empty().unwrap()); + } + + #[test] + fn test_append_no_key_cap() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + for i in 0..MAX_UNIQUE_KEYS { + store.insert(&format!("k{i}"), "v").unwrap(); + } + store.append("extra", "val").unwrap(); + assert_eq!(store.len().unwrap(), MAX_UNIQUE_KEYS + 1); + } + + #[test] + fn test_append_validates_sizes() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + let big_key = "k".repeat(255); + let err = store.append(&big_key, "v").unwrap_err(); + assert!(matches!(err, KvpError::KeyTooLarge { .. })); + + let big_val = "v".repeat(1023); + let err = store.append("k", &big_val).unwrap_err(); + assert!(matches!(err, KvpError::ValueTooLarge { .. })); + } + + #[test] + fn test_append_then_read() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + store.append("diag_001", "event_data").unwrap(); + assert_eq!( + store.read("diag_001").unwrap(), + Some("event_data".to_string()) + ); + } + + #[test] + fn test_append_duplicate_key_semantics() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + store.append("X", "first").unwrap(); + store.append("X", "second").unwrap(); + + assert_eq!(store.read("X").unwrap(), Some("first".to_string())); + + let entries = store.entries().unwrap(); + assert_eq!(entries.get("X"), Some(&"first".to_string())); + } + + #[test] + fn test_insert_file_size_unchanged_on_overwrite() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + store.insert("key", "original").unwrap(); + let size_before = std::fs::metadata(tmp.path()).unwrap().len(); + + store.insert("key", "updated").unwrap(); + let size_after = std::fs::metadata(tmp.path()).unwrap().len(); + + assert_eq!(size_before, size_after); + assert_eq!(store.read("key").unwrap(), Some("updated".to_string())); + } + + #[test] + fn test_insert_file_size_grows_on_new_key() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + store.insert("k1", "v1").unwrap(); + let size_before = std::fs::metadata(tmp.path()).unwrap().len(); + + store.insert("k2", "v2").unwrap(); + let size_after = std::fs::metadata(tmp.path()).unwrap().len(); + + assert_eq!(size_after - size_before, RECORD_SIZE as u64); + } + + #[test] + fn test_insert_preserves_other_records() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + for i in 0..5 { + store.insert(&format!("k{i}"), &format!("v{i}")).unwrap(); + } + + let raw_before = std::fs::read(tmp.path()).unwrap(); + store.insert("k2", "CHANGED").unwrap(); + let raw_after = std::fs::read(tmp.path()).unwrap(); + + assert_eq!(raw_before.len(), raw_after.len()); + for i in 0..5 { + if i == 2 { + continue; + } + let start = i * RECORD_SIZE; + let end = start + RECORD_SIZE; + assert_eq!( + &raw_before[start..end], + &raw_after[start..end], + "record {i} was modified unexpectedly" + ); + } + } + + #[test] + fn test_file_integrity_after_mixed_ops() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + + for i in 0..10 { + store.insert(&format!("u{i}"), &format!("uv{i}")).unwrap(); + } + for i in 0..5 { + store.append(&format!("a{i}"), &format!("av{i}")).unwrap(); + } + store.delete("u3").unwrap(); + store.delete("u7").unwrap(); + store.insert("u0", "overwritten").unwrap(); + + let raw = std::fs::read(tmp.path()).unwrap(); + assert_eq!(raw.len() % RECORD_SIZE, 0); + + let record_count = raw.len() / RECORD_SIZE; + for i in 0..record_count { + let start = i * RECORD_SIZE; + let end = start + RECORD_SIZE; + let result = decode_record(&raw[start..end]); + assert!( + result.is_ok(), + "record {i} failed to decode: {:?}", + result.err() + ); + } + } + + #[test] + fn test_concurrent_appends() { + let tmp = NamedTempFile::new().unwrap(); + let path = tmp.path().to_path_buf(); + + let threads: Vec<_> = (0..8) + .map(|t| { + let p = path.clone(); + std::thread::spawn(move || { + let store = + KvpPoolStore::new(Some(p), PoolMode::Restricted, false) + .unwrap(); + for i in 0..10 { + store + .append( + &format!("t{t}_d{i}"), + &format!("data_{t}_{i}"), + ) + .unwrap(); + } + }) + }) + .collect(); + + for t in threads { + t.join().unwrap(); + } + + let store = restricted_store(tmp.path()); + assert_eq!(store.len().unwrap(), 80); + + let entries = store.entries().unwrap(); + for t in 0..8 { + for i in 0..10 { + let key = format!("t{t}_d{i}"); + assert!(entries.contains_key(&key), "missing key {key}"); + } + } + } + + #[test] + fn test_concurrent_mixed_insert_and_append() { + let tmp = NamedTempFile::new().unwrap(); + let path = tmp.path().to_path_buf(); + + let store = restricted_store(tmp.path()); + for i in 0..4 { + store.insert(&format!("fixed{i}"), "init").unwrap(); + } + + let insert_threads: Vec<_> = (0..4) + .map(|t| { + let p = path.clone(); + std::thread::spawn(move || { + let store = + KvpPoolStore::new(Some(p), PoolMode::Restricted, false) + .unwrap(); + for round in 0..5 { + store + .insert(&format!("fixed{t}"), &format!("r{round}")) + .unwrap(); + } + }) + }) + .collect(); + + let append_threads: Vec<_> = (0..4) + .map(|t| { + let p = path.clone(); + std::thread::spawn(move || { + let store = + KvpPoolStore::new(Some(p), PoolMode::Restricted, false) + .unwrap(); + for i in 0..5 { + store + .append( + &format!("diag_t{t}_i{i}"), + &format!("d{i}"), + ) + .unwrap(); + } + }) + }) + .collect(); + + for t in insert_threads { + t.join().unwrap(); + } + for t in append_threads { + t.join().unwrap(); + } + + let entries = store.entries().unwrap(); + assert_eq!(entries.len(), 24); + for t in 0..4 { + assert!(entries.contains_key(&format!("fixed{t}"))); + } + } + + #[test] + fn test_iter_drop_mid_iteration_releases_lock() { + let tmp = NamedTempFile::new().unwrap(); + let store = restricted_store(tmp.path()); + store.insert("k1", "v1").unwrap(); + store.insert("k2", "v2").unwrap(); + + { + let mut iter = store.iter().unwrap(); + iter.next().unwrap().unwrap(); + } + + store.insert("k3", "v3").unwrap(); + assert_eq!(store.len().unwrap(), 3); + } +} diff --git a/libazureinit-kvp/src/lib.rs b/libazureinit-kvp/src/lib.rs new file mode 100644 index 00000000..85f18311 --- /dev/null +++ b/libazureinit-kvp/src/lib.rs @@ -0,0 +1,115 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! `libazureinit-kvp` provides a storage trait and unified KVP pool +//! implementation for Hyper-V/Azure guests. +//! +//! - [`KvpStore`]: storage interface used by higher layers. +//! - [`KvpPoolStore`]: KVP pool file implementation with +//! [`PoolMode`](kvp_pool::PoolMode)-based policy. + +use std::collections::HashMap; +use std::fmt; +use std::io; +use std::path::Path; + +pub mod kvp_pool; + +/// Errors returned by [`KvpStore`] operations. +#[derive(Debug)] +pub enum KvpError { + /// The key was empty. + EmptyKey, + /// The key exceeds the store's maximum key size. + KeyTooLarge { max: usize, actual: usize }, + /// The value exceeds the store's maximum value size. + ValueTooLarge { max: usize, actual: usize }, + /// The store already has the maximum allowed number of unique keys. + MaxUniqueKeysExceeded { max: usize }, + /// The key contains a null byte, which is incompatible with the + /// on-disk format (null-padded fixed-width fields). + KeyContainsNull, + /// An underlying I/O error. + Io(io::Error), +} + +impl fmt::Display for KvpError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::EmptyKey => write!(f, "KVP key must not be empty"), + Self::KeyTooLarge { max, actual } => { + write!(f, "KVP key length ({actual}) exceeds maximum ({max})") + } + Self::ValueTooLarge { max, actual } => { + write!(f, "KVP value length ({actual}) exceeds maximum ({max})") + } + Self::MaxUniqueKeysExceeded { max } => { + write!(f, "KVP unique key count exceeded maximum ({max})") + } + Self::KeyContainsNull => { + write!(f, "KVP key must not contain null bytes") + } + Self::Io(e) => write!(f, "{e}"), + } + } +} + +impl std::error::Error for KvpError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Self::Io(e) => Some(e), + _ => None, + } + } +} + +impl From for KvpError { + fn from(err: io::Error) -> Self { + Self::Io(err) + } +} + +pub use kvp_pool::KvpPoolStore; + +/// Key-value store with Hyper-V KVP semantics. +pub trait KvpStore: Send + Sync { + /// Maximum key size in bytes for writes. + fn max_key_size(&self) -> usize; + + /// Maximum value size in bytes for writes. + fn max_value_size(&self) -> usize; + + /// Insert a new key-value pair or update an existing key's value. + fn insert(&self, key: &str, value: &str) -> Result<(), KvpError>; + + /// Append a key-value pair without checking for an existing key. + fn append(&self, key: &str, value: &str) -> Result<(), KvpError>; + + /// Read the value for a key. Returns `Ok(None)` when absent. + fn read(&self, key: &str) -> Result, KvpError>; + + /// Remove a key. Returns `true` if the key was present. + fn delete(&self, key: &str) -> Result; + + /// Remove all entries from the store. + fn clear(&self) -> Result<(), KvpError>; + + /// Return all key-value pairs. + fn entries(&self) -> Result, KvpError>; + + /// Return the number of records in the store. + /// + /// This counts on-disk records, not unique keys. If [`append`](Self::append) + /// was used to write duplicate keys, this may exceed the number of + /// unique keys returned by [`entries`](Self::entries). + fn len(&self) -> Result; + + /// Return whether the store is empty. + fn is_empty(&self) -> Result; + + /// Whether the store's data is stale (e.g. predates current boot). + fn is_stale(&self) -> Result; + + /// Dump all entries to a JSON file at the given path. + fn dump(&self, path: &Path) -> Result<(), KvpError>; +}