From d0691947a2b558e304168a8a08899dde225ae835 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Sat, 28 Feb 2026 15:08:02 -0600 Subject: [PATCH 1/4] feat: Add `iceberg.schema` to footer for compatibility (#126) * mock change * remove fmt * add unit tests * fix tests * format * commit --- .../src/writer/file_writer/parquet_writer.rs | 540 ++++++++++++++---- 1 file changed, 428 insertions(+), 112 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index da04d5435b..97b0f06142 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -24,28 +24,31 @@ use arrow_schema::SchemaRef as ArrowSchemaRef; use bytes::Bytes; use futures::future::BoxFuture; use itertools::Itertools; -use parquet::arrow::AsyncArrowWriter; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter; -use parquet::file::metadata::ParquetMetaData; +use parquet::arrow::AsyncArrowWriter; +use parquet::file::metadata::{KeyValue, ParquetMetaData}; use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics; use super::{FileWriter, FileWriterBuilder}; use crate::arrow::{ - ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor, - get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, + get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, ArrowFileReader, FieldMatchMode, + NanValueCountVisitor, DEFAULT_MAP_FIELD_NAME, }; use crate::io::{FileIO, FileWrite, OutputFile}; use crate::spec::{ - DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType, - NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct, - StructType, TableMetadata, Type, visit_schema, + visit_schema, DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, + MapType, NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, + Struct, StructType, TableMetadata, Type, }; use crate::transform::create_transform_function; use crate::writer::{CurrentFileStatus, DataFile}; use crate::{Error, ErrorKind, Result}; +/// The key used to store the Iceberg schema JSON in the Parquet file footer metadata. +const ICEBERG_SCHEMA_KEY: &str = "iceberg.schema"; + /// ParquetWriterBuilder is used to builder a [`ParquetWriter`] #[derive(Clone, Debug)] pub struct ParquetWriterBuilder { @@ -67,12 +70,33 @@ impl ParquetWriterBuilder { schema: SchemaRef, match_mode: FieldMatchMode, ) -> Self { + let props = Self::add_iceberg_schema_metadata(props, &schema); Self { props, schema, match_mode, } } + + /// Adds the `iceberg.schema` key-value metadata to the Parquet writer properties. + /// + /// This embeds the full Iceberg schema JSON (including field IDs, types, and + /// schema ID) into the Parquet file footer. + fn add_iceberg_schema_metadata(props: WriterProperties, schema: &Schema) -> WriterProperties { + let schema_json = serde_json::to_string(schema) + .expect("Iceberg schema serialization to JSON should not fail"); + + let iceberg_kv = KeyValue::new(ICEBERG_SCHEMA_KEY.to_string(), schema_json); + + // Preserve any existing key-value metadata from the caller + let mut kv_metadata = props.key_value_metadata().cloned().unwrap_or_default(); + kv_metadata.push(iceberg_kv); + + props + .into_builder() + .set_key_value_metadata(Some(kv_metadata)) + .build() + } } impl FileWriterBuilder for ParquetWriterBuilder { @@ -726,21 +750,17 @@ mod tests { NestedField::required( 4, "col4", - Type::Struct(StructType::new(vec![ - NestedField::required( - 8, - "col_4_8", - Type::Struct(StructType::new(vec![ - NestedField::required( - 9, - "col_4_8_9", - Type::Primitive(PrimitiveType::Long), - ) - .into(), - ])), + Type::Struct(StructType::new(vec![NestedField::required( + 8, + "col_4_8", + Type::Struct(StructType::new(vec![NestedField::required( + 9, + "col_4_8_9", + Type::Primitive(PrimitiveType::Long), ) - .into(), - ])), + .into()])), + ) + .into()])), ) .into(), NestedField::required( @@ -1229,12 +1249,10 @@ mod tests { // check data file assert_eq!(data_file.record_count(), 4); assert!(data_file.value_counts().iter().all(|(_, &v)| { v == 4 })); - assert!( - data_file - .null_value_counts() - .iter() - .all(|(_, &v)| { v == 1 }) - ); + assert!(data_file + .null_value_counts() + .iter() + .all(|(_, &v)| { v == 1 })); assert_eq!( *data_file.lower_bounds(), HashMap::from([ @@ -1338,17 +1356,15 @@ mod tests { // test 1.1 and 2.2 let schema = Arc::new( Schema::builder() - .with_fields(vec![ - NestedField::optional( - 0, - "decimal", - Type::Primitive(PrimitiveType::Decimal { - precision: 28, - scale: 10, - }), - ) - .into(), - ]) + .with_fields(vec![NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 28, + scale: 10, + }), + ) + .into()]) .build() .unwrap(), ); @@ -1390,17 +1406,15 @@ mod tests { // test -1.1 and -2.2 let schema = Arc::new( Schema::builder() - .with_fields(vec![ - NestedField::optional( - 0, - "decimal", - Type::Primitive(PrimitiveType::Decimal { - precision: 28, - scale: 10, - }), - ) - .into(), - ]) + .with_fields(vec![NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 28, + scale: 10, + }), + ) + .into()]) .build() .unwrap(), ); @@ -1448,17 +1462,15 @@ mod tests { assert_eq!(decimal_scale(&decimal_max), decimal_scale(&decimal_min)); let schema = Arc::new( Schema::builder() - .with_fields(vec![ - NestedField::optional( - 0, - "decimal", - Type::Primitive(PrimitiveType::Decimal { - precision: 38, - scale: decimal_scale(&decimal_max), - }), - ) - .into(), - ]) + .with_fields(vec![NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 38, + scale: decimal_scale(&decimal_max), + }), + ) + .into()]) .build() .unwrap(), ); @@ -1718,31 +1730,29 @@ mod tests { let file_name_gen = DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); - let schema_struct_float_fields = Fields::from(vec![ - Field::new("col4", DataType::Float32, false).with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "4".to_string(), - )])), - ]); + let schema_struct_float_fields = + Fields::from(vec![Field::new("col4", DataType::Float32, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "4".to_string(), + )]))]); - let schema_struct_nested_float_fields = Fields::from(vec![ - Field::new("col7", DataType::Float32, false).with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "7".to_string(), - )])), - ]); + let schema_struct_nested_float_fields = + Fields::from(vec![Field::new("col7", DataType::Float32, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "7".to_string(), + )]))]); - let schema_struct_nested_fields = Fields::from(vec![ - Field::new( - "col6", - arrow_schema::DataType::Struct(schema_struct_nested_float_fields.clone()), - false, - ) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "6".to_string(), - )])), - ]); + let schema_struct_nested_fields = Fields::from(vec![Field::new( + "col6", + arrow_schema::DataType::Struct(schema_struct_nested_float_fields.clone()), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "6".to_string(), + )]))]); // prepare data let arrow_schema = { @@ -1868,11 +1878,15 @@ mod tests { "4".to_string(), )])); - let schema_struct_list_field = Fields::from(vec![ - Field::new_list("col2", schema_struct_list_float_field.clone(), true).with_metadata( - HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]), - ), - ]); + let schema_struct_list_field = Fields::from(vec![Field::new_list( + "col2", + schema_struct_list_float_field.clone(), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "3".to_string(), + )]))]); let arrow_schema = { let fields = vec![ @@ -2080,20 +2094,18 @@ mod tests { [(PARQUET_FIELD_ID_META_KEY.to_string(), "7".to_string())], )); - let schema_struct_map_field = Fields::from(vec![ - Field::new_map( - "col3", - DEFAULT_MAP_FIELD_NAME, - struct_map_key_field_schema.clone(), - struct_map_value_field_schema.clone(), - false, - false, - ) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "5".to_string(), - )])), - ]); + let schema_struct_map_field = Fields::from(vec![Field::new_map( + "col3", + DEFAULT_MAP_FIELD_NAME, + struct_map_key_field_schema.clone(), + struct_map_value_field_schema.clone(), + false, + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "5".to_string(), + )]))]); let arrow_schema = { let fields = vec![ @@ -2225,11 +2237,13 @@ mod tests { Arc::new( Schema::builder() .with_schema_id(1) - .with_fields(vec![ - NestedField::required(0, "col", Type::Primitive(PrimitiveType::Long)) - .with_id(0) - .into(), - ]) + .with_fields(vec![NestedField::required( + 0, + "col", + Type::Primitive(PrimitiveType::Long), + ) + .with_id(0) + .into()]) .build() .expect("Failed to create schema"), ), @@ -2250,11 +2264,13 @@ mod tests { let schema = Arc::new( Schema::builder() .with_schema_id(1) - .with_fields(vec![ - NestedField::required(0, "col", Type::Primitive(PrimitiveType::Int)) - .with_id(0) - .into(), - ]) + .with_fields(vec![NestedField::required( + 0, + "col", + Type::Primitive(PrimitiveType::Int), + ) + .with_id(0) + .into()]) .build() .expect("Failed to create schema"), ); @@ -2279,4 +2295,304 @@ mod tests { assert_eq!(lower_bounds, HashMap::from([(0, Datum::int(i32::MIN))])); assert_eq!(upper_bounds, HashMap::from([(0, Datum::int(i32::MAX))])); } + + /// Helper to read the Parquet file footer key-value metadata from a written data file. + async fn read_parquet_kv_metadata( + file_io: &FileIO, + data_file: &DataFile, + ) -> Option> { + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + + let input_file = file_io.new_input(data_file.file_path.clone()).unwrap(); + let input_content = input_file.read().await.unwrap(); + let reader_builder = ParquetRecordBatchReaderBuilder::try_new(input_content).unwrap(); + reader_builder + .metadata() + .file_metadata() + .key_value_metadata() + .cloned() + } + + /// Helper to write a simple parquet file and return the data file, file_io, + /// and TempDir (must be kept alive so the file remains on disk). + async fn write_simple_parquet_file( + props: WriterProperties, + ) -> Result<(DataFile, FileIO, TempDir)> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + let iceberg_schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(0, "id", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::optional(1, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema: ArrowSchemaRef = Arc::new(iceberg_schema.as_ref().try_into().unwrap()); + let col0 = Arc::new(Int64Array::from_iter_values(0..10)) as ArrayRef; + let col1 = Arc::new(arrow_array::StringArray::from(vec![ + Some("a"), + Some("b"), + Some("c"), + None, + Some("e"), + Some("f"), + Some("g"), + Some("h"), + Some("i"), + Some("j"), + ])) as ArrayRef; + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![col0, col1]).unwrap(); + + let output_file = file_io.new_output( + location_gen.generate_location(None, &file_name_gen.generate_file_name()), + )?; + + let mut pw = ParquetWriterBuilder::new(props, iceberg_schema) + .build(output_file) + .await?; + pw.write(&batch).await?; + let res = pw.close().await?; + assert_eq!(res.len(), 1); + let data_file = res + .into_iter() + .next() + .unwrap() + .content(DataContentType::Data) + .partition(Struct::empty()) + .partition_spec_id(0) + .build() + .unwrap(); + + Ok((data_file, file_io, temp_dir)) + } + + #[tokio::test] + async fn test_iceberg_schema_metadata_present_in_parquet_footer() -> Result<()> { + let (data_file, file_io, _temp_dir) = + write_simple_parquet_file(WriterProperties::builder().build()).await?; + + let kv_metadata = read_parquet_kv_metadata(&file_io, &data_file).await; + let kv_metadata = kv_metadata.expect("key_value_metadata should be present"); + + let iceberg_schema_entry = kv_metadata.iter().find(|kv| kv.key == ICEBERG_SCHEMA_KEY); + assert!( + iceberg_schema_entry.is_some(), + "Parquet footer must contain '{ICEBERG_SCHEMA_KEY}' key-value metadata" + ); + + let schema_json = iceberg_schema_entry + .unwrap() + .value + .as_ref() + .expect("iceberg.schema value should not be None"); + assert!( + !schema_json.is_empty(), + "iceberg.schema JSON should not be empty" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_iceberg_schema_metadata_roundtrip() -> Result<()> { + let (data_file, file_io, _temp_dir) = + write_simple_parquet_file(WriterProperties::builder().build()).await?; + + let kv_metadata = read_parquet_kv_metadata(&file_io, &data_file) + .await + .expect("key_value_metadata should be present"); + + let schema_json = kv_metadata + .iter() + .find(|kv| kv.key == ICEBERG_SCHEMA_KEY) + .expect("iceberg.schema key must be present") + .value + .as_ref() + .expect("iceberg.schema value must not be None"); + + // Deserialize back to an Iceberg Schema + let deserialized: Schema = serde_json::from_str(schema_json) + .expect("iceberg.schema JSON should deserialize to a valid Schema"); + + // Verify schema ID + assert_eq!(deserialized.schema_id(), 1); + + // Verify field count + assert_eq!(deserialized.as_struct().fields().len(), 2); + + // Verify field IDs and names + let id_field = deserialized.field_by_id(0).expect("field 0 should exist"); + assert_eq!(id_field.name, "id"); + + let name_field = deserialized.field_by_id(1).expect("field 1 should exist"); + assert_eq!(name_field.name, "name"); + + Ok(()) + } + + #[tokio::test] + async fn test_iceberg_schema_metadata_with_nested_schema() -> Result<()> { + // Use a nested schema with Struct and List types to verify the iceberg.schema + // JSON correctly roundtrips complex types including field IDs. + let schema = Schema::builder() + .with_schema_id(2) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::optional( + 2, + "location", + Type::Struct(StructType::new(vec![ + NestedField::required(3, "lat", Type::Primitive(PrimitiveType::Double)) + .into(), + NestedField::required(4, "lon", Type::Primitive(PrimitiveType::Double)) + .into(), + ])), + ) + .into(), + NestedField::optional( + 5, + "tags", + Type::List(ListType::new( + NestedField::required(6, "element", Type::Primitive(PrimitiveType::String)) + .into(), + )), + ) + .into(), + ]) + .build() + .unwrap(); + + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap()); + + // Build columns + let col_id = Arc::new(Int64Array::from(vec![1i64, 2, 3])) as ArrayRef; + let col_location = Arc::new(StructArray::new( + { + if let DataType::Struct(fields) = arrow_schema.field(1).data_type() { + fields.clone() + } else { + unreachable!() + } + }, + vec![ + Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef, + Arc::new(Float64Array::from(vec![4.0, 5.0, 6.0])) as ArrayRef, + ], + None, + )) as ArrayRef; + let col_tags = Arc::new({ + let values = arrow_array::StringArray::from(vec!["a", "b", "c", "d", "e", "f"]); + let offsets = arrow_buffer::OffsetBuffer::new(arrow_buffer::ScalarBuffer::from(vec![ + 0i32, 2, 4, 6, + ])); + let field = arrow_schema.field(2).clone(); + if let DataType::List(inner) = field.data_type() { + ListArray::new(inner.clone(), offsets, Arc::new(values), None) + } else { + unreachable!() + } + }) as ArrayRef; + + let batch = + RecordBatch::try_new(arrow_schema.clone(), vec![col_id, col_location, col_tags]) + .unwrap(); + + let output_file = file_io.new_output( + location_gen.generate_location(None, &file_name_gen.generate_file_name()), + )?; + + let mut pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + Arc::new(schema.clone()), + ) + .build(output_file) + .await?; + pw.write(&batch).await?; + let res = pw.close().await?; + assert_eq!(res.len(), 1); + let data_file = res + .into_iter() + .next() + .unwrap() + .content(DataContentType::Data) + .partition(Struct::empty()) + .partition_spec_id(0) + .build() + .unwrap(); + + let kv_metadata = read_parquet_kv_metadata(&file_io, &data_file) + .await + .expect("key_value_metadata should be present"); + + let schema_json = kv_metadata + .iter() + .find(|kv| kv.key == ICEBERG_SCHEMA_KEY) + .expect("iceberg.schema key must be present for nested schema") + .value + .as_ref() + .expect("iceberg.schema value must not be None"); + + let deserialized: Schema = serde_json::from_str(schema_json) + .expect("nested iceberg.schema JSON should deserialize"); + + // Verify the nested schema round-trips correctly + assert_eq!(deserialized, schema); + + // Spot-check nested field IDs survived the roundtrip + assert_eq!(deserialized.field_by_id(3).unwrap().name, "lat"); + assert_eq!(deserialized.field_by_id(6).unwrap().name, "element"); + + Ok(()) + } + + #[tokio::test] + async fn test_iceberg_schema_preserves_existing_kv_metadata() -> Result<()> { + let props = WriterProperties::builder() + .set_key_value_metadata(Some(vec![parquet::file::metadata::KeyValue::new( + "custom.key".to_string(), + "custom_value".to_string(), + )])) + .build(); + + let (data_file, file_io, _temp_dir) = write_simple_parquet_file(props).await?; + + let kv_metadata = read_parquet_kv_metadata(&file_io, &data_file) + .await + .expect("key_value_metadata should be present"); + + // The caller's custom metadata should still be present + let custom_entry = kv_metadata.iter().find(|kv| kv.key == "custom.key"); + assert!( + custom_entry.is_some(), + "Caller's custom key-value metadata must be preserved" + ); + assert_eq!(custom_entry.unwrap().value.as_deref(), Some("custom_value")); + + // iceberg.schema should also be present + let iceberg_entry = kv_metadata.iter().find(|kv| kv.key == ICEBERG_SCHEMA_KEY); + assert!( + iceberg_entry.is_some(), + "iceberg.schema must be present alongside custom metadata" + ); + + Ok(()) + } } From 47388bde2f88b5c873d0365f9204e7c065c39b94 Mon Sep 17 00:00:00 2001 From: vovacf201 Date: Thu, 12 Mar 2026 16:16:45 +0000 Subject: [PATCH 2/4] =?UTF-8?q?fix:=20embed=20iceberg.schema=20in=20Arrow?= =?UTF-8?q?=20schema=20metadata=20for=20Snowflake=20comp=E2=80=A6=20(#134)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: embed iceberg.schema in Arrow schema metadata for Snowflake compatibility The ParquetWriter was only writing the iceberg.schema JSON into the Parquet footer key-value metadata (WriterProperties). Downstream readers like Snowflake also expect it in the Arrow schema metadata map, which is encoded in the ARROW:schema IPC section of the Parquet file. Inject the iceberg.schema JSON into the Arrow schema metadata during writer initialization so it is present in both locations, matching the behavior of the Java Iceberg implementation. * feat: fixed formatting --- .../src/writer/file_writer/parquet_writer.rs | 48 ++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 97b0f06142..25e98b9eb8 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -513,7 +513,19 @@ impl FileWriter for ParquetWriter { let writer = if let Some(writer) = &mut self.inner_writer { writer } else { - let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?); + let arrow_schema: ArrowSchemaRef = { + let mut schema: arrow_schema::Schema = self.schema.as_ref().try_into()?; + // Embed the Iceberg schema JSON in the Arrow schema metadata so it + // is written into the Parquet `ARROW:schema` IPC section. Some + // downstream readers (e.g. Snowflake) expect the `iceberg.schema` + // key here in addition to the Parquet footer key-value metadata. + let iceberg_schema_json = serde_json::to_string(self.schema.as_ref()) + .expect("Iceberg schema serialization should not fail"); + let mut metadata = schema.metadata.clone(); + metadata.insert(ICEBERG_SCHEMA_KEY.to_string(), iceberg_schema_json); + schema.metadata = metadata; + Arc::new(schema) + }; let inner_writer = self.output_file.writer().await?; let async_writer = AsyncFileWriter::new(inner_writer); let writer = AsyncArrowWriter::try_new( @@ -2403,6 +2415,40 @@ mod tests { Ok(()) } + /// Verify that the `iceberg.schema` JSON is embedded in the Arrow schema + /// metadata (the `ARROW:schema` IPC section), not just the Parquet footer + /// key-value metadata. Downstream readers such as Snowflake resolve the + /// Iceberg schema from this location. + #[tokio::test] + async fn test_iceberg_schema_in_arrow_schema_metadata() -> Result<()> { + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + + let (data_file, file_io, _temp_dir) = + write_simple_parquet_file(WriterProperties::builder().build()).await?; + + let input_file = file_io.new_input(data_file.file_path.clone()).unwrap(); + let input_content = input_file.read().await.unwrap(); + let reader_builder = ParquetRecordBatchReaderBuilder::try_new(input_content).unwrap(); + + // `.schema()` returns the Arrow schema decoded from the ARROW:schema + // IPC metadata in the Parquet file. + let arrow_schema = reader_builder.schema(); + let schema_json = arrow_schema + .metadata() + .get(ICEBERG_SCHEMA_KEY) + .expect("Arrow schema metadata must contain 'iceberg.schema' key"); + + // Deserialize and verify basic roundtrip + let deserialized: Schema = serde_json::from_str(schema_json) + .expect("iceberg.schema JSON in Arrow metadata should be valid"); + assert_eq!(deserialized.schema_id(), 1); + assert_eq!(deserialized.as_struct().fields().len(), 2); + assert_eq!(deserialized.field_by_id(0).expect("field 0").name, "id"); + assert_eq!(deserialized.field_by_id(1).expect("field 1").name, "name"); + + Ok(()) + } + #[tokio::test] async fn test_iceberg_schema_metadata_roundtrip() -> Result<()> { let (data_file, file_io, _temp_dir) = From fc44ac93764fe6eeead9967090c5a4129cc9ce44 Mon Sep 17 00:00:00 2001 From: volodymyr Date: Wed, 18 Mar 2026 11:34:46 +0000 Subject: [PATCH 3/4] fix: apply cargo fmt formatting fixes --- .../src/writer/file_writer/parquet_writer.rs | 214 +++++++++--------- 1 file changed, 111 insertions(+), 103 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 25e98b9eb8..ea5c51ce96 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -24,23 +24,23 @@ use arrow_schema::SchemaRef as ArrowSchemaRef; use bytes::Bytes; use futures::future::BoxFuture; use itertools::Itertools; +use parquet::arrow::AsyncArrowWriter; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter; -use parquet::arrow::AsyncArrowWriter; use parquet::file::metadata::{KeyValue, ParquetMetaData}; use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics; use super::{FileWriter, FileWriterBuilder}; use crate::arrow::{ - get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, ArrowFileReader, FieldMatchMode, - NanValueCountVisitor, DEFAULT_MAP_FIELD_NAME, + ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor, + get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, }; use crate::io::{FileIO, FileWrite, OutputFile}; use crate::spec::{ - visit_schema, DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, - MapType, NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, - Struct, StructType, TableMetadata, Type, + DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType, + NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct, + StructType, TableMetadata, Type, visit_schema, }; use crate::transform::create_transform_function; use crate::writer::{CurrentFileStatus, DataFile}; @@ -762,17 +762,21 @@ mod tests { NestedField::required( 4, "col4", - Type::Struct(StructType::new(vec![NestedField::required( - 8, - "col_4_8", - Type::Struct(StructType::new(vec![NestedField::required( - 9, - "col_4_8_9", - Type::Primitive(PrimitiveType::Long), + Type::Struct(StructType::new(vec![ + NestedField::required( + 8, + "col_4_8", + Type::Struct(StructType::new(vec![ + NestedField::required( + 9, + "col_4_8_9", + Type::Primitive(PrimitiveType::Long), + ) + .into(), + ])), ) - .into()])), - ) - .into()])), + .into(), + ])), ) .into(), NestedField::required( @@ -1261,10 +1265,12 @@ mod tests { // check data file assert_eq!(data_file.record_count(), 4); assert!(data_file.value_counts().iter().all(|(_, &v)| { v == 4 })); - assert!(data_file - .null_value_counts() - .iter() - .all(|(_, &v)| { v == 1 })); + assert!( + data_file + .null_value_counts() + .iter() + .all(|(_, &v)| { v == 1 }) + ); assert_eq!( *data_file.lower_bounds(), HashMap::from([ @@ -1368,15 +1374,17 @@ mod tests { // test 1.1 and 2.2 let schema = Arc::new( Schema::builder() - .with_fields(vec![NestedField::optional( - 0, - "decimal", - Type::Primitive(PrimitiveType::Decimal { - precision: 28, - scale: 10, - }), - ) - .into()]) + .with_fields(vec![ + NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 28, + scale: 10, + }), + ) + .into(), + ]) .build() .unwrap(), ); @@ -1418,15 +1426,17 @@ mod tests { // test -1.1 and -2.2 let schema = Arc::new( Schema::builder() - .with_fields(vec![NestedField::optional( - 0, - "decimal", - Type::Primitive(PrimitiveType::Decimal { - precision: 28, - scale: 10, - }), - ) - .into()]) + .with_fields(vec![ + NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 28, + scale: 10, + }), + ) + .into(), + ]) .build() .unwrap(), ); @@ -1474,15 +1484,17 @@ mod tests { assert_eq!(decimal_scale(&decimal_max), decimal_scale(&decimal_min)); let schema = Arc::new( Schema::builder() - .with_fields(vec![NestedField::optional( - 0, - "decimal", - Type::Primitive(PrimitiveType::Decimal { - precision: 38, - scale: decimal_scale(&decimal_max), - }), - ) - .into()]) + .with_fields(vec![ + NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 38, + scale: decimal_scale(&decimal_max), + }), + ) + .into(), + ]) .build() .unwrap(), ); @@ -1742,29 +1754,31 @@ mod tests { let file_name_gen = DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); - let schema_struct_float_fields = - Fields::from(vec![Field::new("col4", DataType::Float32, false) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "4".to_string(), - )]))]); + let schema_struct_float_fields = Fields::from(vec![ + Field::new("col4", DataType::Float32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "4".to_string(), + )])), + ]); - let schema_struct_nested_float_fields = - Fields::from(vec![Field::new("col7", DataType::Float32, false) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "7".to_string(), - )]))]); + let schema_struct_nested_float_fields = Fields::from(vec![ + Field::new("col7", DataType::Float32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "7".to_string(), + )])), + ]); - let schema_struct_nested_fields = Fields::from(vec![Field::new( - "col6", - arrow_schema::DataType::Struct(schema_struct_nested_float_fields.clone()), - false, - ) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "6".to_string(), - )]))]); + let schema_struct_nested_fields = Fields::from(vec![ + Field::new( + "col6", + arrow_schema::DataType::Struct(schema_struct_nested_float_fields.clone()), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "6".to_string(), + )])), + ]); // prepare data let arrow_schema = { @@ -1890,15 +1904,11 @@ mod tests { "4".to_string(), )])); - let schema_struct_list_field = Fields::from(vec![Field::new_list( - "col2", - schema_struct_list_float_field.clone(), - true, - ) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "3".to_string(), - )]))]); + let schema_struct_list_field = Fields::from(vec![ + Field::new_list("col2", schema_struct_list_float_field.clone(), true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]), + ), + ]); let arrow_schema = { let fields = vec![ @@ -2106,18 +2116,20 @@ mod tests { [(PARQUET_FIELD_ID_META_KEY.to_string(), "7".to_string())], )); - let schema_struct_map_field = Fields::from(vec![Field::new_map( - "col3", - DEFAULT_MAP_FIELD_NAME, - struct_map_key_field_schema.clone(), - struct_map_value_field_schema.clone(), - false, - false, - ) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "5".to_string(), - )]))]); + let schema_struct_map_field = Fields::from(vec![ + Field::new_map( + "col3", + DEFAULT_MAP_FIELD_NAME, + struct_map_key_field_schema.clone(), + struct_map_value_field_schema.clone(), + false, + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "5".to_string(), + )])), + ]); let arrow_schema = { let fields = vec![ @@ -2249,13 +2261,11 @@ mod tests { Arc::new( Schema::builder() .with_schema_id(1) - .with_fields(vec![NestedField::required( - 0, - "col", - Type::Primitive(PrimitiveType::Long), - ) - .with_id(0) - .into()]) + .with_fields(vec![ + NestedField::required(0, "col", Type::Primitive(PrimitiveType::Long)) + .with_id(0) + .into(), + ]) .build() .expect("Failed to create schema"), ), @@ -2276,13 +2286,11 @@ mod tests { let schema = Arc::new( Schema::builder() .with_schema_id(1) - .with_fields(vec![NestedField::required( - 0, - "col", - Type::Primitive(PrimitiveType::Int), - ) - .with_id(0) - .into()]) + .with_fields(vec![ + NestedField::required(0, "col", Type::Primitive(PrimitiveType::Int)) + .with_id(0) + .into(), + ]) .build() .expect("Failed to create schema"), ); From 3f9cf18f9530f8b41ebf62ee3d2470784c2b8ca2 Mon Sep 17 00:00:00 2001 From: volodymyr Date: Wed, 18 Mar 2026 11:49:52 +0000 Subject: [PATCH 4/4] fix: update FileIOBuilder to FileIO::new_with_fs() and re-apply cargo fmt --- crates/iceberg/src/writer/file_writer/parquet_writer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 3ed171214f..0cb86a0a7a 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -2339,7 +2339,7 @@ mod tests { props: WriterProperties, ) -> Result<(DataFile, FileIO, TempDir)> { let temp_dir = TempDir::new().unwrap(); - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let file_io = FileIO::new_with_fs(); let location_gen = DefaultLocationGenerator::with_data_location( temp_dir.path().to_str().unwrap().to_string(), ); @@ -2527,7 +2527,7 @@ mod tests { .unwrap(); let temp_dir = TempDir::new().unwrap(); - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let file_io = FileIO::new_with_fs(); let location_gen = DefaultLocationGenerator::with_data_location( temp_dir.path().to_str().unwrap().to_string(), );