From c79f8ed339cd4757f67e05e704e59b3e7205964d Mon Sep 17 00:00:00 2001 From: Xander Date: Thu, 26 Feb 2026 16:47:02 +0000 Subject: [PATCH 1/4] RFC for table encryption --- docs/rfcs/0003_table_encryption.md | 729 +++++++++++++++++++++++++++++ 1 file changed, 729 insertions(+) create mode 100644 docs/rfcs/0003_table_encryption.md diff --git a/docs/rfcs/0003_table_encryption.md b/docs/rfcs/0003_table_encryption.md new file mode 100644 index 0000000000..f830377568 --- /dev/null +++ b/docs/rfcs/0003_table_encryption.md @@ -0,0 +1,729 @@ + + +# Table Encryption + +## Background + +### Iceberg Spec: Encryption + +The [Iceberg table spec](https://iceberg.apache.org/spec/#table-metadata) defines encryption +as a first-class concept. Tables may store an `encryption-keys` map in their metadata, +snapshots may reference an `encryption-key-id`, and manifest files carry optional +`key_metadata` bytes. Data files themselves can be encrypted either at the stream level +(AES-GCM envelope encryption, the "AGS1" format) or natively by the file format (e.g. +Parquet Modular Encryption). + +The Java implementation (`org.apache.iceberg.encryption`) is the reference and has been +production-tested. It defines: + +- **`EncryptionManager`** -- orchestrates encrypt/decrypt of `InputFile`/`OutputFile` +- **`KeyManagementClient`** -- pluggable KMS integration (wrap/unwrap keys) +- **`EncryptedInputFile` / `EncryptedOutputFile`** -- thin wrappers pairing a raw file handle + with its `EncryptionKeyMetadata` +- **`StandardEncryptionManager`** -- envelope encryption with key caching, AGS1 streams, + and Parquet native encryption support +- **`StandardKeyMetadata`** -- Avro-serialized key metadata (wrapped DEK, AAD prefix, file length) +- **`AesGcmInputStream` / `AesGcmOutputStream`** -- block-based stream encryption (AGS1 format) + +### Problem Statement + +The Rust implementation currently has no encryption support. 
Users reading encrypted Iceberg +tables created by Java/Spark cannot do so from `iceberg-rust`. Writing encrypted tables is +likewise impossible. + +Additionally, the current `InputFile` type is a concrete struct tightly coupled to `opendal::Operator`. +This prevents cleanly representing encrypted input files -- in the Java implementation, `InputFile` +is an interface with encrypted variants (`EncryptedInputFile`, `NativeEncryptionInputFile`). + +### Relationship to Storage Trait RFC + +[RFC 0002 (Making Storage a Trait)](https://github.com/apache/iceberg-rust/pull/2116) proposes +converting `Storage` from an enum to a trait and removing the `Extensions` mechanism from +`FileIOBuilder`. This encryption RFC is designed to work both with the current `Extensions`-based +`FileIO` and with the future trait-based storage. Specific adaptation points are called out below. + +--- + +## High-Level Architecture + +The encryption system follows the same envelope encryption pattern as the Java implementation, +adapted to Rust's ownership and async model. 
+ +### Component Overview + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ User / Table Scan │ +└─────────────────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ EncryptionManager │ +│ │ +│ - Orchestrates key unwrapping, caching, and encryptor creation │ +│ - Holds Arc for KMS integration │ +│ - Maintains KeyCache (LRU + TTL) to avoid redundant KMS calls │ +│ - Provides prepare_decryption() and bulk_prepare_decryption() │ +│ - Provides extract_aad_prefix() for Parquet native encryption │ +└─────────────────────────────────────────────────────────────────────────────┘ + │ │ + ▼ ▼ +┌──────────────────────────┐ ┌──────────────────────────────────────────────┐ +│ KeyManagementClient │ │ KeyCache │ +│ (trait) │ │ │ +│ │ │ - LRU cache with configurable TTL │ +│ wrap_key(dek, kek_id) │ │ - Thread-safe │ +│ unwrap_key(wrapped_dek) │ │ - Caches Arc per metadata │ +└──────────────────────────┘ └──────────────────────────────────────────────┘ + │ + ▼ +┌──────────────────────────┐ +│ KMS Implementations │ +│ │ +│ - InMemoryKms (testing) │ +│ - AWS KMS (future) │ +│ - Azure KV (future) │ +│ - GCP KMS (future) │ +└──────────────────────────┘ +``` + +### Decryption Data Flow + +``` +TableMetadata + └── encryption_keys: {key_id → EncryptedKey(key_metadata bytes)} + │ +Snapshot │ + └── encryption_key_id ──────┘ + │ + ▼ + load_manifest_list(file_io, table_metadata) + 1. Look up encryption_key_id in table_metadata.encryption_keys + 2. If found: file_io.new_encrypted_input(path, key_metadata) giving a new encrypted InputFile + 3. If not: file_io.new_input(path) + │ + ▼ +ManifestFile + └── key_metadata: Option> + │ + load_manifest(file_io) + 1. If key_metadata present: file_io.new_encrypted_input(path, key_metadata) giving a new encrypted InputFile + 2. 
If not: file_io.new_input(path) + │ + ▼ +FileScanTask + └── key_metadata: Option> + │ + ArrowReader::create_parquet_record_batch_stream_builder() + 1. If key_metadata present: + * Parquet-native encrypted → FileDecryptionProperties with IcebergKeyRetriever + 2. If not: standard Parquet read +``` + +### Crate Structure + +All encryption code lives in a new `encryption` module within the `iceberg` crate, gated +behind an `encryption` feature flag: + +``` +crates/iceberg/src/ +├── encryption/ +│ ├── mod.rs # Module re-exports +│ ├── crypto.rs # AES-GCM primitives (SecureKey, AesGcmEncryptor) +│ ├── cache.rs # KeyCache (LRU + TTL) +│ ├── key_management.rs # KeyManagementClient trait + InMemoryKms +│ ├── key_metadata.rs # EncryptionKeyMetadata trait + StandardKeyMetadata +│ ├── manager.rs # EncryptionManager (orchestrator) +│ ├── parquet_key_retriever.rs # Bridge to parquet-rs KeyRetriever +│ └── stream.rs # AesGcmFileRead (AGS1 stream decryption) +├── io/ +│ └── file_io.rs # InputFile enum + EncryptedInputFile variant +└── arrow/ + └── reader.rs # Parquet decryption integration +``` + +--- + +## Design + +### Core Cryptographic Primitives + +#### EncryptionAlgorithm + +```rust +pub enum EncryptionAlgorithm { + Aes128Gcm, + // Future: Aes256Gcm +} + +impl EncryptionAlgorithm { + pub fn key_length(&self) -> usize; // 16 for AES-128 + pub fn nonce_length(&self) -> usize; // 12 (96-bit) +} +``` + +#### SecureKey + +Wraps key material with automatic zeroization on drop via `zeroize::Zeroizing>`: + +```rust +pub struct SecureKey { + data: Zeroizing>, + algorithm: EncryptionAlgorithm, +} + +impl SecureKey { + pub fn new(data: Vec, algorithm: EncryptionAlgorithm) -> Result; + pub fn generate(algorithm: EncryptionAlgorithm) -> Self; +} +``` + +#### AesGcmEncryptor + +Performs AES-GCM encrypt/decrypt operations. Ciphertext format matches the Java implementation: +`[12-byte nonce][ciphertext][16-byte GCM tag]`. + +```rust +pub struct AesGcmEncryptor { /* ... 
*/ } + +impl AesGcmEncryptor { + pub fn new(key: SecureKey) -> Self; + pub fn encrypt(&self, plaintext: &[u8], aad: Option<&[u8]>) -> Result>; + pub fn decrypt(&self, ciphertext: &[u8], aad: Option<&[u8]>) -> Result>; +} +``` + +### Key Management + +#### KeyManagementClient Trait + +Pluggable interface for KMS integration. Mirrors the Java `KeyManagementClient`: + +```rust +#[async_trait] +pub trait KeyManagementClient: Send + Sync { + /// Wraps a DEK using the master key identified by `master_key_id`. + async fn wrap_key(&self, dek: &[u8], master_key_id: &str) -> Result>; + + /// Unwraps a previously wrapped DEK. + async fn unwrap_key(&self, wrapped_dek: &[u8]) -> Result>; +} +``` + +Users implement this trait to integrate with their KMS of choice (AWS KMS, Azure Key Vault, +GCP KMS, HashiCorp Vault, etc.). An `InMemoryKms` is provided for testing. + +#### StandardKeyMetadata + +Avro-serialized metadata stored alongside encrypted files. Compatible with the Java +`StandardKeyMetadata` format for cross-language interoperability: + +```rust +pub struct StandardKeyMetadata { + encryption_key: Vec, // Wrapped DEK + aad_prefix: Vec, // Additional authenticated data prefix + file_length: Option, // Optional encrypted file length +} + +impl StandardKeyMetadata { + pub fn serialize(&self) -> Result>; + pub fn deserialize(bytes: &[u8]) -> Result; +} +``` + +#### KeyCache + +Thread-safe LRU cache with TTL to avoid redundant KMS round-trips: + +```rust +pub struct KeyCache { /* ... 
*/ } + +impl KeyCache { + pub fn new(capacity: usize, ttl: Duration) -> Self; + pub async fn get(&self, key_metadata: &[u8]) -> Option>; + pub async fn insert(&self, key_metadata: &[u8], encryptor: Arc); + pub async fn evict_expired(&self); +} +``` + +### EncryptionManager + +Central orchestrator that ties together KMS, caching, and encryptor creation: + +```rust +pub struct EncryptionManager { + kms_client: Arc, + algorithm: EncryptionAlgorithm, + key_cache: Arc, +} + +impl EncryptionManager { + pub fn new( + kms_client: Arc, + algorithm: EncryptionAlgorithm, + cache_ttl: Duration, + ) -> Self; + + pub fn with_defaults(kms_client: Arc) -> Self; + + /// Unwraps a DEK from key metadata and returns a cached encryptor. + pub async fn prepare_decryption( + &self, + key_metadata: &[u8], + ) -> Result>; + + /// Batch preparation for multiple files (parallel KMS calls). + pub async fn bulk_prepare_decryption( + &self, + key_metadata_list: Vec>, + ) -> Result>>; + + /// Extracts the AAD prefix from key metadata for Parquet native encryption. + pub fn extract_aad_prefix(&self, key_metadata: &[u8]) -> Result>; +} +``` + +### AGS1 Stream Encryption + +Block-based stream encryption format compatible with Java's `AesGcmInputStream`/`AesGcmOutputStream`. 
+ +#### Format + +``` +┌──────────────────────────────────────────┐ +│ Header (8 bytes) │ +│ Magic: "AGS1" (4 bytes) │ +│ Plain block size: u32 LE (4 bytes) │ +│ Default: 1,048,576 (1 MiB) │ +├──────────────────────────────────────────┤ +│ Block 0 │ +│ Nonce (12 bytes) │ +│ Ciphertext (up to plain_block_size) │ +│ GCM Tag (16 bytes) │ +├──────────────────────────────────────────┤ +│ Block 1..N (same structure) │ +├──────────────────────────────────────────┤ +│ Final block (may be shorter) │ +│ Nonce (12 bytes) │ +│ Ciphertext (remaining bytes) │ +│ GCM Tag (16 bytes) │ +└──────────────────────────────────────────┘ + +Cipher block size = plain_block_size + 12 (nonce) + 16 (tag) = 1,048,604 +``` + +Each block's AAD is constructed as: `aad_prefix || block_index (4 bytes, little-endian)`. +This binds each block to its position in the stream, preventing block reordering attacks. + +#### AesGcmFileRead + +Implements the `FileRead` trait to provide transparent decryption of AGS1-encrypted files. +Supports random-access reads with an internal block cache (LRU, default 16 blocks): + +```rust +pub struct AesGcmFileRead { /* ... */ } + +impl AesGcmFileRead { + pub async fn new( + inner: Box, + encryptor: Arc, + key_metadata: &StandardKeyMetadata, + file_length: u64, + ) -> Result; + + pub async fn calculate_plaintext_length_from_file( + reader: &impl FileRead, + file_length: u64, + ) -> Result; +} + +#[async_trait] +impl FileRead for AesGcmFileRead { + async fn read(&self, range: Range) -> Result; +} +``` + +### InputFile: From Struct to Enum + +**This is a key design change.** The current `InputFile` is a concrete struct. In the Java +implementation, `InputFile` is an interface with multiple implementations including encrypted +variants. We propose converting `InputFile` to an enum to support encrypted files without +requiring a separate type at every call site: + +```rust +pub enum InputFile { + /// Standard unencrypted input file. 
+ Plain { + op: Operator, + path: String, + relative_path_pos: usize, + }, + + /// AGS1 stream-encrypted input file. + /// The file is decrypted transparently on read. + Encrypted { + op: Operator, + path: String, + relative_path_pos: usize, + encryptor: Arc, + key_metadata: StandardKeyMetadata, + }, + + /// Parquet-native encrypted input file. + /// Decryption is handled by the Parquet reader using FileDecryptionProperties. + /// The InputFile itself reads raw (encrypted) bytes. + NativeEncrypted { + op: Operator, + path: String, + relative_path_pos: usize, + key_metadata: Vec, + }, +} +``` + +This mirrors the Java hierarchy: + +| Java | Rust | +|-------------------------------|-------------------------------| +| `InputFile` (interface) | `InputFile` (enum) | +| Regular `InputFile` impl | `InputFile::Plain` | +| `EncryptedInputFile` wrapper | `InputFile::Encrypted` | +| `NativeEncryptionInputFile` | `InputFile::NativeEncrypted` | + +Common operations delegate to the appropriate variant: + +```rust +impl InputFile { + pub fn location(&self) -> &str; + pub async fn exists(&self) -> Result; + pub async fn metadata(&self) -> Result; + pub async fn read(&self) -> Result; + pub async fn reader(&self) -> Result>; +} +``` + +For the `Encrypted` variant, `read()` and `reader()` transparently decrypt via `AesGcmFileRead`. +For the `NativeEncrypted` variant, `read()` and `reader()` return raw bytes -- the Parquet +reader handles decryption using `FileDecryptionProperties`. + +#### Adaptation for Storage Trait RFC + +Once RFC 0002 merges, `InputFile` will hold `Arc` instead of `Operator`. 
The enum +structure remains the same -- only the inner storage handle type changes: + +```rust +// After Storage Trait RFC merges: +pub enum InputFile { + Plain { + storage: Arc, + path: String, + }, + Encrypted { + storage: Arc, + path: String, + encryptor: Arc, + key_metadata: StandardKeyMetadata, + }, + NativeEncrypted { + storage: Arc, + path: String, + key_metadata: Vec, + }, +} +``` + +### FileIO Integration + +#### Current Approach (with Extensions) + +The `EncryptionManager` is injected into `FileIO` via the existing `Extensions` mechanism: + +```rust +let encryption_manager = EncryptionManager::with_defaults(Arc::new(kms_client)); + +let file_io = FileIOBuilder::new("s3") + .with_prop("s3.region", "us-east-1") + .with_extension(encryption_manager) + .build()?; + +// Creates an encrypted InputFile +let input = file_io.new_encrypted_input(path, key_metadata).await?; +let data = input.read().await?; +``` + +#### After Storage Trait RFC + +RFC 0002 removes `Extensions` from `FileIOBuilder`. The `EncryptionManager` will instead be +provided through the `StorageFactory` or configured at the catalog level: + +```rust +// Option A: EncryptionManager on the catalog +let catalog = GlueCatalogBuilder::default() + .with_storage_factory(Arc::new(OpenDalStorageFactory::S3)) + .with_encryption_manager(encryption_manager) + .load("my_catalog", props) + .await?; + +// Option B: Wrapping StorageFactory -- likely the more idiomatic choice once storage is a trait. +pub struct EncryptingStorageFactory { + inner: Arc, + encryption_manager: Arc, +} + +impl StorageFactory for EncryptingStorageFactory { + fn build(&self, config: &StorageConfig) -> Result> { + let storage = self.inner.build(config)?; + Ok(Arc::new(EncryptingStorage::new(storage, self.encryption_manager.clone()))) + } +} +``` + +The exact integration point will be finalized when RFC 0002 merges. The encryption +module's internal design (crypto, key management, stream format) is unaffected. 
+ +### Parquet Native Encryption Bridge + +For files using Parquet Modular Encryption (where the Parquet file itself contains encrypted +column chunks), we bridge Iceberg's async key management with parquet-rs's synchronous +`KeyRetriever` trait: + +```rust +pub struct IcebergKeyRetriever { + encryption_manager: Arc, + runtime: tokio::runtime::Handle, +} + +impl KeyRetriever for IcebergKeyRetriever { + fn retrieve_key(&self, key_metadata: &[u8]) -> parquet::errors::Result> { + // Bridge async → sync using the tokio runtime handle + std::thread::scope(|s| { + s.spawn(|| { + self.runtime.block_on(async { + self.encryption_manager + .prepare_decryption(key_metadata) + .await + }) + }) + .join() + }) + } +} +``` + +The Arrow reader integrates this when `key_metadata` is present on a `FileScanTask`: + +```rust +// In ArrowReader: +if let Some(key_metadata) = &task.key_metadata { + let key_retriever = Arc::new(IcebergKeyRetriever::new( + encryption_manager, + runtime_handle, + )); + let decryption_properties = FileDecryptionProperties::with_key_retriever( + key_retriever as Arc, + ) + .build()?; + builder = builder.with_file_decryption_properties(decryption_properties); +} +``` + +### Manifest & Snapshot Integration + +#### ManifestFile + +The `ManifestFile` struct gains an optional `key_metadata` field. When present, +`load_manifest()` uses encrypted I/O: + +```rust +pub struct ManifestFile { + // ... existing fields ... + pub key_metadata: Option>, +} + +impl ManifestFile { + pub async fn load_manifest(&self, file_io: &FileIO) -> Result { + let avro = match &self.key_metadata { + Some(km) => { + file_io + .new_encrypted_input(&self.manifest_path, km) + .await? + .read() + .await? + } + None => { + file_io.new_input(&self.manifest_path)?.read().await? + } + }; + // Deserialize Avro manifest... + } +} +``` + +#### Snapshot + +Snapshots reference an `encryption_key_id` that maps to a key in `TableMetadata.encryption_keys`: + +```rust +pub struct Snapshot { + // ... 
existing fields ... + pub encryption_key_id: Option, +} + +impl Snapshot { + pub async fn load_manifest_list( + &self, + file_io: &FileIO, + table_metadata: &TableMetadata, + ) -> Result { + let bytes = match &self.encryption_key_id { + Some(key_id) => { + let encrypted_key = table_metadata + .encryption_keys + .get(key_id) + .ok_or_else(|| /* error */)?; + file_io + .new_encrypted_input(&self.manifest_list, &encrypted_key.key_metadata) + .await? + .read() + .await? + } + None => file_io.new_input(&self.manifest_list)?.read().await?, + }; + ManifestList::parse(bytes, /* ... */) + } +} +``` + +#### FileScanTask + +Propagates per-file encryption metadata through the scan pipeline: + +```rust +pub struct FileScanTask { + // ... existing fields ... + pub key_metadata: Option>, +} +``` + +--- + +## Implementation Plan + +### Phase 1: Core Encryption (Read Path) + +- Cryptographic primitives: `EncryptionAlgorithm`, `SecureKey`, `AesGcmEncryptor` +- `KeyManagementClient` trait and `InMemoryKms` +- `StandardKeyMetadata` with Avro serialization (Java-compatible) +- `KeyCache` with LRU + TTL +- `EncryptionManager` with `prepare_decryption()` and `bulk_prepare_decryption()` +- `AesGcmFileRead` (AGS1 stream decryption implementing `FileRead`) +- `InputFile` enum conversion (`Plain`, `Encrypted`, `NativeEncrypted`) +- `FileIO::new_encrypted_input()` integration +- Manifest and snapshot decryption +- `FileScanTask.key_metadata` propagation +- `IcebergKeyRetriever` for Parquet native encryption +- Arrow reader integration with `FileDecryptionProperties` +- Feature-gated behind `encryption` feature flag +- Integration tests with `InMemoryKms` + +### Phase 2: Write Path + +- `OutputFile` enum conversion (mirroring `InputFile`) +- `AesGcmFileWrite` (AGS1 stream encryption implementing `FileWrite`) +- `EncryptionManager::prepare_encryption()` (generate DEK, wrap with KMS, create metadata) +- `FileIO::new_encrypted_output()` integration +- Parquet writer encryption support 
(`FileEncryptionProperties`) +- Encrypted manifest and manifest list writing +- Encrypted snapshot commit flow + +### Phase 3: Production KMS Implementations + +- AWS KMS `KeyManagementClient` implementation +- Azure Key Vault `KeyManagementClient` implementation +- GCP KMS `KeyManagementClient` implementation + + +### Phase 4: Storage Trait Adaptation + +- Adapt to RFC 0002 when it merges: + - Replace `Operator` with `Arc` in `InputFile`/`OutputFile` enum variants + - Replace `Extensions`-based `EncryptionManager` injection with the new pattern + (catalog-level or `EncryptingStorageFactory` wrapper) + - Remove any `Extensions`-specific code + +### Future Work + +- Column-level encryption policies (encrypt specific columns with different keys) +- Key rotation support (re-encrypt DEKs with new KEKs without re-encrypting data) +- Encryption metadata in `TableMetadata` write path +- AES-256-GCM support (depends on apache/arrow-rs#9203) + +--- + +## Compatibility + +### Java Interoperability + +Cross-language compatibility is a hard requirement: + +- **AGS1 format**: Byte-level compatible with Java's `AesGcmInputStream`/`AesGcmOutputStream` + (same header, block size, nonce/tag layout, AAD construction) +- **StandardKeyMetadata**: Avro-serialized with the same schema as Java, enabling Rust to + read tables encrypted by Java/Spark and vice versa +- **Parquet native encryption**: Uses the same `KeyRetriever` interface from `parquet-rs`, + which follows the Parquet spec + +### Feature Flag + +All encryption code is gated behind `--features encryption` to avoid adding cryptographic +dependencies for users who don't need encryption. The `aes-gcm` and `zeroize` crates are only compiled when enabled. + +When the `encryption` feature is not enabled and an encrypted file is encountered, clear +error messages are returned indicating that the feature must be enabled. 
+ +--- + +## Risks and Mitigations + +| Risk | Description | Mitigation | +| ---- | ----------- | ---------- | +| Storage trait churn | RFC 0002 may change `InputFile`/`FileIO` significantly | Design encryption module with clean boundaries; crypto/key management/stream code is independent of storage abstraction | +| Parquet async/sync bridge | `KeyRetriever` is sync but KMS calls are async | Use `std::thread::scope` + `runtime.block_on()` to bridge; document the requirement for a tokio runtime handle | +| Key metadata format drift | Java may evolve `StandardKeyMetadata` | Pin to Avro schema version; add schema version detection for forward compatibility | +| Performance: KMS latency | KMS round-trips add latency to file opens | `KeyCache` with TTL; `bulk_prepare_decryption()` for parallel unwrapping | +| `InputFile` enum breaking change | Converting from struct to enum breaks existing code | No mitigation identified yet; this is likely an unavoidable breaking change | + +## Open Questions + +1. **KMS crate structure**: Should KMS implementations live in `iceberg-encryption-{provider}` + crates, or in the existing catalog crates (since AWS KMS is often used with Glue catalog)? +2. **Write path priority**: Should Phase 2 (write path) block on Phase 4 (storage trait + adaptation), or proceed independently? + +## Conclusion + +This RFC introduces encryption support for `iceberg-rust`, following the Iceberg spec and +maintaining byte-level compatibility with the Java reference implementation. The design +separates concerns into pluggable components (KMS client, key cache, stream cipher, encryption +manager) and integrates with the existing read path through `FileIO`, manifest loading, and +the Arrow/Parquet reader. The `InputFile` type is evolved from a concrete struct to an enum +to cleanly represent encrypted file variants, mirroring Java's interface hierarchy. The +implementation is feature-gated and designed to adapt cleanly to the upcoming storage trait +refactoring from RFC 0002. 
From 4eef7ded4b0f61e4e0ffadebccfc6770bc32dda5 Mon Sep 17 00:00:00 2001 From: Xander Date: Fri, 27 Feb 2026 19:10:49 +0000 Subject: [PATCH 2/4] update --- docs/rfcs/0003_table_encryption.md | 645 ++++++++++------------------- 1 file changed, 227 insertions(+), 418 deletions(-) diff --git a/docs/rfcs/0003_table_encryption.md b/docs/rfcs/0003_table_encryption.md index f830377568..544f06e484 100644 --- a/docs/rfcs/0003_table_encryption.md +++ b/docs/rfcs/0003_table_encryption.md @@ -42,16 +42,6 @@ production-tested. It defines: - **`StandardKeyMetadata`** -- Avro-serialized key metadata (wrapped DEK, AAD prefix, file length) - **`AesGcmInputStream` / `AesGcmOutputStream`** -- block-based stream encryption (AGS1 format) -### Problem Statement - -The Rust implementation currently has no encryption support. Users reading encrypted Iceberg -tables created by Java/Spark cannot do so from `iceberg-rust`. Writing encrypted tables is -likewise impossible. - -Additionally, the current `InputFile` type is a concrete struct tightly coupled to `opendal::Operator`. -This prevents cleanly representing encrypted input files -- in the Java implementation, `InputFile` -is an interface with encrypted variants (`EncryptedInputFile`, `NativeEncryptionInputFile`). - ### Relationship to Storage Trait RFC [RFC 0002 (Making Storage a Trait)](https://github.com/apache/iceberg-rust/pull/2116) proposes @@ -63,8 +53,23 @@ converting `Storage` from an enum to a trait and removing the `Extensions` mecha ## High-Level Architecture -The encryption system follows the same envelope encryption pattern as the Java implementation, -adapted to Rust's ownership and async model. +The encryption system uses two-layer envelope encryption, adapted from the Java implementation +to Rust's ownership and async model. 
+ +### Key Hierarchy + +``` +Master Key (in KMS) + └── wraps → KEK (Key Encryption Key) — stored in table metadata as EncryptedKey + └── wraps → DEK (Data Encryption Key) — stored in StandardKeyMetadata per file + └── encrypts → file content (AGS1 stream or Parquet native) +``` + +- **Master keys** live in the KMS and never leave it +- **KEKs** are wrapped by the master key and stored in `TableMetadata.encryption_keys` +- **DEKs** are wrapped by a KEK and stored per-file in `StandardKeyMetadata` +- KEKs are cached in memory (moka async cache with configurable TTL) to avoid redundant KMS calls +- KEK rotation occurs automatically when a KEK exceeds its configurable lifespan (default 730 days per NIST SP 800-57) ### Component Overview @@ -75,22 +80,26 @@ adapted to Rust's ownership and async model. │ ▼ ┌─────────────────────────────────────────────────────────────────────────────┐ -│ EncryptionManager │ -│ │ -│ - Orchestrates key unwrapping, caching, and encryptor creation │ -│ - Holds Arc for KMS integration │ -│ - Maintains KeyCache (LRU + TTL) to avoid redundant KMS calls │ -│ - Provides prepare_decryption() and bulk_prepare_decryption() │ -│ - Provides extract_aad_prefix() for Parquet native encryption │ +│ EncryptionManager (trait) │ +│ │ +│ StandardEncryptionManager: │ +│ - Two-layer envelope: Master → KEK → DEK │ +│ - KEK cache (moka async, configurable TTL) │ +│ - Automatic KEK rotation │ +│ - encrypt() / decrypt() for AGS1 stream files │ +│ - encrypt_native() for Parquet Modular Encryption │ +│ - wrap/unwrap_key_metadata() for manifest list keys │ +│ - generate_dek() with KEK management │ └─────────────────────────────────────────────────────────────────────────────┘ │ │ ▼ ▼ ┌──────────────────────────┐ ┌──────────────────────────────────────────────┐ -│ KeyManagementClient │ │ KeyCache │ -│ (trait) │ │ │ -│ │ │ - LRU cache with configurable TTL │ -│ wrap_key(dek, kek_id) │ │ - Thread-safe │ -│ unwrap_key(wrapped_dek) │ │ - Caches Arc per metadata │ +│ 
KeyManagementClient │ │ KEK Cache │ +│ (trait) │ │ │ +│ │ │ - moka::future::Cache with configurable TTL │ +│ wrap_key(key, key_id) │ │ - Thread-safe async │ +│ unwrap_key(wrapped, id) │ │ - Caches plaintext KEK bytes per key ID │ +│ initialize(props) │ │ │ └──────────────────────────┘ └──────────────────────────────────────────────┘ │ ▼ @@ -104,59 +113,99 @@ adapted to Rust's ownership and async model. └──────────────────────────┘ ``` -### Decryption Data Flow +### Data Flow + +#### Read Path (Decryption) ``` TableMetadata - └── encryption_keys: {key_id → EncryptedKey(key_metadata bytes)} + └── encryption_keys: {key_id → EncryptedKey} │ Snapshot │ - └── encryption_key_id ──────┘ + └── encryption_key_id ──────┘ (V3 format only) │ ▼ load_manifest_list(file_io, table_metadata) 1. Look up encryption_key_id in table_metadata.encryption_keys - 2. If found: file_io.new_encrypted_input(path, key_metadata) giving a new encrypted InputFile - 3. If not: file_io.new_input(path) + 2. em.unwrap_key_metadata() → plaintext key metadata + 3. file_io.new_encrypted_input(path, key_metadata) → AGS1-decrypting InputFile │ ▼ ManifestFile └── key_metadata: Option> │ load_manifest(file_io) - 1. If key_metadata present: file_io.new_encrypted_input(path, key_metadata) giving a new encrypted InputFile - 2. If not: file_io.new_input(path) + 1. If key_metadata present: file_io.new_encrypted_input() → AGS1-decrypting InputFile + 2. If not: file_io.new_input() │ ▼ FileScanTask └── key_metadata: Option> │ - ArrowReader::create_parquet_record_batch_stream_builder() + ArrowReader::create_parquet_record_batch_stream_builder_with_key_metadata() 1. If key_metadata present: - * Parquet-native encrypted → FileDecryptionProperties with IcebergKeyRetriever + a. file_io.new_native_encrypted_input(path, key_metadata) → NativeEncrypted InputFile + b. Build FileDecryptionProperties from NativeKeyMaterial (DEK + AAD prefix) + c. Pass to ParquetRecordBatchStreamBuilder 2. 
If not: standard Parquet read ``` -### Crate Structure +#### Write Path (Encryption) + +``` +RollingFileWriter::new_output_file() + 1. If file_io.encryption_manager() is Some: + a. file_io.new_native_encrypted_output(path) → EncryptedOutputFile + b. EncryptionManager generates DEK, wraps with KEK + c. OutputFile::NativeEncrypted carries NativeKeyMaterial for Parquet writer + d. Store key_metadata bytes on DataFile + 2. ParquetWriter detects NativeEncrypted, configures FileEncryptionProperties + +SnapshotProducer::commit() + 1. Manifest writing: + a. file_io.new_encrypted_output(path) → AGS1-encrypting OutputFile + b. Store key_metadata on ManifestFile entry + 2. Manifest list writing: + a. file_io.new_encrypted_output(path) → AGS1-encrypting OutputFile + b. em.wrap_key_metadata() → EncryptedKey for table metadata + c. Store key_id on Snapshot.encryption_key_id + 3. Table updates include AddEncryptionKey for new KEKs +``` -All encryption code lives in a new `encryption` module within the `iceberg` crate, gated -behind an `encryption` feature flag: +### Module Structure ``` crates/iceberg/src/ ├── encryption/ -│ ├── mod.rs # Module re-exports -│ ├── crypto.rs # AES-GCM primitives (SecureKey, AesGcmEncryptor) -│ ├── cache.rs # KeyCache (LRU + TTL) -│ ├── key_management.rs # KeyManagementClient trait + InMemoryKms -│ ├── key_metadata.rs # EncryptionKeyMetadata trait + StandardKeyMetadata -│ ├── manager.rs # EncryptionManager (orchestrator) -│ ├── parquet_key_retriever.rs # Bridge to parquet-rs KeyRetriever -│ └── stream.rs # AesGcmFileRead (AGS1 stream decryption) +│ ├── mod.rs # Module re-exports +│ ├── crypto.rs # AES-GCM primitives (SecureKey, AesGcmEncryptor) +│ ├── key_management.rs # KeyManagementClient trait +│ ├── key_metadata.rs # StandardKeyMetadata (Avro V1, Java-compatible) +│ ├── encryption_manager.rs # EncryptionManager trait + StandardEncryptionManager +│ ├── plaintext_encryption_manager.rs # No-op pass-through for unencrypted tables +│ ├── 
file_encryptor.rs # FileEncryptor (write-side AGS1 wrapper) +│ ├── file_decryptor.rs # FileDecryptor (read-side AGS1 wrapper) +│ ├── encrypted_io.rs # EncryptedInputFile / EncryptedOutputFile wrappers +│ ├── stream.rs # AesGcmFileRead / AesGcmFileWrite (AGS1 format) +│ ├── kms/ +│ │ ├── mod.rs +│ │ └── in_memory.rs # InMemoryKms (testing only) +│ └── integration_tests.rs # End-to-end encryption round-trip tests ├── io/ -│ └── file_io.rs # InputFile enum + EncryptedInputFile variant -└── arrow/ - └── reader.rs # Parquet decryption integration +│ └── file_io.rs # InputFile/OutputFile enums, FileIO encryption methods +├── arrow/ +│ └── reader.rs # Parquet decryption via FileDecryptionProperties +├── writer/file_writer/ +│ ├── parquet_writer.rs # Parquet FileEncryptionProperties integration +│ └── rolling_writer.rs # Encrypted output file creation + key_metadata propagation +├── transaction/ +│ └── snapshot.rs # Encrypted manifest/manifest list writing, KEK management +├── scan/ +│ ├── context.rs # key_metadata propagation from DataFile → FileScanTask +│ └── task.rs # FileScanTask.key_metadata field +└── spec/ + ├── snapshot.rs # Snapshot.encryption_key_id, load_manifest_list decryption + └── manifest_list.rs # ManifestFile.key_metadata, load_manifest decryption ``` --- @@ -170,12 +219,7 @@ crates/iceberg/src/ ```rust pub enum EncryptionAlgorithm { Aes128Gcm, - // Future: Aes256Gcm -} - -impl EncryptionAlgorithm { - pub fn key_length(&self) -> usize; // 16 for AES-128 - pub fn nonce_length(&self) -> usize; // 12 (96-bit) + // Future: Aes256Gcm (depends on parquet-rs support) } ``` @@ -197,7 +241,7 @@ impl SecureKey { #### AesGcmEncryptor -Performs AES-GCM encrypt/decrypt operations. Ciphertext format matches the Java implementation: +AES-GCM encrypt/decrypt. Ciphertext format matches the Java implementation: `[12-byte nonce][ciphertext][16-byte GCM tag]`. ```rust @@ -218,12 +262,10 @@ Pluggable interface for KMS integration. 
Mirrors the Java `KeyManagementClient`: ```rust #[async_trait] -pub trait KeyManagementClient: Send + Sync { - /// Wraps a DEK using the master key identified by `master_key_id`. - async fn wrap_key(&self, dek: &[u8], master_key_id: &str) -> Result<Vec<u8>>; - - /// Unwraps a previously wrapped DEK. - async fn unwrap_key(&self, wrapped_dek: &[u8]) -> Result<Vec<u8>>; +pub trait KeyManagementClient: Debug + Send + Sync { + async fn initialize(&mut self, properties: HashMap<String, String>) -> Result<()>; + async fn wrap_key(&self, key: &[u8], wrapping_key_id: &str) -> Result<Vec<u8>>; + async fn unwrap_key(&self, wrapped_key: &[u8], wrapping_key_id: &str) -> Result<Vec<u8>>; } ``` @@ -237,69 +279,54 @@ Avro-serialized metadata stored alongside encrypted files. Compatible with the J ```rust pub struct StandardKeyMetadata { - encryption_key: Vec<u8>, // Wrapped DEK + encryption_key: Vec<u8>, // Plaintext DEK (for PME) or wrapped DEK (for AGS1) aad_prefix: Vec<u8>, // Additional authenticated data prefix file_length: Option<u64>, // Optional encrypted file length } - -impl StandardKeyMetadata { - pub fn serialize(&self) -> Result<Vec<u8>>; - pub fn deserialize(bytes: &[u8]) -> Result<Self>; -} ``` -#### KeyCache +Wire format: `[version byte 0x01][Avro binary datum]` — byte-compatible with Java. -Thread-safe LRU cache with TTL to avoid redundant KMS round-trips: +### EncryptionManager -```rust -pub struct KeyCache { /* ... */ } +The `EncryptionManager` trait abstracts encryption orchestration. `StandardEncryptionManager` +implements two-layer envelope encryption with KEK caching and rotation: -impl KeyCache { - pub fn new(capacity: usize, ttl: Duration) -> Self; - pub async fn get(&self, key_metadata: &[u8]) -> Option<Arc<AesGcmEncryptor>>; - pub async fn insert(&self, key_metadata: &[u8], encryptor: Arc<AesGcmEncryptor>); - pub async fn evict_expired(&self); +```rust +#[async_trait] +pub trait EncryptionManager: Debug + Send + Sync { + /// Decrypt an AGS1 stream-encrypted file.
+ async fn decrypt(&self, encrypted: EncryptedInputFile) -> Result; + + /// Encrypt a file with AGS1 stream encryption. + async fn encrypt(&self, raw_output: OutputFile) -> Result; + + /// Encrypt for Parquet Modular Encryption (generates NativeKeyMaterial). + async fn encrypt_native(&self, raw_output: OutputFile) -> Result; + + /// Unwrap key metadata using the table's KEK hierarchy. + async fn unwrap_key_metadata( + &self, encrypted_key: &EncryptedKey, + encryption_keys: &HashMap, + ) -> Result>; + + /// Wrap key metadata with a KEK for storage in table metadata. + async fn wrap_key_metadata( + &self, key_metadata: &[u8], + ) -> Result<(EncryptedKey, Option)>; } ``` -### EncryptionManager - -Central orchestrator that ties together KMS, caching, and encryptor creation: +`StandardEncryptionManager` configuration: ```rust -pub struct EncryptionManager { - kms_client: Arc, - algorithm: EncryptionAlgorithm, - key_cache: Arc, -} - -impl EncryptionManager { - pub fn new( - kms_client: Arc, - algorithm: EncryptionAlgorithm, - cache_ttl: Duration, - ) -> Self; - - pub fn with_defaults(kms_client: Arc) -> Self; - - /// Unwraps a DEK from key metadata and returns a cached encryptor. - pub async fn prepare_decryption( - &self, - key_metadata: &[u8], - ) -> Result>; - - /// Batch preparation for multiple files (parallel KMS calls). - pub async fn bulk_prepare_decryption( - &self, - key_metadata_list: Vec>, - ) -> Result>>; - - /// Extracts the AAD prefix from key metadata for Parquet native encryption. - pub fn extract_aad_prefix(&self, key_metadata: &[u8]) -> Result>; -} +let em = StandardEncryptionManager::new(Arc::new(kms_client)) + .with_table_key_id("master-key-1") // Master key ID in KMS + .with_aad_prefix(b"my-table".to_vec()); // AAD prefix for GCM blocks ``` +A `PlaintextEncryptionManager` is also provided as a no-op pass-through for unencrypted tables. 
+ ### AGS1 Stream Encryption Block-based stream encryption format compatible with Java's `AesGcmInputStream`/`AesGcmOutputStream`. @@ -321,153 +348,78 @@ Block-based stream encryption format compatible with Java's `AesGcmInputStream`/ │ Block 1..N (same structure) │ ├──────────────────────────────────────────┤ │ Final block (may be shorter) │ -│ Nonce (12 bytes) │ -│ Ciphertext (remaining bytes) │ -│ GCM Tag (16 bytes) │ └──────────────────────────────────────────┘ - -Cipher block size = plain_block_size + 12 (nonce) + 16 (tag) = 1,048,604 ``` -Each block's AAD is constructed as: `aad_prefix || block_index (4 bytes, little-endian)`. -This binds each block to its position in the stream, preventing block reordering attacks. +Each block's AAD is constructed as `aad_prefix || block_index (4 bytes LE)`, binding each +block to its position in the stream to prevent reordering attacks. -#### AesGcmFileRead +**`AesGcmFileRead`** implements the `FileRead` trait for transparent AGS1 decryption with +random-access reads. **`AesGcmFileWrite`** implements `FileWrite` for transparent AGS1 +encryption with block buffering. -Implements the `FileRead` trait to provide transparent decryption of AGS1-encrypted files. -Supports random-access reads with an internal block cache (LRU, default 16 blocks): +### InputFile and OutputFile Enums -```rust -pub struct AesGcmFileRead { /* ... */ } - -impl AesGcmFileRead { - pub async fn new( - inner: Box, - encryptor: Arc, - key_metadata: &StandardKeyMetadata, - file_length: u64, - ) -> Result; - - pub async fn calculate_plaintext_length_from_file( - reader: &impl FileRead, - file_length: u64, - ) -> Result; -} - -#[async_trait] -impl FileRead for AesGcmFileRead { - async fn read(&self, range: Range) -> Result; -} -``` - -### InputFile: From Struct to Enum - -**This is a key design change.** The current `InputFile` is a concrete struct. 
In the Java -implementation, `InputFile` is an interface with multiple implementations including encrypted -variants. We propose converting `InputFile` to an enum to support encrypted files without -requiring a separate type at every call site: +`InputFile` and `OutputFile` are enums with three variants each: ```rust pub enum InputFile { - /// Standard unencrypted input file. - Plain { - op: Operator, - path: String, - relative_path_pos: usize, - }, - - /// AGS1 stream-encrypted input file. - /// The file is decrypted transparently on read. - Encrypted { - op: Operator, - path: String, - relative_path_pos: usize, - encryptor: Arc, - key_metadata: StandardKeyMetadata, - }, - - /// Parquet-native encrypted input file. - /// Decryption is handled by the Parquet reader using FileDecryptionProperties. - /// The InputFile itself reads raw (encrypted) bytes. - NativeEncrypted { - op: Operator, - path: String, - relative_path_pos: usize, - key_metadata: Vec, - }, -} -``` + /// Standard unencrypted file. + Plain { storage: Arc, path: String }, -This mirrors the Java hierarchy: + /// AGS1 stream-encrypted file. Transparent decryption on read. + Encrypted { storage: Arc, path: String, decryptor: Arc }, -| Java | Rust | -|-------------------------------|-------------------------------| -| `InputFile` (interface) | `InputFile` (enum) | -| Regular `InputFile` impl | `InputFile::Plain` | -| `EncryptedInputFile` wrapper | `InputFile::Encrypted` | -| `NativeEncryptionInputFile` | `InputFile::NativeEncrypted` | - -Common operations delegate to the appropriate variant: + /// Parquet Modular Encryption. Raw reads; Parquet reader handles decryption. 
+ NativeEncrypted { storage: Arc, path: String, key_material: NativeKeyMaterial }, +} -```rust -impl InputFile { - pub fn location(&self) -> &str; - pub async fn exists(&self) -> Result; - pub async fn metadata(&self) -> Result; - pub async fn read(&self) -> Result; - pub async fn reader(&self) -> Result>; +pub enum OutputFile { + Plain { storage: Arc, path: String }, + Encrypted { storage: Arc, path: String, encryptor: Arc }, + NativeEncrypted { storage: Arc, path: String, key_material: NativeKeyMaterial }, } ``` -For the `Encrypted` variant, `read()` and `reader()` transparently decrypt via `AesGcmFileRead`. -For the `NativeEncrypted` variant, `read()` and `reader()` return raw bytes -- the Parquet -reader handles decryption using `FileDecryptionProperties`. +`NativeKeyMaterial` carries the plaintext DEK and AAD prefix for Parquet's +`FileEncryptionProperties` / `FileDecryptionProperties`. + +Common operations (`location()`, `exists()`, `read()`, `reader()`, `write()`, `writer()`) +delegate to the appropriate variant, with `Encrypted` variants transparently encrypting/decrypting +via `AesGcmFileRead`/`AesGcmFileWrite`. #### Adaptation for Storage Trait RFC -Once RFC 0002 merges, `InputFile` will hold `Arc` instead of `Operator`. The enum -structure remains the same -- only the inner storage handle type changes: - -```rust -// After Storage Trait RFC merges: -pub enum InputFile { - Plain { - storage: Arc, - path: String, - }, - Encrypted { - storage: Arc, - path: String, - encryptor: Arc, - key_metadata: StandardKeyMetadata, - }, - NativeEncrypted { - storage: Arc, - path: String, - key_metadata: Vec, - }, -} -``` +Once RFC 0002 merges, `InputFile` will hold `Arc` instead of `Operator`. This is +already the case in this implementation — the enum structure is stable. Only the underlying +`Storage` trait implementation may change. 
### FileIO Integration -#### Current Approach (with Extensions) - -The `EncryptionManager` is injected into `FileIO` via the existing `Extensions` mechanism: +The `EncryptionManager` is stored as a type-safe `FileIOBuilder` extension. This integrates +naturally with catalogs that support extensions (e.g. `RestCatalog.with_file_io_extension()`): ```rust -let encryption_manager = EncryptionManager::with_defaults(Arc::new(kms_client)); - +// Via FileIOBuilder extension (works with RestCatalog and any extension-aware catalog) let file_io = FileIOBuilder::new("s3") .with_prop("s3.region", "us-east-1") .with_extension(encryption_manager) .build()?; -// Creates an encrypted InputFile -let input = file_io.new_input(path, key_metadata).await?; -let data = input.read().await?; +// Or via convenience method on FileIO +let file_io = file_io.with_encryption_manager(encryption_manager); ``` +FileIO provides encryption-aware factory methods: + +| Method | Purpose | +|--------|---------| +| `new_encrypted_input(path, key_metadata)` | AGS1 stream decryption (manifests, manifest lists) | +| `new_encrypted_output(path)` | AGS1 stream encryption | +| `new_native_encrypted_input(path, key_metadata)` | PME input (Parquet handles decryption) | +| `new_native_encrypted_output(path)` | PME output (Parquet handles encryption) | +| `encryption_manager()` | Returns the configured EncryptionManager, if any | + #### After Storage Trait RFC RFC 0002 removes `Extensions` from `FileIOBuilder`. The `EncryptionManager` will instead be @@ -481,7 +433,7 @@ let catalog = GlueCatalogBuilder::default() .load("my_catalog", props) .await?; -// Option B: Wrapping StorageFactory - I'm pretty sure this is more idomatic in the new trait world. +// Option B: Wrapping StorageFactory pub struct EncryptingStorageFactory { inner: Arc, encryption_manager: Arc, @@ -498,182 +450,66 @@ impl StorageFactory for EncryptingStorageFactory { The exact integration point will be finalized when RFC 0002 merges. 
The encryption module's internal design (crypto, key management, stream format) is unaffected. -### Parquet Native Encryption Bridge +### Parquet Modular Encryption -For files using Parquet Modular Encryption (where the Parquet file itself contains encrypted -column chunks), we bridge Iceberg's async key management with parquet-rs's synchronous -`KeyRetriever` trait: +For Parquet data files, encryption is handled natively by the Parquet reader/writer using +`FileEncryptionProperties` and `FileDecryptionProperties` from `parquet-rs`. -```rust -pub struct IcebergKeyRetriever { - encryption_manager: Arc, - runtime: tokio::runtime::Handle, -} +**Write path** (`ParquetWriter`): When the output file is `NativeEncrypted`, the writer extracts +`NativeKeyMaterial` (plaintext DEK + AAD prefix) and configures `FileEncryptionProperties` on the +`AsyncArrowWriter`. The Parquet crate handles column/page-level encryption. -impl KeyRetriever for IcebergKeyRetriever { - fn retrieve_key(&self, key_metadata: &[u8]) -> parquet::errors::Result> { - // Bridge async → sync using the tokio runtime handle - std::thread::scope(|s| { - s.spawn(|| { - self.runtime.block_on(async { - self.encryption_manager - .prepare_decryption(key_metadata) - .await - }) - }) - .join() - }) - } -} -``` +**Read path** (`ArrowReader`): When `FileScanTask.key_metadata` is present, the reader calls +`file_io.new_native_encrypted_input()` which deserializes `StandardKeyMetadata` to extract the +plaintext DEK and AAD prefix. These are used to build `FileDecryptionProperties` which are +passed to `ParquetRecordBatchStreamBuilder::new_with_options()`. -The Arrow reader integrates this when `key_metadata` is present on a `FileScanTask`: +The `ArrowFileReader::get_metadata()` implementation forwards both `file_decryption_properties` +and `metadata_options` from `ArrowReaderOptions` to `ParquetMetaDataReader`, enabling encrypted +footer parsing. 
-```rust -// In ArrowReader: -if let Some(key_metadata) = &task.key_metadata { - let key_retriever = Arc::new(IcebergKeyRetriever::new( - encryption_manager, - runtime_handle, - )); - let decryption_properties = FileDecryptionProperties::with_key_retriever( - key_retriever as Arc, - ) - .build()?; - builder = builder.with_file_decryption_properties(decryption_properties); -} -``` +### Catalog Integration -### Manifest & Snapshot Integration +Encryption is configured at the catalog level. The `EncryptionManager` is attached to the +catalog's `FileIO`, and all tables loaded from that catalog inherit it automatically. -#### ManifestFile - -The `ManifestFile` struct gains an optional `key_metadata` field. When present, -`load_manifest()` uses encrypted I/O: +For catalogs that support `FileIOBuilder` extensions (e.g. `RestCatalog`), the encryption manager +can be added directly: ```rust -pub struct ManifestFile { - // ... existing fields ... - pub key_metadata: Option>, -} - -impl ManifestFile { - pub async fn load_manifest(&self, file_io: &FileIO) -> Result { - let avro = match &self.key_metadata { - Some(km) => { - file_io - .new_encrypted_input(&self.manifest_path, km) - .await? - .read() - .await? - } - None => { - file_io.new_input(&self.manifest_path)?.read().await? - } - }; - // Deserialize Avro manifest... - } -} +rest_catalog.with_file_io_extension(encryption_manager); ``` -#### Snapshot - -Snapshots reference an `encryption_key_id` that maps to a key in `TableMetadata.encryption_keys`: +For catalogs that don't support extensions, an `EncryptedCatalog` wrapper implements the +`Catalog` trait by delegating to an inner catalog and attaching the `EncryptionManager` to +every `Table` returned: ```rust -pub struct Snapshot { - // ... existing fields ... 
- pub encryption_key_id: Option, -} - -impl Snapshot { - pub async fn load_manifest_list( - &self, - file_io: &FileIO, - table_metadata: &TableMetadata, - ) -> Result { - let bytes = match &self.encryption_key_id { - Some(key_id) => { - let encrypted_key = table_metadata - .encryption_keys - .get(key_id) - .ok_or_else(|| /* error */)?; - file_io - .new_encrypted_input(&self.manifest_list, &encrypted_key.key_metadata) - .await? - .read() - .await? - } - None => file_io.new_input(&self.manifest_list)?.read().await?, - }; - ManifestList::parse(bytes, /* ... */) - } -} -``` - -#### FileScanTask - -Propagates per-file encryption metadata through the scan pipeline: - -```rust -pub struct FileScanTask { - // ... existing fields ... - pub key_metadata: Option>, -} +let inner_catalog = MemoryCatalogBuilder::default().load("c", props).await?; +let encrypted = EncryptedCatalog::new(Arc::new(inner_catalog), encryption_manager); ``` ---- - -## Implementation Plan - -### Phase 1: Core Encryption (Read Path) - -- Cryptographic primitives: `EncryptionAlgorithm`, `SecureKey`, `AesGcmEncryptor` -- `KeyManagementClient` trait and `InMemoryKms` -- `StandardKeyMetadata` with Avro serialization (Java-compatible) -- `KeyCache` with LRU + TTL -- `EncryptionManager` with `prepare_decryption()` and `bulk_prepare_decryption()` -- `AesGcmFileRead` (AGS1 stream decryption implementing `FileRead`) -- `InputFile` enum conversion (`Plain`, `Encrypted`, `NativeEncrypted`) -- `FileIO::new_encrypted_input()` integration -- Manifest and snapshot decryption -- `FileScanTask.key_metadata` propagation -- `IcebergKeyRetriever` for Parquet native encryption -- Arrow reader integration with `FileDecryptionProperties` -- Feature-gated behind `encryption` feature flag -- Integration tests with `InMemoryKms` - -### Phase 2: Write Path - -- `OutputFile` enum conversion (mirroring `InputFile`) -- `AesGcmFileWrite` (AGS1 stream encryption implementing `FileWrite`) -- `EncryptionManager::prepare_encryption()` 
(generate DEK, wrap with KMS, create metadata) -- `FileIO::new_encrypted_output()` integration -- Parquet writer encryption support (`FileEncryptionProperties`) -- Encrypted manifest and manifest list writing -- Encrypted snapshot commit flow +The wrapper intercepts all `Table`-returning methods (`load_table`, `create_table`, +`register_table`, `update_table`) and calls `table.with_file_io(...)` to attach encryption. +All other catalog methods delegate directly. -### Phase 3: Production KMS Implementations +`Table::with_file_io()` replaces the table's `FileIO` and rebuilds its `ObjectCache` (which +stores its own `FileIO` for manifest/manifest list loading). -- AWS KMS `KeyManagementClient` implementation -- Azure Key Vault `KeyManagementClient` implementation -- GCP KMS `KeyManagementClient` implementation +### DataFusion Integration +The DataFusion integration requires **no encryption-specific code**. Encryption flows +transparently through the existing pipeline: -### Phase 4: Storage Trait Adaptation +- **`IcebergTableProvider::scan()`** calls `catalog.load_table()` → table has encrypted FileIO → `IcebergTableScan` → `table.scan().to_arrow()` → `ArrowReader` decrypts via `key_metadata` +- **`IcebergTableProvider::insert_into()`** calls `catalog.load_table()` → table has encrypted FileIO → `IcebergWriteExec` uses `table.file_io()` → `RollingFileWriterBuilder` detects encryption → PME-encrypted data files + AGS1-encrypted manifests +- **`IcebergCommitExec`** uses `Transaction::fast_append()` → `SnapshotProducer` writes encrypted manifests/manifest list → `AddEncryptionKey` updates persisted in table metadata -- Adapt to RFC 0002 when it merges: - - Replace `Operator` with `Arc` in `InputFile`/`OutputFile` enum variants - - Replace `Extensions`-based `EncryptionManager` injection with the new pattern - (catalog-level or `EncryptingStorageFactory` wrapper) - - Remove any `Extensions`-specific code +### Format Version Requirement -### Future Work - -- Column-level 
encryption policies (encrypt specific columns with different keys) -- Key rotation support (re-encrypt DEKs with new KEKs without re-encrypting data) -- Encryption metadata in `TableMetadata` write path -- AES-256-GCM support (depends on apache/arrow-rs#9203) +Snapshot-level `encryption_key_id` is serialized only in the **V3** snapshot format. V2 snapshots +do not include this field, so encrypted manifest lists cannot be read back after a V2 round-trip. +Tables using encryption must use format version V3. --- @@ -687,43 +523,16 @@ Cross-language compatibility is a hard requirement: (same header, block size, nonce/tag layout, AAD construction) - **StandardKeyMetadata**: Avro-serialized with the same schema as Java, enabling Rust to read tables encrypted by Java/Spark and vice versa -- **Parquet native encryption**: Uses the same `KeyRetriever` interface from `parquet-rs`, - which follows the Parquet spec - -### Feature Flag - -All encryption code is gated behind `--features encryption` to avoid adding cryptographic -dependencies for users who don't need encryption. The `aes-gcm` and `zeroize` crates are only compiled when enabled. - -When the `encryption` feature is not enabled and an encrypted file is encountered, clear -error messages are returned indicating that the feature must be enabled. 
+- **Parquet native encryption**: Uses `FileDecryptionProperties`/`FileEncryptionProperties` + from `parquet-rs`, which follows the Parquet spec --- -## Risks and Mitigations - -| Risk | Description | Mitigation | -| ---- | ----------- | ---------- | -| Storage trait churn | RFC 0002 may change `InputFile`/`FileIO` significantly | Design encryption module with clean boundaries; crypto/key management/stream code is independent of storage abstraction | -| Parquet async/sync bridge | `KeyRetriever` is sync but KMS calls are async | Use `std::thread::scope` + `runtime.block_on()` to bridge; document the requirement for a tokio runtime handle | -| Key metadata format drift | Java may evolve `StandardKeyMetadata` | Pin to Avro schema version; add schema version detection for forward compatibility | -| Performance: KMS latency | KMS round-trips add latency to file opens | `KeyCache` with TTL; `bulk_prepare_decryption()` for parallel unwrapping | -| `InputFile` enum breaking change | Converting from struct to enum breaks existing code | Not sure, I think we have to break this | - -## Open Questions - -1. **KMS crate structure**: Should KMS implementations live in `iceberg-encryption-{provider}` - crates, or in the existing catalog crates (since AWS KMS is often used with Glue catalog)? -2. **Write path priority**: Should Phase 2 (write path) block on Phase 3 (storage trait - adaptation), or proceed independently? - -## Conclusion - -This RFC introduces encryption support for `iceberg-rust`, following the Iceberg spec and -maintaining byte-level compatibility with the Java reference implementation. The design -separates concerns into pluggable components (KMS client, key cache, stream cipher, encryption -manager) and integrates with the existing read path through `FileIO`, manifest loading, and -the Arrow/Parquet reader. The `InputFile` type is evolved from a concrete struct to an enum -to cleanly represent encrypted file variants, mirroring Java's interface hierarchy. 
The -implementation is feature-gated and designed to adapt cleanly to the upcoming storage trait -refactoring from RFC 0002. +## Future Work + +- **Production KMS implementations**: AWS KMS, Azure Key Vault, GCP KMS +- **Column-level encryption policies**: Encrypt specific columns with different keys +- **Key rotation support**: Re-encrypt DEKs with new KEKs without re-encrypting data +- **AES-256-GCM support**: Depends on `parquet-rs` support +- **Storage Trait adaptation**: Replace `Extensions`-based `EncryptionManager` injection + with the pattern from RFC 0002 (catalog-level or `EncryptingStorageFactory` wrapper) From b375682763bbca333e7b2148ffd2b3b753b46c4a Mon Sep 17 00:00:00 2001 From: Xander Date: Fri, 27 Feb 2026 20:58:13 +0000 Subject: [PATCH 3/4] Update with kms client registration --- docs/rfcs/0003_table_encryption.md | 95 ++++++++++++++++++++++++------ 1 file changed, 77 insertions(+), 18 deletions(-) diff --git a/docs/rfcs/0003_table_encryption.md b/docs/rfcs/0003_table_encryption.md index 544f06e484..c7506f21dc 100644 --- a/docs/rfcs/0003_table_encryption.md +++ b/docs/rfcs/0003_table_encryption.md @@ -317,16 +317,17 @@ pub trait EncryptionManager: Debug + Send + Sync { } ``` -`StandardEncryptionManager` configuration: +`StandardEncryptionManager` is typically not constructed directly by users. Instead, +`TableBuilder::build()` constructs it automatically from a `KeyManagementClient` +extension and the table's properties (see [Catalog Integration](#catalog-integration) below). +For manual construction in tests: ```rust let em = StandardEncryptionManager::new(Arc::new(kms_client)) .with_table_key_id("master-key-1") // Master key ID in KMS - .with_aad_prefix(b"my-table".to_vec()); // AAD prefix for GCM blocks + .with_encryption_keys(table_metadata.encryption_keys.clone()); ``` -A `PlaintextEncryptionManager` is also provided as a no-op pass-through for unencrypted tables. 
- ### AGS1 Stream Encryption Block-based stream encryption format compatible with Java's `AesGcmInputStream`/`AesGcmOutputStream`. @@ -470,32 +471,90 @@ footer parsing. ### Catalog Integration -Encryption is configured at the catalog level. The `EncryptionManager` is attached to the -catalog's `FileIO`, and all tables loaded from that catalog inherit it automatically. +#### How Java Does It + +In Java, encryption is configured through two catalog properties: + +- `encryption.kms-impl` — fully qualified class name of the `KeyManagementClient` implementation + (e.g. `org.apache.iceberg.aws.AwsKeyManagementClient`) +- `encryption.kms-type` — reserved for a future built-in registry of KMS types, but + **not yet implemented** in Java (any value throws `"Unsupported KMS type"`) + +These are mutually exclusive. `kms-impl` uses Java reflection to instantiate the KMS client +from a class name. The `table_key_id` is then read from the table's `encryption.key-id` +property, and a per-table `StandardEncryptionManager` is constructed in +`RESTTableOperations.encryption()`. The base `FileIO` is wrapped with `EncryptingFileIO.combine(io, encryption())` +to produce a per-table encrypting FileIO. + +#### How Rust Does It -For catalogs that support `FileIOBuilder` extensions (e.g. `RestCatalog`), the encryption manager -can be added directly: +Rust does not have Java's reflection-based class loading, so `encryption.kms-impl` (a class name +string) is not useful. Instead, the user provides a concrete `Arc<dyn KeyManagementClient>` +as a `FileIOBuilder` extension on the catalog. The `table_key_id` and `encryption_keys` are +then inferred automatically from table metadata. + +**Step 1: User provides the KMS client to the catalog.** + +For catalogs that support `FileIOBuilder` extensions (e.g. `RestCatalog`): ```rust -rest_catalog.with_file_io_extension(encryption_manager); +let kms_client: Arc<dyn KeyManagementClient> = Arc::new(my_aws_kms); + +let catalog = RestCatalogBuilder::default() + .load("rest", props) + .await?
+ .with_file_io_extension(kms_client); ``` -For catalogs that don't support extensions, an `EncryptedCatalog` wrapper implements the -`Catalog` trait by delegating to an inner catalog and attaching the `EncryptionManager` to -every `Table` returned: +**Step 2: `TableBuilder::build()` auto-configures encryption per table.** + +When building a `Table`, `TableBuilder::maybe_configure_encryption()` runs automatically. +This is the Rust equivalent of Java's `RESTTableOperations.io()` which calls +`EncryptingFileIO.combine(io, encryption())`. It checks: + +1. Does the `FileIO` have a `KeyManagementClient` extension? If not, return as-is. +2. Does the table metadata have an `encryption.key-id` property? If not, return as-is (unencrypted table). +3. If both are present, construct a `StandardEncryptionManager` with: + - `table_key_id` from the `encryption.key-id` table property + - `encryption_keys` from `TableMetadata.encryption_keys` (the KEK map) + - The `KeyManagementClient` from the extension +4. Attach the `EncryptionManager` to the table's `FileIO`. + +This runs on every `Table::builder().build()` call, so each table gets a correctly configured +per-table `EncryptionManager` even when a single catalog manages tables with different key IDs. ```rust -let inner_catalog = MemoryCatalogBuilder::default().load("c", props).await?; -let encrypted = EncryptedCatalog::new(Arc::new(inner_catalog), encryption_manager); +// User code — just provide the KMS client, everything else is automatic: +let table = catalog.load_table(&ident).await?; +// table.file_io() now has a StandardEncryptionManager configured with +// the correct table_key_id and encryption_keys from this table's metadata. ``` -The wrapper intercepts all `Table`-returning methods (`load_table`, `create_table`, -`register_table`, `update_table`) and calls `table.with_file_io(...)` to attach encryption. -All other catalog methods delegate directly. 
- `Table::with_file_io()` replaces the table's `FileIO` and rebuilds its `ObjectCache` (which stores its own `FileIO` for manifest/manifest list loading). +#### Open Decision: KMS Client Injection Mechanism + +The current approach requires the user to manually construct and provide the KMS client. +In production, the REST catalog server may return `encryption.kms-impl` or `encryption.kms-type` +in its config response. A few options for resolving this automatically in Rust: + +1. **Current approach (explicit)**: User constructs `Arc<dyn KeyManagementClient>` and adds + it as a `FileIOBuilder` extension. Simple, no magic, works today. + +2. **Type registry**: A `HashMap<String, Box<dyn Fn(...) -> Arc<dyn KeyManagementClient>>>` + mapping `kms-type` strings (e.g. `"aws"`, `"gcp"`) to factory functions. The catalog reads + `encryption.kms-type` from properties and looks up the factory. Requires a registration step + but is closer to Java's `kms-type` intent. + +3. **Catalog-specific logic**: Each catalog implementation (REST, Glue, etc.) knows how to + create its KMS client based on the properties it receives. For example, `RestCatalog` could + detect `encryption.kms-impl = AwsKeyManagementClient` in the config response and + automatically create an `AwsKms` instance with the right endpoint and credentials. + +The right choice depends on how the upstream Iceberg spec evolves `encryption.kms-type`. For +now, option 1 (explicit) is implemented and sufficient for production use. + ### DataFusion Integration The DataFusion integration requires **no encryption-specific code**.
Encryption flows From 24bc3f2df9819f8aca3c29dece25903dad37eec3 Mon Sep 17 00:00:00 2001 From: Xander Date: Wed, 4 Mar 2026 09:22:44 +0000 Subject: [PATCH 4/4] address comments --- docs/rfcs/0003_table_encryption.md | 111 +++++++++++++++++++++-------- 1 file changed, 81 insertions(+), 30 deletions(-) diff --git a/docs/rfcs/0003_table_encryption.md b/docs/rfcs/0003_table_encryption.md index c7506f21dc..5d028b0e17 100644 --- a/docs/rfcs/0003_table_encryption.md +++ b/docs/rfcs/0003_table_encryption.md @@ -53,21 +53,37 @@ converting `Storage` from an enum to a trait and removing the `Extensions` mecha ## High-Level Architecture -The encryption system uses two-layer envelope encryption, adapted from the Java implementation -to Rust's ownership and async model. +The encryption system uses envelope encryption with a chained key hierarchy, adapted from the +Java implementation to Rust's ownership and async model. KMS-managed master keys wrap KEKs, +which encrypt only manifest list key metadata. All other DEKs are protected by being stored +inside their encrypted parent files. 
### Key Hierarchy ``` Master Key (in KMS) - └── wraps → KEK (Key Encryption Key) — stored in table metadata as EncryptedKey - └── wraps → DEK (Data Encryption Key) — stored in StandardKeyMetadata per file - └── encrypts → file content (AGS1 stream or Parquet native) + └── wraps → KEK (Key Encryption Key) — stored KMS-wrapped in table metadata + └── encrypts → manifest list StandardKeyMetadata (AES-GCM, KEY_TIMESTAMP as AAD) + │ + ├── manifest list DEK → encrypts manifest list file (AGS1) + │ └── manifest key_metadata (plaintext StandardKeyMetadata) stored in manifest list entries + │ └── manifest DEK → encrypts manifest file (AGS1) + │ └── data file key_metadata (plaintext StandardKeyMetadata) stored in manifest entries + │ └── data file DEK → encrypts data file (AGS1 or Parquet native) ``` - **Master keys** live in the KMS and never leave it -- **KEKs** are wrapped by the master key and stored in `TableMetadata.encryption_keys` -- **DEKs** are wrapped by a KEK and stored per-file in `StandardKeyMetadata` +- **KEKs** are wrapped by the master key via KMS (`kmsClient.wrapKey()`) and stored in + `TableMetadata.encryption_keys` with a `KEY_TIMESTAMP` property for rotation tracking +- **DEKs** are generated as plaintext random bytes and stored in `StandardKeyMetadata` per file. + DEKs are **not** individually wrapped by a KEK. 
Instead, they are protected by being stored + inside their encrypted parent file: + - **Manifest list DEKs**: Their `StandardKeyMetadata` is AES-GCM encrypted by a KEK + (using `KEY_TIMESTAMP` as AAD) and stored as an `EncryptedKey` in table metadata + - **Manifest DEKs**: Their `StandardKeyMetadata` is stored as plaintext `key_metadata` bytes + in manifest list entries — protected because the manifest list file itself is encrypted + - **Data file DEKs**: Their `StandardKeyMetadata` is stored as plaintext `key_metadata` bytes + in manifest entries — protected because the manifest file itself is encrypted - KEKs are cached in memory (moka async cache with configurable TTL) to avoid redundant KMS calls - KEK rotation occurs automatically when a KEK exceeds its configurable lifespan (default 730 days per NIST SP 800-57) @@ -83,13 +99,14 @@ Master Key (in KMS) │ EncryptionManager (trait) │ │ │ │ StandardEncryptionManager: │ -│ - Two-layer envelope: Master → KEK → DEK │ +│ - Envelope encryption: Master → KEK → manifest list StandardKeyMetadata │ +│ - DEKs are plaintext, protected by encrypted parent files │ │ - KEK cache (moka async, configurable TTL) │ -│ - Automatic KEK rotation │ +│ - Automatic KEK rotation (730 days, KEY_TIMESTAMP tracking) │ │ - encrypt() / decrypt() for AGS1 stream files │ │ - encrypt_native() for Parquet Modular Encryption │ -│ - wrap/unwrap_key_metadata() for manifest list keys │ -│ - generate_dek() with KEK management │ +│ - wrap/unwrap_key_metadata() for manifest list keys (KEK + KMS) │ +│ - generate_dek() for per-file plaintext DEK generation │ └─────────────────────────────────────────────────────────────────────────────┘ │ │ ▼ ▼ @@ -127,26 +144,35 @@ Snapshot │ ▼ load_manifest_list(file_io, table_metadata) 1. Look up encryption_key_id in table_metadata.encryption_keys - 2. em.unwrap_key_metadata() → plaintext key metadata - 3. file_io.new_encrypted_input(path, key_metadata) → AGS1-decrypting InputFile + → get manifest list EncryptedKey + 2. 
Find the KEK via EncryptedKey.encrypted_by_id + → unwrap KEK via KMS: kms_client.unwrap_key(kek.encrypted_key_metadata, table_key_id) + (KEK is cached to avoid redundant KMS calls) + 3. AES-GCM decrypt the manifest list's StandardKeyMetadata using the + unwrapped KEK, with KEY_TIMESTAMP as AAD + 4. Extract plaintext manifest list DEK from decrypted StandardKeyMetadata + 5. file_io.new_encrypted_input(path, key_metadata) → AGS1-decrypting InputFile │ ▼ ManifestFile - └── key_metadata: Option<Vec<u8>> + └── key_metadata: Option<Vec<u8>> (plaintext StandardKeyMetadata, read from encrypted manifest list) │ load_manifest(file_io) - 1. If key_metadata present: file_io.new_encrypted_input() → AGS1-decrypting InputFile + 1. If key_metadata present: + a. Parse StandardKeyMetadata → extract plaintext DEK + AAD prefix + b. file_io.new_encrypted_input() → AGS1-decrypting InputFile 2. If not: file_io.new_input() │ ▼ FileScanTask - └── key_metadata: Option<Vec<u8>> + └── key_metadata: Option<Vec<u8>> (plaintext StandardKeyMetadata, read from encrypted manifest) │ ArrowReader::create_parquet_record_batch_stream_builder_with_key_metadata() 1. If key_metadata present: a. file_io.new_native_encrypted_input(path, key_metadata) → NativeEncrypted InputFile - b. Build FileDecryptionProperties from NativeKeyMaterial (DEK + AAD prefix) - c. Pass to ParquetRecordBatchStreamBuilder + b. Parse StandardKeyMetadata → extract plaintext DEK + AAD prefix + c. Build FileDecryptionProperties from NativeKeyMaterial (DEK + AAD prefix) + d. Pass to ParquetRecordBatchStreamBuilder 2. If not: standard Parquet read ``` @@ -156,20 +182,31 @@ FileScanTask RollingFileWriter::new_output_file() 1. If file_io.encryption_manager() is Some: a. file_io.new_native_encrypted_output(path) → EncryptedOutputFile - b. EncryptionManager generates DEK, wraps with KEK + b. EncryptionManager generates random plaintext DEK + AAD prefix c. OutputFile::NativeEncrypted carries NativeKeyMaterial for Parquet writer - d.
Store key_metadata bytes on DataFile + d. Store plaintext StandardKeyMetadata as key_metadata bytes on DataFile + (protected by being stored inside the encrypted parent manifest) 2. ParquetWriter detects NativeEncrypted, configures FileEncryptionProperties SnapshotProducer::commit() 1. Manifest writing: - a. file_io.new_encrypted_output(path) → AGS1-encrypting OutputFile - b. Store key_metadata on ManifestFile entry + a. em.encrypt(output_file) → generates random plaintext DEK + AAD prefix + b. Write manifest to AGS1-encrypting OutputFile + c. Store plaintext StandardKeyMetadata as key_metadata on ManifestFile entry + (protected by being stored inside the encrypted parent manifest list) 2. Manifest list writing: - a. file_io.new_encrypted_output(path) → AGS1-encrypting OutputFile - b. em.wrap_key_metadata() → EncryptedKey for table metadata - c. Store key_id on Snapshot.encryption_key_id - 3. Table updates include AddEncryptionKey for new KEKs + a. em.encrypt(output_file) → generates random plaintext DEK + AAD prefix + b. Write manifest list to AGS1-encrypting OutputFile + c. Get or create KEK: + - Find unexpired KEK (check KEY_TIMESTAMP, 730-day lifespan) + - If none: generate new KEK, wrap via KMS: kms_client.wrap_key(kek, table_key_id) + d. AES-GCM encrypt the manifest list's StandardKeyMetadata using the KEK, + with KEY_TIMESTAMP as AAD + e. Store as EncryptedKey (encrypted_by_id = kek_id) in encryption manager + f. Store manifest list key_id on Snapshot.encryption_key_id + 3. Table commit includes AddEncryptionKey for all new entries: + - New KEKs (encrypted_by_id = table_key_id, properties include KEY_TIMESTAMP) + - New manifest list key metadata (encrypted_by_id = kek_id) ``` ### Module Structure @@ -279,10 +316,14 @@ Avro-serialized metadata stored alongside encrypted files. 
Compatible with the J ```rust pub struct StandardKeyMetadata { - encryption_key: Vec<u8>, // Plaintext DEK (for PME) or wrapped DEK (for AGS1) + encryption_key: Vec<u8>, // Plaintext DEK (always plaintext — never individually wrapped) aad_prefix: Vec<u8>, // Additional authenticated data prefix file_length: Option<u64>, // Optional encrypted file length } +// Note: For manifest lists, the entire serialized StandardKeyMetadata is AES-GCM +// encrypted by a KEK before storage. For manifests and data files, the +// StandardKeyMetadata is stored as plaintext key_metadata in the parent +// encrypted file. ``` Wire format: `[version byte 0x01][Avro binary datum]` — byte-compatible with Java. @@ -290,7 +331,7 @@ Wire format: `[version byte 0x01][Avro binary datum]` — byte-compatible with J ### EncryptionManager The `EncryptionManager` trait abstracts encryption orchestration. `StandardEncryptionManager` -implements two-layer envelope encryption with KEK caching and rotation: +implements envelope encryption with KMS-backed KEK management, KEK caching, and rotation: ```rust #[async_trait] @@ -304,13 +345,23 @@ pub trait EncryptionManager: Debug + Send + Sync { /// Encrypt for Parquet Modular Encryption (generates NativeKeyMaterial). async fn encrypt_native(&self, raw_output: OutputFile) -> Result<EncryptedOutputFile>; - /// Unwrap key metadata using the table's KEK hierarchy. + /// Unwrap key metadata for a manifest list. + /// 1. Look up the manifest list's EncryptedKey by key ID + /// 2. Find the KEK via encrypted_by_id + /// 3. Unwrap the KEK via KMS: kms_client.unwrap_key(kek.encrypted_key_metadata, table_key_id) + /// 4. AES-GCM decrypt the manifest list's StandardKeyMetadata using the KEK, + /// with KEY_TIMESTAMP as AAD + /// 5. Return the decrypted StandardKeyMetadata bytes (containing plaintext DEK) async fn unwrap_key_metadata( &self, encrypted_key: &EncryptedKey, encryption_keys: &HashMap<String, EncryptedKey>, ) -> Result<Vec<u8>>; - /// Wrap key metadata with a KEK for storage in table metadata.
+ /// Wrap key metadata for a manifest list with a KEK for storage in table metadata. + /// 1. Get or create a KEK (wrapping new KEK via KMS if needed) + /// 2. AES-GCM encrypt the StandardKeyMetadata using the KEK, with KEY_TIMESTAMP as AAD + /// 3. Return the manifest list EncryptedKey (encrypted_by_id = kek_id) + /// and optionally a new KEK EncryptedKey if one was created async fn wrap_key_metadata( &self, key_metadata: &[u8], ) -> Result<(EncryptedKey, Option<EncryptedKey>)>;