diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index c091c45177..038b6bc7f7 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -28,6 +28,7 @@ pub mod delete_file_loader; pub(crate) mod delete_filter; mod reader; +pub(crate) use reader::apply_name_mapping_to_arrow_schema; /// RecordBatch projection utilities pub mod record_batch_projector; pub(crate) mod record_batch_transformer; diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 5b3a7bb862..2add5b4bd0 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -1025,7 +1025,7 @@ fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap Result> { diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index 0379465584..ca0efa69cb 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -250,6 +250,34 @@ pub enum PrimitiveType { } impl PrimitiveType { + /// Returns `true` if `self` can be promoted to `target` per the Iceberg spec's + /// valid type promotion rules. + /// + /// See + pub fn is_promotable_to(&self, target: &PrimitiveType) -> bool { + matches!( + (self, target), + (PrimitiveType::Int, PrimitiveType::Long) + | (PrimitiveType::Float, PrimitiveType::Double) + | (PrimitiveType::Date, PrimitiveType::Timestamp) + | (PrimitiveType::Date, PrimitiveType::TimestampNs) + | (PrimitiveType::String, PrimitiveType::Binary) + | (PrimitiveType::Binary, PrimitiveType::String) + ) || matches!( + (self, target), + ( + PrimitiveType::Decimal { + precision: p1, + scale: s1, + }, + PrimitiveType::Decimal { + precision: p2, + scale: s2, + }, + ) if s1 == s2 && p1 < p2 + ) + } + /// Check whether literal is compatible with the type. pub fn compatible(&self, literal: &PrimitiveLiteral) -> bool { matches!( @@ -1260,4 +1288,48 @@ mod tests { .contains("expected type 'struct'") ); } + + #[test] + fn test_is_promotable_to() { + assert!(PrimitiveType::Int.is_promotable_to(&PrimitiveType::Long)); + assert!(PrimitiveType::Float.is_promotable_to(&PrimitiveType::Double)); + assert!(PrimitiveType::Date.is_promotable_to(&PrimitiveType::Timestamp)); + assert!(PrimitiveType::Date.is_promotable_to(&PrimitiveType::TimestampNs)); + assert!(PrimitiveType::String.is_promotable_to(&PrimitiveType::Binary)); + assert!(PrimitiveType::Binary.is_promotable_to(&PrimitiveType::String)); + assert!( + PrimitiveType::Decimal { + precision: 10, + scale: 2, + } + .is_promotable_to(&PrimitiveType::Decimal { + precision: 20, + scale: 2, + }) + ); + + assert!(!PrimitiveType::Long.is_promotable_to(&PrimitiveType::Int)); + assert!(!PrimitiveType::Double.is_promotable_to(&PrimitiveType::Float)); + assert!(!PrimitiveType::Int.is_promotable_to(&PrimitiveType::String)); + assert!( + !PrimitiveType::Decimal { + precision: 10, + scale: 2, + } + .is_promotable_to(&PrimitiveType::Decimal { + precision: 20, + scale: 3, + }) + ); + assert!( + !PrimitiveType::Decimal { + precision: 20, + scale: 2, + } + .is_promotable_to(&PrimitiveType::Decimal { + precision: 10, + scale: 2, + }) + ); + } } diff --git a/crates/iceberg/src/spec/schema/mod.rs b/crates/iceberg/src/spec/schema/mod.rs index 13ad41818b..3e2976a1b7 100644 --- a/crates/iceberg/src/spec/schema/mod.rs +++ b/crates/iceberg/src/spec/schema/mod.rs @@ -431,6 +431,133 @@ impl Display for Schema { } } +/// Check whether `file_schema` is compatible with `table_schema` for appending data files. +/// +/// Walks the table schema and checks that every field is present and type-compatible +/// in the file schema (looked up by field ID). Follows the same semantics as +/// iceberg-python's `_check_schema_compatible`. +/// +/// Compatibility rules per field: +/// - If a table field is **required** and absent from the file schema: error +/// - If a table field is **optional** and absent from the file schema: ok (reads as null) +/// - If a table field is **required** but the file field is **optional**: error +/// - If types are equal: ok +/// - If file primitive type is promotable to table primitive type: ok +/// - If both are the same container kind (struct/list/map): ok at this level (children checked recursively) +/// - Otherwise: error +pub fn check_schema_compatible(table_schema: &Schema, file_schema: &Schema) -> Result<()> { + let mut visitor = SchemaCompatibilityVisitor { + file_schema, + mismatches: Vec::new(), + }; + visit_schema(table_schema, &mut visitor)?; + Ok(()) +} + +struct SchemaCompatibilityVisitor<'a> { + file_schema: &'a Schema, + mismatches: Vec, +} + +impl<'a> SchemaCompatibilityVisitor<'a> { + fn check_field(&mut self, field: &NestedFieldRef) { + let field_id = field.id; + match self.file_schema.field_by_id(field_id) { + None => { + if field.required { + self.mismatches.push(format!( + "Field {} ({}, id={}) is required in table schema but missing in file schema", + field.name, field.field_type, field_id + )); + } + } + Some(file_field) => { + if field.required && !file_field.required { + self.mismatches.push(format!( + "Field {} (id={}) is required in table schema but optional in file schema", + field.name, field_id + )); + } + + if field.field_type != file_field.field_type { + let compatible = + match (field.field_type.as_ref(), file_field.field_type.as_ref()) { + (Type::Primitive(table_p), Type::Primitive(file_p)) => { + file_p.is_promotable_to(table_p) + } + (Type::Struct(_), Type::Struct(_)) + | (Type::List(_), Type::List(_)) + | (Type::Map(_), Type::Map(_)) => true, + _ => false, + }; + + if !compatible { + self.mismatches.push(format!( + "Field {} (id={}) has incompatible types: table type is {}, file type is {}", + field.name, field_id, field.field_type, file_field.field_type + )); + } + } + } + } + } +} + +impl SchemaVisitor for SchemaCompatibilityVisitor<'_> { + type T = (); + + fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> { + self.check_field(field); + Ok(()) + } + + fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> { + self.check_field(field); + Ok(()) + } + + fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> { + self.check_field(field); + Ok(()) + } + + fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> { + self.check_field(field); + Ok(()) + } + + fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result { + if self.mismatches.is_empty() { + Ok(()) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + format!("Schema is not compatible:\n{}", self.mismatches.join("\n")), + )) + } + } + + fn field(&mut self, _field: &NestedFieldRef, _value: Self::T) -> Result { + Ok(()) + } + + fn r#struct(&mut self, _struct: &StructType, _results: Vec) -> Result { + Ok(()) + } + + fn list(&mut self, _list: &ListType, _value: Self::T) -> Result { + Ok(()) + } + + fn map(&mut self, _map: &MapType, _key_value: Self::T, _value: Self::T) -> Result { + Ok(()) + } + + fn primitive(&mut self, _p: &PrimitiveType) -> Result { + Ok(()) + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; @@ -441,7 +568,7 @@ mod tests { use crate::spec::datatypes::{ ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, StructType, Type, }; - use crate::spec::schema::Schema; + use crate::spec::schema::{Schema, check_schema_compatible}; use crate::spec::values::Map as MapValue; use crate::spec::{Datum, Literal}; @@ -1228,4 +1355,308 @@ table { .is_err() ); } + + #[test] + fn test_schema_compatible_exact_match() { + let table_schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Primitive(PrimitiveType::Long)).into(), + NestedField::optional(2, "name", Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + let file_schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Primitive(PrimitiveType::Long)).into(), + NestedField::optional(2, "name", Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + assert!(check_schema_compatible(&table_schema, &file_schema).is_ok()); + } + + #[test] + fn test_schema_compatible_missing_optional_column() { + let table_schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Primitive(PrimitiveType::Long)).into(), + NestedField::optional(2, "name", Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + let file_schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Primitive(PrimitiveType::Long)).into(), + ]) + .build() + .unwrap(); + + assert!(check_schema_compatible(&table_schema, &file_schema).is_ok()); + } + + #[test] + fn test_schema_compatible_missing_required_column() { + let table_schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Primitive(PrimitiveType::Long)).into(), + NestedField::required(2, "name", Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + let file_schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Primitive(PrimitiveType::Long)).into(), + ]) + .build() + .unwrap(); + + let result = check_schema_compatible(&table_schema, &file_schema); + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!(err.contains("name")); + assert!(err.contains("required")); + assert!(err.contains("missing")); + } + + #[test] + fn test_schema_compatible_type_mismatch() { + let table_schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Primitive(PrimitiveType::Long)).into(), + ]) + .build() + .unwrap(); + + let file_schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + let result = check_schema_compatible(&table_schema, &file_schema); + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!(err.contains("incompatible types")); + } + + #[test] + fn test_schema_compatible_promotable_type() { + let table_schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Primitive(PrimitiveType::Long)).into(), + NestedField::optional(2, "value", Primitive(PrimitiveType::Double)).into(), + ]) + .build() + .unwrap(); + + let file_schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "value", Primitive(PrimitiveType::Float)).into(), + ]) + .build() + .unwrap(); + + assert!(check_schema_compatible(&table_schema, &file_schema).is_ok()); + } + + #[test] + fn test_schema_compatible_extra_columns_in_file() { + let table_schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Primitive(PrimitiveType::Long)).into(), + ]) + .build() + .unwrap(); + + let file_schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Primitive(PrimitiveType::Long)).into(), + NestedField::optional(2, "extra", Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + assert!(check_schema_compatible(&table_schema, &file_schema).is_ok()); + } + + #[test] + fn test_schema_compatible_required_table_optional_file() { + let table_schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Primitive(PrimitiveType::Long)).into(), + ]) + .build() + .unwrap(); + + let file_schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::optional(1, "id", Primitive(PrimitiveType::Long)).into(), + ]) + .build() + .unwrap(); + + let result = check_schema_compatible(&table_schema, &file_schema); + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!(err.contains("required")); + assert!(err.contains("optional")); + } + + #[test] + fn test_schema_compatible_decimal_promotion() { + let table_schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required( + 1, + "amount", + Primitive(PrimitiveType::Decimal { + precision: 20, + scale: 2, + }), + ) + .into(), + ]) + .build() + .unwrap(); + + let file_schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required( + 1, + "amount", + Primitive(PrimitiveType::Decimal { + precision: 10, + scale: 2, + }), + ) + .into(), + ]) + .build() + .unwrap(); + + assert!(check_schema_compatible(&table_schema, &file_schema).is_ok()); + } + + #[test] + fn test_schema_compatible_decimal_scale_mismatch() { + let table_schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required( + 1, + "amount", + Primitive(PrimitiveType::Decimal { + precision: 20, + scale: 2, + }), + ) + .into(), + ]) + .build() + .unwrap(); + + let file_schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required( + 1, + "amount", + Primitive(PrimitiveType::Decimal { + precision: 10, + scale: 3, + }), + ) + .into(), + ]) + .build() + .unwrap(); + + let result = check_schema_compatible(&table_schema, &file_schema); + assert!(result.is_err()); + } + + #[test] + fn test_schema_compatible_nested_struct() { + let table_schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required( + 1, + "data", + Struct(StructType::new(vec![ + NestedField::required(2, "x", Primitive(PrimitiveType::Long)).into(), + NestedField::optional(3, "y", Primitive(PrimitiveType::String)).into(), + ])), + ) + .into(), + ]) + .build() + .unwrap(); + + let file_schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required( + 1, + "data", + Struct(StructType::new(vec![ + NestedField::required(2, "x", Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "y", Primitive(PrimitiveType::String)).into(), + ])), + ) + .into(), + ]) + .build() + .unwrap(); + + assert!(check_schema_compatible(&table_schema, &file_schema).is_ok()); + } + + #[test] + fn test_schema_compatible_multiple_mismatches_reported() { + let table_schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Primitive(PrimitiveType::Long)).into(), + NestedField::required(2, "name", Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + let file_schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap(); + + let result = check_schema_compatible(&table_schema, &file_schema); + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!(err.contains("id")); + assert!(err.contains("name")); + } } diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index da04d5435b..f47e627a9a 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -24,9 +24,9 @@ use arrow_schema::SchemaRef as ArrowSchemaRef; use bytes::Bytes; use futures::future::BoxFuture; use itertools::Itertools; -use parquet::arrow::AsyncArrowWriter; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter; +use parquet::arrow::{AsyncArrowWriter, PARQUET_FIELD_ID_META_KEY, parquet_to_arrow_schema}; use parquet::file::metadata::ParquetMetaData; use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics; @@ -34,13 +34,14 @@ use parquet::file::statistics::Statistics; use super::{FileWriter, FileWriterBuilder}; use crate::arrow::{ ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor, - get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, + apply_name_mapping_to_arrow_schema, arrow_schema_to_schema, get_parquet_stat_max_as_datum, + get_parquet_stat_min_as_datum, }; use crate::io::{FileIO, FileWrite, OutputFile}; use crate::spec::{ - DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType, - NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct, - StructType, TableMetadata, Type, visit_schema, + DEFAULT_SCHEMA_NAME_MAPPING, DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, + Literal, MapType, NameMapping, NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, + SchemaVisitor, Struct, StructType, TableMetadata, Type, check_schema_compatible, visit_schema, }; use crate::transform::create_transform_function; use crate::writer::{CurrentFileStatus, DataFile}; @@ -307,7 +308,10 @@ impl MinMaxColAggregator { } impl ParquetWriter { - /// Converts parquet files to data files + /// Converts parquet files to data files. + /// + /// Validates that each parquet file's schema is compatible with the table schema + /// before converting. Returns an error if any file has an incompatible schema. #[allow(dead_code)] pub(crate) async fn parquet_files_to_data_files( file_io: &FileIO, @@ -317,6 +321,19 @@ impl ParquetWriter { // TODO: support adding to partitioned table let mut data_files: Vec = Vec::new(); + let table_schema = table_metadata.current_schema(); + let name_mapping = table_metadata + .properties() + .get(DEFAULT_SCHEMA_NAME_MAPPING) + .map(|s| serde_json::from_str::(s)) + .transpose() + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to parse name mapping: {e}"), + ) + })?; + for file_path in file_paths { let input_file = file_io.new_input(&file_path)?; let file_metadata = input_file.metadata().await?; @@ -330,6 +347,47 @@ impl ParquetWriter { format!("Error reading Parquet metadata: {err}"), ) })?; + + let arrow_schema = parquet_to_arrow_schema( + parquet_metadata.file_metadata().schema_descr(), + parquet_metadata.file_metadata().key_value_metadata(), + ) + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Failed to convert Parquet schema to Arrow schema for {file_path}: {e}" + ), + ) + })?; + + let missing_field_ids = arrow_schema + .fields() + .iter() + .next() + .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()); + + let file_schema = if missing_field_ids { + if let Some(ref nm) = name_mapping { + let mapped_schema = + apply_name_mapping_to_arrow_schema(Arc::new(arrow_schema), nm)?; + arrow_schema_to_schema(&mapped_schema)? + } else { + let property = DEFAULT_SCHEMA_NAME_MAPPING; + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Parquet file {file_path} does not have field IDs and the table \ + does not have '{property}' defined" + ), + )); + } + } else { + arrow_schema_to_schema(&arrow_schema)? + }; + + check_schema_compatible(table_schema, &file_schema)?; + let mut builder = ParquetWriter::parquet_to_data_file_builder( table_metadata.current_schema().clone(), parquet_metadata,