From d0691947a2b558e304168a8a08899dde225ae835 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Sat, 28 Feb 2026 15:08:02 -0600 Subject: [PATCH 1/3] feat: Add `iceberg.schema` to footer for compatibility (#126) * mock change * remove fmt * add unit tests * fix tests * format * commit --- .../src/writer/file_writer/parquet_writer.rs | 540 ++++++++++++++---- 1 file changed, 428 insertions(+), 112 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index da04d5435b..97b0f06142 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -24,28 +24,31 @@ use arrow_schema::SchemaRef as ArrowSchemaRef; use bytes::Bytes; use futures::future::BoxFuture; use itertools::Itertools; -use parquet::arrow::AsyncArrowWriter; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter; -use parquet::file::metadata::ParquetMetaData; +use parquet::arrow::AsyncArrowWriter; +use parquet::file::metadata::{KeyValue, ParquetMetaData}; use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics; use super::{FileWriter, FileWriterBuilder}; use crate::arrow::{ - ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor, - get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, + get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, ArrowFileReader, FieldMatchMode, + NanValueCountVisitor, DEFAULT_MAP_FIELD_NAME, }; use crate::io::{FileIO, FileWrite, OutputFile}; use crate::spec::{ - DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType, - NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct, - StructType, TableMetadata, Type, visit_schema, + visit_schema, DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, + MapType, 
NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, + Struct, StructType, TableMetadata, Type, }; use crate::transform::create_transform_function; use crate::writer::{CurrentFileStatus, DataFile}; use crate::{Error, ErrorKind, Result}; +/// The key used to store the Iceberg schema JSON in the Parquet file footer metadata. +const ICEBERG_SCHEMA_KEY: &str = "iceberg.schema"; + /// ParquetWriterBuilder is used to builder a [`ParquetWriter`] #[derive(Clone, Debug)] pub struct ParquetWriterBuilder { @@ -67,12 +70,33 @@ impl ParquetWriterBuilder { schema: SchemaRef, match_mode: FieldMatchMode, ) -> Self { + let props = Self::add_iceberg_schema_metadata(props, &schema); Self { props, schema, match_mode, } } + + /// Adds the `iceberg.schema` key-value metadata to the Parquet writer properties. + /// + /// This embeds the full Iceberg schema JSON (including field IDs, types, and + /// schema ID) into the Parquet file footer. + fn add_iceberg_schema_metadata(props: WriterProperties, schema: &Schema) -> WriterProperties { + let schema_json = serde_json::to_string(schema) + .expect("Iceberg schema serialization to JSON should not fail"); + + let iceberg_kv = KeyValue::new(ICEBERG_SCHEMA_KEY.to_string(), schema_json); + + // Preserve any existing key-value metadata from the caller + let mut kv_metadata = props.key_value_metadata().cloned().unwrap_or_default(); + kv_metadata.push(iceberg_kv); + + props + .into_builder() + .set_key_value_metadata(Some(kv_metadata)) + .build() + } } impl FileWriterBuilder for ParquetWriterBuilder { @@ -726,21 +750,17 @@ mod tests { NestedField::required( 4, "col4", - Type::Struct(StructType::new(vec![ - NestedField::required( - 8, - "col_4_8", - Type::Struct(StructType::new(vec![ - NestedField::required( - 9, - "col_4_8_9", - Type::Primitive(PrimitiveType::Long), - ) - .into(), - ])), + Type::Struct(StructType::new(vec![NestedField::required( + 8, + "col_4_8", + Type::Struct(StructType::new(vec![NestedField::required( + 
9, + "col_4_8_9", + Type::Primitive(PrimitiveType::Long), ) - .into(), - ])), + .into()])), + ) + .into()])), ) .into(), NestedField::required( @@ -1229,12 +1249,10 @@ mod tests { // check data file assert_eq!(data_file.record_count(), 4); assert!(data_file.value_counts().iter().all(|(_, &v)| { v == 4 })); - assert!( - data_file - .null_value_counts() - .iter() - .all(|(_, &v)| { v == 1 }) - ); + assert!(data_file + .null_value_counts() + .iter() + .all(|(_, &v)| { v == 1 })); assert_eq!( *data_file.lower_bounds(), HashMap::from([ @@ -1338,17 +1356,15 @@ mod tests { // test 1.1 and 2.2 let schema = Arc::new( Schema::builder() - .with_fields(vec![ - NestedField::optional( - 0, - "decimal", - Type::Primitive(PrimitiveType::Decimal { - precision: 28, - scale: 10, - }), - ) - .into(), - ]) + .with_fields(vec![NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 28, + scale: 10, + }), + ) + .into()]) .build() .unwrap(), ); @@ -1390,17 +1406,15 @@ mod tests { // test -1.1 and -2.2 let schema = Arc::new( Schema::builder() - .with_fields(vec![ - NestedField::optional( - 0, - "decimal", - Type::Primitive(PrimitiveType::Decimal { - precision: 28, - scale: 10, - }), - ) - .into(), - ]) + .with_fields(vec![NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 28, + scale: 10, + }), + ) + .into()]) .build() .unwrap(), ); @@ -1448,17 +1462,15 @@ mod tests { assert_eq!(decimal_scale(&decimal_max), decimal_scale(&decimal_min)); let schema = Arc::new( Schema::builder() - .with_fields(vec![ - NestedField::optional( - 0, - "decimal", - Type::Primitive(PrimitiveType::Decimal { - precision: 38, - scale: decimal_scale(&decimal_max), - }), - ) - .into(), - ]) + .with_fields(vec![NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 38, + scale: decimal_scale(&decimal_max), + }), + ) + .into()]) .build() .unwrap(), ); @@ -1718,31 +1730,29 @@ mod tests { let 
file_name_gen = DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); - let schema_struct_float_fields = Fields::from(vec![ - Field::new("col4", DataType::Float32, false).with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "4".to_string(), - )])), - ]); + let schema_struct_float_fields = + Fields::from(vec![Field::new("col4", DataType::Float32, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "4".to_string(), + )]))]); - let schema_struct_nested_float_fields = Fields::from(vec![ - Field::new("col7", DataType::Float32, false).with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "7".to_string(), - )])), - ]); + let schema_struct_nested_float_fields = + Fields::from(vec![Field::new("col7", DataType::Float32, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "7".to_string(), + )]))]); - let schema_struct_nested_fields = Fields::from(vec![ - Field::new( - "col6", - arrow_schema::DataType::Struct(schema_struct_nested_float_fields.clone()), - false, - ) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "6".to_string(), - )])), - ]); + let schema_struct_nested_fields = Fields::from(vec![Field::new( + "col6", + arrow_schema::DataType::Struct(schema_struct_nested_float_fields.clone()), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "6".to_string(), + )]))]); // prepare data let arrow_schema = { @@ -1868,11 +1878,15 @@ mod tests { "4".to_string(), )])); - let schema_struct_list_field = Fields::from(vec![ - Field::new_list("col2", schema_struct_list_float_field.clone(), true).with_metadata( - HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]), - ), - ]); + let schema_struct_list_field = Fields::from(vec![Field::new_list( + "col2", + schema_struct_list_float_field.clone(), + true, + ) + .with_metadata(HashMap::from([( + 
PARQUET_FIELD_ID_META_KEY.to_string(), + "3".to_string(), + )]))]); let arrow_schema = { let fields = vec![ @@ -2080,20 +2094,18 @@ mod tests { [(PARQUET_FIELD_ID_META_KEY.to_string(), "7".to_string())], )); - let schema_struct_map_field = Fields::from(vec![ - Field::new_map( - "col3", - DEFAULT_MAP_FIELD_NAME, - struct_map_key_field_schema.clone(), - struct_map_value_field_schema.clone(), - false, - false, - ) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "5".to_string(), - )])), - ]); + let schema_struct_map_field = Fields::from(vec![Field::new_map( + "col3", + DEFAULT_MAP_FIELD_NAME, + struct_map_key_field_schema.clone(), + struct_map_value_field_schema.clone(), + false, + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "5".to_string(), + )]))]); let arrow_schema = { let fields = vec![ @@ -2225,11 +2237,13 @@ mod tests { Arc::new( Schema::builder() .with_schema_id(1) - .with_fields(vec![ - NestedField::required(0, "col", Type::Primitive(PrimitiveType::Long)) - .with_id(0) - .into(), - ]) + .with_fields(vec![NestedField::required( + 0, + "col", + Type::Primitive(PrimitiveType::Long), + ) + .with_id(0) + .into()]) .build() .expect("Failed to create schema"), ), @@ -2250,11 +2264,13 @@ mod tests { let schema = Arc::new( Schema::builder() .with_schema_id(1) - .with_fields(vec![ - NestedField::required(0, "col", Type::Primitive(PrimitiveType::Int)) - .with_id(0) - .into(), - ]) + .with_fields(vec![NestedField::required( + 0, + "col", + Type::Primitive(PrimitiveType::Int), + ) + .with_id(0) + .into()]) .build() .expect("Failed to create schema"), ); @@ -2279,4 +2295,304 @@ mod tests { assert_eq!(lower_bounds, HashMap::from([(0, Datum::int(i32::MIN))])); assert_eq!(upper_bounds, HashMap::from([(0, Datum::int(i32::MAX))])); } + + /// Helper to read the Parquet file footer key-value metadata from a written data file. 
+ async fn read_parquet_kv_metadata( + file_io: &FileIO, + data_file: &DataFile, + ) -> Option<Vec<KeyValue>> { + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + + let input_file = file_io.new_input(data_file.file_path.clone()).unwrap(); + let input_content = input_file.read().await.unwrap(); + let reader_builder = ParquetRecordBatchReaderBuilder::try_new(input_content).unwrap(); + reader_builder + .metadata() + .file_metadata() + .key_value_metadata() + .cloned() + } + + /// Helper to write a simple parquet file and return the data file, file_io, + /// and TempDir (must be kept alive so the file remains on disk). + async fn write_simple_parquet_file( + props: WriterProperties, + ) -> Result<(DataFile, FileIO, TempDir)> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + let iceberg_schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(0, "id", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::optional(1, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema: ArrowSchemaRef = Arc::new(iceberg_schema.as_ref().try_into().unwrap()); + let col0 = Arc::new(Int64Array::from_iter_values(0..10)) as ArrayRef; + let col1 = Arc::new(arrow_array::StringArray::from(vec![ + Some("a"), + Some("b"), + Some("c"), + None, + Some("e"), + Some("f"), + Some("g"), + Some("h"), + Some("i"), + Some("j"), + ])) as ArrayRef; + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![col0, col1]).unwrap(); + + let output_file = file_io.new_output( + location_gen.generate_location(None, &file_name_gen.generate_file_name()), + )?; + + let mut pw = ParquetWriterBuilder::new(props, 
iceberg_schema) + .build(output_file) + .await?; + pw.write(&batch).await?; + let res = pw.close().await?; + assert_eq!(res.len(), 1); + let data_file = res + .into_iter() + .next() + .unwrap() + .content(DataContentType::Data) + .partition(Struct::empty()) + .partition_spec_id(0) + .build() + .unwrap(); + + Ok((data_file, file_io, temp_dir)) + } + + #[tokio::test] + async fn test_iceberg_schema_metadata_present_in_parquet_footer() -> Result<()> { + let (data_file, file_io, _temp_dir) = + write_simple_parquet_file(WriterProperties::builder().build()).await?; + + let kv_metadata = read_parquet_kv_metadata(&file_io, &data_file).await; + let kv_metadata = kv_metadata.expect("key_value_metadata should be present"); + + let iceberg_schema_entry = kv_metadata.iter().find(|kv| kv.key == ICEBERG_SCHEMA_KEY); + assert!( + iceberg_schema_entry.is_some(), + "Parquet footer must contain '{ICEBERG_SCHEMA_KEY}' key-value metadata" + ); + + let schema_json = iceberg_schema_entry + .unwrap() + .value + .as_ref() + .expect("iceberg.schema value should not be None"); + assert!( + !schema_json.is_empty(), + "iceberg.schema JSON should not be empty" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_iceberg_schema_metadata_roundtrip() -> Result<()> { + let (data_file, file_io, _temp_dir) = + write_simple_parquet_file(WriterProperties::builder().build()).await?; + + let kv_metadata = read_parquet_kv_metadata(&file_io, &data_file) + .await + .expect("key_value_metadata should be present"); + + let schema_json = kv_metadata + .iter() + .find(|kv| kv.key == ICEBERG_SCHEMA_KEY) + .expect("iceberg.schema key must be present") + .value + .as_ref() + .expect("iceberg.schema value must not be None"); + + // Deserialize back to an Iceberg Schema + let deserialized: Schema = serde_json::from_str(schema_json) + .expect("iceberg.schema JSON should deserialize to a valid Schema"); + + // Verify schema ID + assert_eq!(deserialized.schema_id(), 1); + + // Verify field count + 
assert_eq!(deserialized.as_struct().fields().len(), 2); + + // Verify field IDs and names + let id_field = deserialized.field_by_id(0).expect("field 0 should exist"); + assert_eq!(id_field.name, "id"); + + let name_field = deserialized.field_by_id(1).expect("field 1 should exist"); + assert_eq!(name_field.name, "name"); + + Ok(()) + } + + #[tokio::test] + async fn test_iceberg_schema_metadata_with_nested_schema() -> Result<()> { + // Use a nested schema with Struct and List types to verify the iceberg.schema + // JSON correctly roundtrips complex types including field IDs. + let schema = Schema::builder() + .with_schema_id(2) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::optional( + 2, + "location", + Type::Struct(StructType::new(vec![ + NestedField::required(3, "lat", Type::Primitive(PrimitiveType::Double)) + .into(), + NestedField::required(4, "lon", Type::Primitive(PrimitiveType::Double)) + .into(), + ])), + ) + .into(), + NestedField::optional( + 5, + "tags", + Type::List(ListType::new( + NestedField::required(6, "element", Type::Primitive(PrimitiveType::String)) + .into(), + )), + ) + .into(), + ]) + .build() + .unwrap(); + + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap()); + + // Build columns + let col_id = Arc::new(Int64Array::from(vec![1i64, 2, 3])) as ArrayRef; + let col_location = Arc::new(StructArray::new( + { + if let DataType::Struct(fields) = arrow_schema.field(1).data_type() { + fields.clone() + } else { + unreachable!() + } + }, + vec![ + Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef, + 
Arc::new(Float64Array::from(vec![4.0, 5.0, 6.0])) as ArrayRef, + ], + None, + )) as ArrayRef; + let col_tags = Arc::new({ + let values = arrow_array::StringArray::from(vec!["a", "b", "c", "d", "e", "f"]); + let offsets = arrow_buffer::OffsetBuffer::new(arrow_buffer::ScalarBuffer::from(vec![ + 0i32, 2, 4, 6, + ])); + let field = arrow_schema.field(2).clone(); + if let DataType::List(inner) = field.data_type() { + ListArray::new(inner.clone(), offsets, Arc::new(values), None) + } else { + unreachable!() + } + }) as ArrayRef; + + let batch = + RecordBatch::try_new(arrow_schema.clone(), vec![col_id, col_location, col_tags]) + .unwrap(); + + let output_file = file_io.new_output( + location_gen.generate_location(None, &file_name_gen.generate_file_name()), + )?; + + let mut pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + Arc::new(schema.clone()), + ) + .build(output_file) + .await?; + pw.write(&batch).await?; + let res = pw.close().await?; + assert_eq!(res.len(), 1); + let data_file = res + .into_iter() + .next() + .unwrap() + .content(DataContentType::Data) + .partition(Struct::empty()) + .partition_spec_id(0) + .build() + .unwrap(); + + let kv_metadata = read_parquet_kv_metadata(&file_io, &data_file) + .await + .expect("key_value_metadata should be present"); + + let schema_json = kv_metadata + .iter() + .find(|kv| kv.key == ICEBERG_SCHEMA_KEY) + .expect("iceberg.schema key must be present for nested schema") + .value + .as_ref() + .expect("iceberg.schema value must not be None"); + + let deserialized: Schema = serde_json::from_str(schema_json) + .expect("nested iceberg.schema JSON should deserialize"); + + // Verify the nested schema round-trips correctly + assert_eq!(deserialized, schema); + + // Spot-check nested field IDs survived the roundtrip + assert_eq!(deserialized.field_by_id(3).unwrap().name, "lat"); + assert_eq!(deserialized.field_by_id(6).unwrap().name, "element"); + + Ok(()) + } + + #[tokio::test] + async fn 
test_iceberg_schema_preserves_existing_kv_metadata() -> Result<()> { + let props = WriterProperties::builder() + .set_key_value_metadata(Some(vec![parquet::file::metadata::KeyValue::new( + "custom.key".to_string(), + "custom_value".to_string(), + )])) + .build(); + + let (data_file, file_io, _temp_dir) = write_simple_parquet_file(props).await?; + + let kv_metadata = read_parquet_kv_metadata(&file_io, &data_file) + .await + .expect("key_value_metadata should be present"); + + // The caller's custom metadata should still be present + let custom_entry = kv_metadata.iter().find(|kv| kv.key == "custom.key"); + assert!( + custom_entry.is_some(), + "Caller's custom key-value metadata must be preserved" + ); + assert_eq!(custom_entry.unwrap().value.as_deref(), Some("custom_value")); + + // iceberg.schema should also be present + let iceberg_entry = kv_metadata.iter().find(|kv| kv.key == ICEBERG_SCHEMA_KEY); + assert!( + iceberg_entry.is_some(), + "iceberg.schema must be present alongside custom metadata" + ); + + Ok(()) + } } From 1c20f0c224d754644589fe70f5b2b7751a7502c8 Mon Sep 17 00:00:00 2001 From: volodymyr Date: Wed, 18 Mar 2026 11:36:10 +0000 Subject: [PATCH 2/3] fix: apply cargo fmt formatting fixes --- .../src/writer/file_writer/parquet_writer.rs | 214 +++++++++--------- 1 file changed, 111 insertions(+), 103 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 97b0f06142..1b89eed94f 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -24,23 +24,23 @@ use arrow_schema::SchemaRef as ArrowSchemaRef; use bytes::Bytes; use futures::future::BoxFuture; use itertools::Itertools; +use parquet::arrow::AsyncArrowWriter; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter; -use parquet::arrow::AsyncArrowWriter; use 
parquet::file::metadata::{KeyValue, ParquetMetaData}; use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics; use super::{FileWriter, FileWriterBuilder}; use crate::arrow::{ - get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, ArrowFileReader, FieldMatchMode, - NanValueCountVisitor, DEFAULT_MAP_FIELD_NAME, + ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor, + get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, }; use crate::io::{FileIO, FileWrite, OutputFile}; use crate::spec::{ - visit_schema, DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, - MapType, NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, - Struct, StructType, TableMetadata, Type, + DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType, + NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct, + StructType, TableMetadata, Type, visit_schema, }; use crate::transform::create_transform_function; use crate::writer::{CurrentFileStatus, DataFile}; @@ -750,17 +750,21 @@ mod tests { NestedField::required( 4, "col4", - Type::Struct(StructType::new(vec![NestedField::required( - 8, - "col_4_8", - Type::Struct(StructType::new(vec![NestedField::required( - 9, - "col_4_8_9", - Type::Primitive(PrimitiveType::Long), + Type::Struct(StructType::new(vec![ + NestedField::required( + 8, + "col_4_8", + Type::Struct(StructType::new(vec![ + NestedField::required( + 9, + "col_4_8_9", + Type::Primitive(PrimitiveType::Long), + ) + .into(), + ])), ) - .into()])), - ) - .into()])), + .into(), + ])), ) .into(), NestedField::required( @@ -1249,10 +1253,12 @@ mod tests { // check data file assert_eq!(data_file.record_count(), 4); assert!(data_file.value_counts().iter().all(|(_, &v)| { v == 4 })); - assert!(data_file - .null_value_counts() - .iter() - .all(|(_, &v)| { v == 1 })); + assert!( + data_file + .null_value_counts() + 
.iter() + .all(|(_, &v)| { v == 1 }) + ); assert_eq!( *data_file.lower_bounds(), HashMap::from([ @@ -1356,15 +1362,17 @@ mod tests { // test 1.1 and 2.2 let schema = Arc::new( Schema::builder() - .with_fields(vec![NestedField::optional( - 0, - "decimal", - Type::Primitive(PrimitiveType::Decimal { - precision: 28, - scale: 10, - }), - ) - .into()]) + .with_fields(vec![ + NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 28, + scale: 10, + }), + ) + .into(), + ]) .build() .unwrap(), ); @@ -1406,15 +1414,17 @@ mod tests { // test -1.1 and -2.2 let schema = Arc::new( Schema::builder() - .with_fields(vec![NestedField::optional( - 0, - "decimal", - Type::Primitive(PrimitiveType::Decimal { - precision: 28, - scale: 10, - }), - ) - .into()]) + .with_fields(vec![ + NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 28, + scale: 10, + }), + ) + .into(), + ]) .build() .unwrap(), ); @@ -1462,15 +1472,17 @@ mod tests { assert_eq!(decimal_scale(&decimal_max), decimal_scale(&decimal_min)); let schema = Arc::new( Schema::builder() - .with_fields(vec![NestedField::optional( - 0, - "decimal", - Type::Primitive(PrimitiveType::Decimal { - precision: 38, - scale: decimal_scale(&decimal_max), - }), - ) - .into()]) + .with_fields(vec![ + NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 38, + scale: decimal_scale(&decimal_max), + }), + ) + .into(), + ]) .build() .unwrap(), ); @@ -1730,29 +1742,31 @@ mod tests { let file_name_gen = DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); - let schema_struct_float_fields = - Fields::from(vec![Field::new("col4", DataType::Float32, false) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "4".to_string(), - )]))]); + let schema_struct_float_fields = Fields::from(vec![ + Field::new("col4", DataType::Float32, false).with_metadata(HashMap::from([( + 
PARQUET_FIELD_ID_META_KEY.to_string(), + "4".to_string(), + )])), + ]); - let schema_struct_nested_float_fields = - Fields::from(vec![Field::new("col7", DataType::Float32, false) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "7".to_string(), - )]))]); + let schema_struct_nested_float_fields = Fields::from(vec![ + Field::new("col7", DataType::Float32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "7".to_string(), + )])), + ]); - let schema_struct_nested_fields = Fields::from(vec![Field::new( - "col6", - arrow_schema::DataType::Struct(schema_struct_nested_float_fields.clone()), - false, - ) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "6".to_string(), - )]))]); + let schema_struct_nested_fields = Fields::from(vec![ + Field::new( + "col6", + arrow_schema::DataType::Struct(schema_struct_nested_float_fields.clone()), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "6".to_string(), + )])), + ]); // prepare data let arrow_schema = { @@ -1878,15 +1892,11 @@ mod tests { "4".to_string(), )])); - let schema_struct_list_field = Fields::from(vec![Field::new_list( - "col2", - schema_struct_list_float_field.clone(), - true, - ) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "3".to_string(), - )]))]); + let schema_struct_list_field = Fields::from(vec![ + Field::new_list("col2", schema_struct_list_float_field.clone(), true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]), + ), + ]); let arrow_schema = { let fields = vec![ @@ -2094,18 +2104,20 @@ mod tests { [(PARQUET_FIELD_ID_META_KEY.to_string(), "7".to_string())], )); - let schema_struct_map_field = Fields::from(vec![Field::new_map( - "col3", - DEFAULT_MAP_FIELD_NAME, - struct_map_key_field_schema.clone(), - struct_map_value_field_schema.clone(), - false, - false, - ) - .with_metadata(HashMap::from([( - 
PARQUET_FIELD_ID_META_KEY.to_string(), - "5".to_string(), - )]))]); + let schema_struct_map_field = Fields::from(vec![ + Field::new_map( + "col3", + DEFAULT_MAP_FIELD_NAME, + struct_map_key_field_schema.clone(), + struct_map_value_field_schema.clone(), + false, + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "5".to_string(), + )])), + ]); let arrow_schema = { let fields = vec![ @@ -2237,13 +2249,11 @@ mod tests { Arc::new( Schema::builder() .with_schema_id(1) - .with_fields(vec![NestedField::required( - 0, - "col", - Type::Primitive(PrimitiveType::Long), - ) - .with_id(0) - .into()]) + .with_fields(vec![ + NestedField::required(0, "col", Type::Primitive(PrimitiveType::Long)) + .with_id(0) + .into(), + ]) .build() .expect("Failed to create schema"), ), @@ -2264,13 +2274,11 @@ mod tests { let schema = Arc::new( Schema::builder() .with_schema_id(1) - .with_fields(vec![NestedField::required( - 0, - "col", - Type::Primitive(PrimitiveType::Int), - ) - .with_id(0) - .into()]) + .with_fields(vec![ + NestedField::required(0, "col", Type::Primitive(PrimitiveType::Int)) + .with_id(0) + .into(), + ]) .build() .expect("Failed to create schema"), ); From 6692efe7ac53daa7c977a165174842e5d8b7ef26 Mon Sep 17 00:00:00 2001 From: volodymyr Date: Wed, 18 Mar 2026 12:05:22 +0000 Subject: [PATCH 3/3] fix: update FileIOBuilder to FileIO::new_with_fs() and re-apply cargo fmt --- crates/iceberg/src/writer/file_writer/parquet_writer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 419ed65c2a..317f17db5d 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -2327,7 +2327,7 @@ mod tests { props: WriterProperties, ) -> Result<(DataFile, FileIO, TempDir)> { let temp_dir = TempDir::new().unwrap(); - let file_io = 
FileIOBuilder::new_fs_io().build().unwrap(); + let file_io = FileIO::new_with_fs(); let location_gen = DefaultLocationGenerator::with_data_location( temp_dir.path().to_str().unwrap().to_string(), ); @@ -2481,7 +2481,7 @@ mod tests { .unwrap(); let temp_dir = TempDir::new().unwrap(); - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let file_io = FileIO::new_with_fs(); let location_gen = DefaultLocationGenerator::with_data_location( temp_dir.path().to_str().unwrap().to_string(), );