diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index 0984d8fc64..0cb86a0a7a 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -27,7 +27,7 @@ use itertools::Itertools;
 use parquet::arrow::AsyncArrowWriter;
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
-use parquet::file::metadata::ParquetMetaData;
+use parquet::file::metadata::{KeyValue, ParquetMetaData};
 use parquet::file::properties::WriterProperties;
 use parquet::file::statistics::Statistics;
 
@@ -46,6 +46,9 @@ use crate::transform::create_transform_function;
 use crate::writer::{CurrentFileStatus, DataFile};
 use crate::{Error, ErrorKind, Result};
 
+/// The key used to store the Iceberg schema JSON in the Parquet file footer metadata.
+const ICEBERG_SCHEMA_KEY: &str = "iceberg.schema";
+
 /// ParquetWriterBuilder is used to builder a [`ParquetWriter`]
 #[derive(Clone, Debug)]
 pub struct ParquetWriterBuilder {
@@ -67,12 +70,33 @@ impl ParquetWriterBuilder {
         schema: SchemaRef,
         match_mode: FieldMatchMode,
     ) -> Self {
+        let props = Self::add_iceberg_schema_metadata(props, &schema);
         Self {
             props,
             schema,
             match_mode,
         }
     }
+
+    /// Adds the `iceberg.schema` key-value metadata to the Parquet writer properties.
+    ///
+    /// This embeds the full Iceberg schema JSON (including field IDs, types, and
+    /// schema ID) into the Parquet file footer.
+    fn add_iceberg_schema_metadata(props: WriterProperties, schema: &Schema) -> WriterProperties {
+        let schema_json = serde_json::to_string(schema)
+            .expect("Iceberg schema serialization to JSON should not fail");
+
+        let iceberg_kv = KeyValue::new(ICEBERG_SCHEMA_KEY.to_string(), schema_json);
+
+        // Preserve any existing key-value metadata from the caller
+        let mut kv_metadata = props.key_value_metadata().cloned().unwrap_or_default();
+        kv_metadata.push(iceberg_kv);
+
+        props
+            .into_builder()
+            .set_key_value_metadata(Some(kv_metadata))
+            .build()
+    }
 }
 
 impl FileWriterBuilder for ParquetWriterBuilder {
@@ -489,7 +513,19 @@ impl FileWriter for ParquetWriter {
         let writer = if let Some(writer) = &mut self.inner_writer {
             writer
         } else {
-            let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
+            let arrow_schema: ArrowSchemaRef = {
+                let mut schema: arrow_schema::Schema = self.schema.as_ref().try_into()?;
+                // Embed the Iceberg schema JSON in the Arrow schema metadata so it
+                // is written into the Parquet `ARROW:schema` IPC section. Some
+                // downstream readers (e.g. Snowflake) expect the `iceberg.schema`
+                // key here in addition to the Parquet footer key-value metadata.
+                let iceberg_schema_json = serde_json::to_string(self.schema.as_ref())
+                    .expect("Iceberg schema serialization should not fail");
+                let mut metadata = schema.metadata.clone();
+                metadata.insert(ICEBERG_SCHEMA_KEY.to_string(), iceberg_schema_json);
+                schema.metadata = metadata;
+                Arc::new(schema)
+            };
             let inner_writer = self.output_file.writer().await?;
             let async_writer = AsyncFileWriter::new(inner_writer);
             let writer = AsyncArrowWriter::try_new(
@@ -2279,4 +2315,338 @@ mod tests {
         assert_eq!(lower_bounds, HashMap::from([(0, Datum::int(i32::MIN))]));
         assert_eq!(upper_bounds, HashMap::from([(0, Datum::int(i32::MAX))]));
     }
+
+    /// Helper to read the Parquet file footer key-value metadata from a written data file.
+    async fn read_parquet_kv_metadata(
+        file_io: &FileIO,
+        data_file: &DataFile,
+    ) -> Option<Vec<KeyValue>> {
+        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+
+        let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
+        let input_content = input_file.read().await.unwrap();
+        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(input_content).unwrap();
+        reader_builder
+            .metadata()
+            .file_metadata()
+            .key_value_metadata()
+            .cloned()
+    }
+
+    /// Helper to write a simple parquet file and return the data file, file_io,
+    /// and TempDir (must be kept alive so the file remains on disk).
+    async fn write_simple_parquet_file(
+        props: WriterProperties,
+    ) -> Result<(DataFile, FileIO, TempDir)> {
+        let temp_dir = TempDir::new().unwrap();
+        let file_io = FileIO::new_with_fs();
+        let location_gen = DefaultLocationGenerator::with_data_location(
+            temp_dir.path().to_str().unwrap().to_string(),
+        );
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
+
+        let iceberg_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(0, "id", Type::Primitive(PrimitiveType::Long)).into(),
+                    NestedField::optional(1, "name", Type::Primitive(PrimitiveType::String)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema: ArrowSchemaRef = Arc::new(iceberg_schema.as_ref().try_into().unwrap());
+        let col0 = Arc::new(Int64Array::from_iter_values(0..10)) as ArrayRef;
+        let col1 = Arc::new(arrow_array::StringArray::from(vec![
+            Some("a"),
+            Some("b"),
+            Some("c"),
+            None,
+            Some("e"),
+            Some("f"),
+            Some("g"),
+            Some("h"),
+            Some("i"),
+            Some("j"),
+        ])) as ArrayRef;
+        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![col0, col1]).unwrap();
+
+        let output_file = file_io.new_output(
+            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
+        )?;
+
+        let mut pw = ParquetWriterBuilder::new(props, iceberg_schema)
+            .build(output_file)
+            .await?;
+        pw.write(&batch).await?;
+        let res = pw.close().await?;
+        assert_eq!(res.len(), 1);
+        let data_file = res
+            .into_iter()
+            .next()
+            .unwrap()
+            .content(DataContentType::Data)
+            .partition(Struct::empty())
+            .partition_spec_id(0)
+            .build()
+            .unwrap();
+
+        Ok((data_file, file_io, temp_dir))
+    }
+
+    #[tokio::test]
+    async fn test_iceberg_schema_metadata_present_in_parquet_footer() -> Result<()> {
+        let (data_file, file_io, _temp_dir) =
+            write_simple_parquet_file(WriterProperties::builder().build()).await?;
+
+        let kv_metadata = read_parquet_kv_metadata(&file_io, &data_file).await;
+        let kv_metadata = kv_metadata.expect("key_value_metadata should be present");
+
+        let iceberg_schema_entry = kv_metadata.iter().find(|kv| kv.key == ICEBERG_SCHEMA_KEY);
+        assert!(
+            iceberg_schema_entry.is_some(),
+            "Parquet footer must contain '{ICEBERG_SCHEMA_KEY}' key-value metadata"
+        );
+
+        let schema_json = iceberg_schema_entry
+            .unwrap()
+            .value
+            .as_ref()
+            .expect("iceberg.schema value should not be None");
+        assert!(
+            !schema_json.is_empty(),
+            "iceberg.schema JSON should not be empty"
+        );
+
+        Ok(())
+    }
+
+    /// Verify that the `iceberg.schema` JSON is embedded in the Arrow schema
+    /// metadata (the `ARROW:schema` IPC section), not just the Parquet footer
+    /// key-value metadata. Downstream readers such as Snowflake resolve the
+    /// Iceberg schema from this location.
+    #[tokio::test]
+    async fn test_iceberg_schema_in_arrow_schema_metadata() -> Result<()> {
+        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+
+        let (data_file, file_io, _temp_dir) =
+            write_simple_parquet_file(WriterProperties::builder().build()).await?;
+
+        let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
+        let input_content = input_file.read().await.unwrap();
+        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(input_content).unwrap();
+
+        // `.schema()` returns the Arrow schema decoded from the ARROW:schema
+        // IPC metadata in the Parquet file.
+        let arrow_schema = reader_builder.schema();
+        let schema_json = arrow_schema
+            .metadata()
+            .get(ICEBERG_SCHEMA_KEY)
+            .expect("Arrow schema metadata must contain 'iceberg.schema' key");
+
+        // Deserialize and verify basic roundtrip
+        let deserialized: Schema = serde_json::from_str(schema_json)
+            .expect("iceberg.schema JSON in Arrow metadata should be valid");
+        assert_eq!(deserialized.schema_id(), 1);
+        assert_eq!(deserialized.as_struct().fields().len(), 2);
+        assert_eq!(deserialized.field_by_id(0).expect("field 0").name, "id");
+        assert_eq!(deserialized.field_by_id(1).expect("field 1").name, "name");
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_iceberg_schema_metadata_roundtrip() -> Result<()> {
+        let (data_file, file_io, _temp_dir) =
+            write_simple_parquet_file(WriterProperties::builder().build()).await?;
+
+        let kv_metadata = read_parquet_kv_metadata(&file_io, &data_file)
+            .await
+            .expect("key_value_metadata should be present");
+
+        let schema_json = kv_metadata
+            .iter()
+            .find(|kv| kv.key == ICEBERG_SCHEMA_KEY)
+            .expect("iceberg.schema key must be present")
+            .value
+            .as_ref()
+            .expect("iceberg.schema value must not be None");
+
+        // Deserialize back to an Iceberg Schema
+        let deserialized: Schema = serde_json::from_str(schema_json)
+            .expect("iceberg.schema JSON should deserialize to a valid Schema");
+
+        // Verify schema ID
+        assert_eq!(deserialized.schema_id(), 1);
+
+        // Verify field count
+        assert_eq!(deserialized.as_struct().fields().len(), 2);
+
+        // Verify field IDs and names
+        let id_field = deserialized.field_by_id(0).expect("field 0 should exist");
+        assert_eq!(id_field.name, "id");
+
+        let name_field = deserialized.field_by_id(1).expect("field 1 should exist");
+        assert_eq!(name_field.name, "name");
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_iceberg_schema_metadata_with_nested_schema() -> Result<()> {
+        // Use a nested schema with Struct and List types to verify the iceberg.schema
+        // JSON correctly roundtrips complex types including field IDs.
+        let schema = Schema::builder()
+            .with_schema_id(2)
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
+                NestedField::optional(
+                    2,
+                    "location",
+                    Type::Struct(StructType::new(vec![
+                        NestedField::required(3, "lat", Type::Primitive(PrimitiveType::Double))
+                            .into(),
+                        NestedField::required(4, "lon", Type::Primitive(PrimitiveType::Double))
+                            .into(),
+                    ])),
+                )
+                .into(),
+                NestedField::optional(
+                    5,
+                    "tags",
+                    Type::List(ListType::new(
+                        NestedField::required(6, "element", Type::Primitive(PrimitiveType::String))
+                            .into(),
+                    )),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        let temp_dir = TempDir::new().unwrap();
+        let file_io = FileIO::new_with_fs();
+        let location_gen = DefaultLocationGenerator::with_data_location(
+            temp_dir.path().to_str().unwrap().to_string(),
+        );
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
+
+        let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
+
+        // Build columns
+        let col_id = Arc::new(Int64Array::from(vec![1i64, 2, 3])) as ArrayRef;
+        let col_location = Arc::new(StructArray::new(
+            {
+                if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
+                    fields.clone()
+                } else {
+                    unreachable!()
+                }
+            },
+            vec![
+                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
+                Arc::new(Float64Array::from(vec![4.0, 5.0, 6.0])) as ArrayRef,
+            ],
+            None,
+        )) as ArrayRef;
+        let col_tags = Arc::new({
+            let values = arrow_array::StringArray::from(vec!["a", "b", "c", "d", "e", "f"]);
+            let offsets = arrow_buffer::OffsetBuffer::new(arrow_buffer::ScalarBuffer::from(vec![
+                0i32, 2, 4, 6,
+            ]));
+            let field = arrow_schema.field(2).clone();
+            if let DataType::List(inner) = field.data_type() {
+                ListArray::new(inner.clone(), offsets, Arc::new(values), None)
+            } else {
+                unreachable!()
+            }
+        }) as ArrayRef;
+
+        let batch =
+            RecordBatch::try_new(arrow_schema.clone(), vec![col_id, col_location, col_tags])
+                .unwrap();
+
+        let output_file = file_io.new_output(
+            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
+        )?;
+
+        let mut pw = ParquetWriterBuilder::new(
+            WriterProperties::builder().build(),
+            Arc::new(schema.clone()),
+        )
+        .build(output_file)
+        .await?;
+        pw.write(&batch).await?;
+        let res = pw.close().await?;
+        assert_eq!(res.len(), 1);
+        let data_file = res
+            .into_iter()
+            .next()
+            .unwrap()
+            .content(DataContentType::Data)
+            .partition(Struct::empty())
+            .partition_spec_id(0)
+            .build()
+            .unwrap();
+
+        let kv_metadata = read_parquet_kv_metadata(&file_io, &data_file)
+            .await
+            .expect("key_value_metadata should be present");
+
+        let schema_json = kv_metadata
+            .iter()
+            .find(|kv| kv.key == ICEBERG_SCHEMA_KEY)
+            .expect("iceberg.schema key must be present for nested schema")
+            .value
+            .as_ref()
+            .expect("iceberg.schema value must not be None");
+
+        let deserialized: Schema = serde_json::from_str(schema_json)
+            .expect("nested iceberg.schema JSON should deserialize");
+
+        // Verify the nested schema round-trips correctly
+        assert_eq!(deserialized, schema);
+
+        // Spot-check nested field IDs survived the roundtrip
+        assert_eq!(deserialized.field_by_id(3).unwrap().name, "lat");
+        assert_eq!(deserialized.field_by_id(6).unwrap().name, "element");
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_iceberg_schema_preserves_existing_kv_metadata() -> Result<()> {
+        let props = WriterProperties::builder()
+            .set_key_value_metadata(Some(vec![parquet::file::metadata::KeyValue::new(
"custom.key".to_string(), + "custom_value".to_string(), + )])) + .build(); + + let (data_file, file_io, _temp_dir) = write_simple_parquet_file(props).await?; + + let kv_metadata = read_parquet_kv_metadata(&file_io, &data_file) + .await + .expect("key_value_metadata should be present"); + + // The caller's custom metadata should still be present + let custom_entry = kv_metadata.iter().find(|kv| kv.key == "custom.key"); + assert!( + custom_entry.is_some(), + "Caller's custom key-value metadata must be preserved" + ); + assert_eq!(custom_entry.unwrap().value.as_deref(), Some("custom_value")); + + // iceberg.schema should also be present + let iceberg_entry = kv_metadata.iter().find(|kv| kv.key == ICEBERG_SCHEMA_KEY); + assert!( + iceberg_entry.is_some(), + "iceberg.schema must be present alongside custom metadata" + ); + + Ok(()) + } }