326 changes: 325 additions & 1 deletion crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -27,7 +27,7 @@ use itertools::Itertools;
use parquet::arrow::AsyncArrowWriter;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::metadata::{KeyValue, ParquetMetaData};
use parquet::file::properties::WriterProperties;
use parquet::file::statistics::Statistics;

@@ -46,6 +46,9 @@ use crate::transform::create_transform_function;
use crate::writer::{CurrentFileStatus, DataFile};
use crate::{Error, ErrorKind, Result};

/// The key used to store the Iceberg schema JSON in the Parquet file footer metadata.
const ICEBERG_SCHEMA_KEY: &str = "iceberg.schema";
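// Illustrative sketch only: the value stored under this key is the serde_json
// serialization of the Iceberg `Schema`, roughly of the form below (mirroring
// the two-field schema used in the tests further down; exact key order may differ):
//
//   {"schema-id":1,"type":"struct","fields":[
//     {"id":0,"name":"id","required":true,"type":"long"},
//     {"id":1,"name":"name","required":false,"type":"string"}]}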

/// ParquetWriterBuilder is used to build a [`ParquetWriter`]
#[derive(Clone, Debug)]
pub struct ParquetWriterBuilder {
@@ -67,12 +70,33 @@ impl ParquetWriterBuilder {
schema: SchemaRef,
match_mode: FieldMatchMode,
) -> Self {
let props = Self::add_iceberg_schema_metadata(props, &schema);
Self {
props,
schema,
match_mode,
}
}

/// Adds the `iceberg.schema` key-value metadata to the Parquet writer properties.
///
/// This embeds the full Iceberg schema JSON (including field IDs, types, and
/// schema ID) into the Parquet file footer.
fn add_iceberg_schema_metadata(props: WriterProperties, schema: &Schema) -> WriterProperties {
let schema_json = serde_json::to_string(schema)
.expect("Iceberg schema serialization to JSON should not fail");

let iceberg_kv = KeyValue::new(ICEBERG_SCHEMA_KEY.to_string(), schema_json);

// Preserve any existing key-value metadata from the caller
let mut kv_metadata = props.key_value_metadata().cloned().unwrap_or_default();
kv_metadata.push(iceberg_kv);

props
.into_builder()
.set_key_value_metadata(Some(kv_metadata))
.build()
}
}
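
// A minimal usage sketch (hypothetical helper, assuming the two-argument
// `new(props, schema)` constructor exercised by the tests below): caller-supplied
// key-value metadata on `WriterProperties` is preserved, and the `iceberg.schema`
// entry is appended alongside it when the builder is constructed.
#[allow(dead_code)]
fn example_builder_with_custom_metadata(schema: SchemaRef) -> ParquetWriterBuilder {
    let props = WriterProperties::builder()
        .set_key_value_metadata(Some(vec![KeyValue::new(
            "custom.key".to_string(),
            "custom_value".to_string(),
        )]))
        .build();
    // The resulting writer carries both "custom.key" and "iceberg.schema"
    // in the Parquet footer key-value metadata.
    ParquetWriterBuilder::new(props, schema)
}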

impl FileWriterBuilder for ParquetWriterBuilder {
@@ -2279,4 +2303,304 @@ mod tests {
assert_eq!(lower_bounds, HashMap::from([(0, Datum::int(i32::MIN))]));
assert_eq!(upper_bounds, HashMap::from([(0, Datum::int(i32::MAX))]));
}

/// Helper to read the Parquet file footer key-value metadata from a written data file.
async fn read_parquet_kv_metadata(
file_io: &FileIO,
data_file: &DataFile,
) -> Option<Vec<parquet::file::metadata::KeyValue>> {
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
let input_content = input_file.read().await.unwrap();
let reader_builder = ParquetRecordBatchReaderBuilder::try_new(input_content).unwrap();
reader_builder
.metadata()
.file_metadata()
.key_value_metadata()
.cloned()
}

/// Helper to write a simple Parquet file and return the data file, file_io,
/// and TempDir (which must be kept alive so the file remains on disk).
async fn write_simple_parquet_file(
props: WriterProperties,
) -> Result<(DataFile, FileIO, TempDir)> {
let temp_dir = TempDir::new().unwrap();
let file_io = FileIO::new_with_fs();
let location_gen = DefaultLocationGenerator::with_data_location(
temp_dir.path().to_str().unwrap().to_string(),
);
let file_name_gen =
DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

let iceberg_schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(0, "id", Type::Primitive(PrimitiveType::Long)).into(),
NestedField::optional(1, "name", Type::Primitive(PrimitiveType::String)).into(),
])
.build()
.unwrap(),
);

let arrow_schema: ArrowSchemaRef = Arc::new(iceberg_schema.as_ref().try_into().unwrap());
let col0 = Arc::new(Int64Array::from_iter_values(0..10)) as ArrayRef;
let col1 = Arc::new(arrow_array::StringArray::from(vec![
Some("a"),
Some("b"),
Some("c"),
None,
Some("e"),
Some("f"),
Some("g"),
Some("h"),
Some("i"),
Some("j"),
])) as ArrayRef;
let batch = RecordBatch::try_new(arrow_schema.clone(), vec![col0, col1]).unwrap();

let output_file = file_io.new_output(
location_gen.generate_location(None, &file_name_gen.generate_file_name()),
)?;

let mut pw = ParquetWriterBuilder::new(props, iceberg_schema)
.build(output_file)
.await?;
pw.write(&batch).await?;
let res = pw.close().await?;
assert_eq!(res.len(), 1);
let data_file = res
.into_iter()
.next()
.unwrap()
.content(DataContentType::Data)
.partition(Struct::empty())
.partition_spec_id(0)
.build()
.unwrap();

Ok((data_file, file_io, temp_dir))
}

#[tokio::test]
async fn test_iceberg_schema_metadata_present_in_parquet_footer() -> Result<()> {
let (data_file, file_io, _temp_dir) =
write_simple_parquet_file(WriterProperties::builder().build()).await?;

let kv_metadata = read_parquet_kv_metadata(&file_io, &data_file).await;
let kv_metadata = kv_metadata.expect("key_value_metadata should be present");

let iceberg_schema_entry = kv_metadata.iter().find(|kv| kv.key == ICEBERG_SCHEMA_KEY);
assert!(
iceberg_schema_entry.is_some(),
"Parquet footer must contain '{ICEBERG_SCHEMA_KEY}' key-value metadata"
);

let schema_json = iceberg_schema_entry
.unwrap()
.value
.as_ref()
.expect("iceberg.schema value should not be None");
assert!(
!schema_json.is_empty(),
"iceberg.schema JSON should not be empty"
);

Ok(())
}

#[tokio::test]
async fn test_iceberg_schema_metadata_roundtrip() -> Result<()> {
let (data_file, file_io, _temp_dir) =
write_simple_parquet_file(WriterProperties::builder().build()).await?;

let kv_metadata = read_parquet_kv_metadata(&file_io, &data_file)
.await
.expect("key_value_metadata should be present");

let schema_json = kv_metadata
.iter()
.find(|kv| kv.key == ICEBERG_SCHEMA_KEY)
.expect("iceberg.schema key must be present")
.value
.as_ref()
.expect("iceberg.schema value must not be None");

// Deserialize back to an Iceberg Schema
let deserialized: Schema = serde_json::from_str(schema_json)
.expect("iceberg.schema JSON should deserialize to a valid Schema");

// Verify schema ID
assert_eq!(deserialized.schema_id(), 1);

// Verify field count
assert_eq!(deserialized.as_struct().fields().len(), 2);

// Verify field IDs and names
let id_field = deserialized.field_by_id(0).expect("field 0 should exist");
assert_eq!(id_field.name, "id");

let name_field = deserialized.field_by_id(1).expect("field 1 should exist");
assert_eq!(name_field.name, "name");

Ok(())
}

#[tokio::test]
async fn test_iceberg_schema_metadata_with_nested_schema() -> Result<()> {
// Use a nested schema with Struct and List types to verify the iceberg.schema
// JSON correctly roundtrips complex types including field IDs.
let schema = Schema::builder()
.with_schema_id(2)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
NestedField::optional(
2,
"location",
Type::Struct(StructType::new(vec![
NestedField::required(3, "lat", Type::Primitive(PrimitiveType::Double))
.into(),
NestedField::required(4, "lon", Type::Primitive(PrimitiveType::Double))
.into(),
])),
)
.into(),
NestedField::optional(
5,
"tags",
Type::List(ListType::new(
NestedField::required(6, "element", Type::Primitive(PrimitiveType::String))
.into(),
)),
)
.into(),
])
.build()
.unwrap();

let temp_dir = TempDir::new().unwrap();
let file_io = FileIO::new_with_fs();
let location_gen = DefaultLocationGenerator::with_data_location(
temp_dir.path().to_str().unwrap().to_string(),
);
let file_name_gen =
DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());

// Build columns
let col_id = Arc::new(Int64Array::from(vec![1i64, 2, 3])) as ArrayRef;
let col_location = Arc::new(StructArray::new(
{
if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
fields.clone()
} else {
unreachable!()
}
},
vec![
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
Arc::new(Float64Array::from(vec![4.0, 5.0, 6.0])) as ArrayRef,
],
None,
)) as ArrayRef;
let col_tags = Arc::new({
let values = arrow_array::StringArray::from(vec!["a", "b", "c", "d", "e", "f"]);
let offsets = arrow_buffer::OffsetBuffer::new(arrow_buffer::ScalarBuffer::from(vec![
0i32, 2, 4, 6,
]));
let field = arrow_schema.field(2).clone();
if let DataType::List(inner) = field.data_type() {
ListArray::new(inner.clone(), offsets, Arc::new(values), None)
} else {
unreachable!()
}
}) as ArrayRef;

let batch =
RecordBatch::try_new(arrow_schema.clone(), vec![col_id, col_location, col_tags])
.unwrap();

let output_file = file_io.new_output(
location_gen.generate_location(None, &file_name_gen.generate_file_name()),
)?;

let mut pw = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
Arc::new(schema.clone()),
)
.build(output_file)
.await?;
pw.write(&batch).await?;
let res = pw.close().await?;
assert_eq!(res.len(), 1);
let data_file = res
.into_iter()
.next()
.unwrap()
.content(DataContentType::Data)
.partition(Struct::empty())
.partition_spec_id(0)
.build()
.unwrap();

let kv_metadata = read_parquet_kv_metadata(&file_io, &data_file)
.await
.expect("key_value_metadata should be present");

let schema_json = kv_metadata
.iter()
.find(|kv| kv.key == ICEBERG_SCHEMA_KEY)
.expect("iceberg.schema key must be present for nested schema")
.value
.as_ref()
.expect("iceberg.schema value must not be None");

let deserialized: Schema = serde_json::from_str(schema_json)
.expect("nested iceberg.schema JSON should deserialize");

// Verify the nested schema round-trips correctly
assert_eq!(deserialized, schema);

// Spot-check nested field IDs survived the roundtrip
assert_eq!(deserialized.field_by_id(3).unwrap().name, "lat");
assert_eq!(deserialized.field_by_id(6).unwrap().name, "element");

Ok(())
}

#[tokio::test]
async fn test_iceberg_schema_preserves_existing_kv_metadata() -> Result<()> {
let props = WriterProperties::builder()
.set_key_value_metadata(Some(vec![parquet::file::metadata::KeyValue::new(
"custom.key".to_string(),
"custom_value".to_string(),
)]))
.build();

let (data_file, file_io, _temp_dir) = write_simple_parquet_file(props).await?;

let kv_metadata = read_parquet_kv_metadata(&file_io, &data_file)
.await
.expect("key_value_metadata should be present");

// The caller's custom metadata should still be present
let custom_entry = kv_metadata.iter().find(|kv| kv.key == "custom.key");
assert!(
custom_entry.is_some(),
"Caller's custom key-value metadata must be preserved"
);
assert_eq!(custom_entry.unwrap().value.as_deref(), Some("custom_value"));

// iceberg.schema should also be present
let iceberg_entry = kv_metadata.iter().find(|kv| kv.key == ICEBERG_SCHEMA_KEY);
assert!(
iceberg_entry.is_some(),
"iceberg.schema must be present alongside custom metadata"
);

Ok(())
}
}