From 488298d9593cd37c7c06a5b2540f78d835b0addb Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Sat, 28 Feb 2026 15:02:56 +0100 Subject: [PATCH 1/2] feat: Variant Support --- crates/catalog/glue/src/schema.rs | 7 + crates/catalog/hms/src/schema.rs | 7 + .../src/arrow/caching_delete_file_loader.rs | 6 +- .../iceberg/src/arrow/nan_val_cnt_visitor.rs | 6 +- crates/iceberg/src/arrow/reader.rs | 157 ++++++++++++++- crates/iceberg/src/arrow/schema.rs | 27 ++- crates/iceberg/src/arrow/value.rs | 9 +- crates/iceberg/src/avro/schema.rs | 140 +++++++++++++- crates/iceberg/src/spec/datatypes.rs | 78 +++++++- .../iceberg/src/spec/schema/id_reassigner.rs | 1 + crates/iceberg/src/spec/schema/index.rs | 12 ++ crates/iceberg/src/spec/schema/mod.rs | 39 ++++ .../iceberg/src/spec/schema/prune_columns.rs | 4 + crates/iceberg/src/spec/schema/visitor.rs | 8 + crates/iceberg/src/spec/table_metadata.rs | 10 + crates/iceberg/src/spec/values/literal.rs | 4 + .../src/writer/file_writer/parquet_writer.rs | 35 ++-- .../integration_tests/tests/read_variant.rs | 179 ++++++++++++++++++ dev/spark/provision.py | 18 ++ 19 files changed, 720 insertions(+), 27 deletions(-) create mode 100644 crates/integration_tests/tests/read_variant.rs diff --git a/crates/catalog/glue/src/schema.rs b/crates/catalog/glue/src/schema.rs index 864320dae4..40780308e9 100644 --- a/crates/catalog/glue/src/schema.rs +++ b/crates/catalog/glue/src/schema.rs @@ -182,6 +182,13 @@ impl SchemaVisitor for GlueSchemaBuilder { Ok(glue_type) } + + fn variant(&mut self, _v: &iceberg::spec::VariantType) -> Result { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Conversion from VariantType is not supported for Glue", + )) + } } #[cfg(test)] diff --git a/crates/catalog/hms/src/schema.rs b/crates/catalog/hms/src/schema.rs index c23b48719d..b43ef2b7ef 100644 --- a/crates/catalog/hms/src/schema.rs +++ b/crates/catalog/hms/src/schema.rs @@ -139,6 +139,13 @@ impl SchemaVisitor for HiveSchemaBuilder { Ok(hive_type) } + + fn 
variant(&mut self, _v: &iceberg::spec::VariantType) -> Result { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Conversion from VariantType is not supported for HMS", + )) + } } #[cfg(test)] diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index dfee7ed87b..2406996eba 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -33,7 +33,7 @@ use crate::io::FileIO; use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile}; use crate::spec::{ DataContentType, Datum, ListType, MapType, NestedField, NestedFieldRef, PartnerAccessor, - PrimitiveType, Schema, SchemaRef, SchemaWithPartnerVisitor, StructType, Type, + PrimitiveType, Schema, SchemaRef, SchemaWithPartnerVisitor, StructType, Type, VariantType, visit_schema_with_partner, }; use crate::{Error, ErrorKind, Result}; @@ -527,6 +527,10 @@ impl SchemaWithPartnerVisitor for EqDelColumnProcessor<'_> { fn primitive(&mut self, _primitive: &PrimitiveType, _partner: &ArrayRef) -> Result<()> { Ok(()) } + + fn variant(&mut self, _v: &VariantType, _partner: &ArrayRef) -> Result<()> { + Ok(()) + } } struct EqDelRecordBatchPartnerAccessor; diff --git a/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs b/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs index e514457887..d01f3e9e56 100644 --- a/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs +++ b/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs @@ -28,7 +28,7 @@ use crate::Result; use crate::arrow::{ArrowArrayAccessor, FieldMatchMode}; use crate::spec::{ ListType, MapType, NestedFieldRef, PrimitiveType, Schema, SchemaRef, SchemaWithPartnerVisitor, - StructType, visit_struct_with_partner, + StructType, VariantType, visit_struct_with_partner, }; macro_rules! 
cast_and_update_cnt_map { @@ -122,6 +122,10 @@ impl SchemaWithPartnerVisitor for NanValueCountVisitor { Ok(()) } + fn variant(&mut self, _v: &VariantType, _col: &ArrayRef) -> Result { + Ok(()) + } + fn after_struct_field(&mut self, field: &NestedFieldRef, partner: &ArrayRef) -> Result<()> { let field_id = field.id; count_float_nans!(partner, self, field_id); diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 93dbdaa35d..27d33453b8 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -682,6 +682,10 @@ impl ArrowReader { Self::include_leaf_field_id(&map_type.key_field, field_ids); Self::include_leaf_field_id(&map_type.value_field, field_ids); } + // Variant is a leaf type for Parquet projection purposes (like a primitive). + Type::Variant(_) => { + field_ids.push(field.id); + } } } @@ -754,8 +758,46 @@ impl ArrowReader { arrow_schema: &ArrowSchemaRef, type_promotion_is_valid: fn(Option<&PrimitiveType>, Option<&PrimitiveType>) -> bool, ) -> Result { - let mut column_map = HashMap::new(); + // Maps field_id → leaf column indices. Vec because variant contributes two + // leaves (metadata + value) under a single field ID. + let mut column_map: HashMap> = HashMap::new(); let fields = arrow_schema.fields(); + // HashSet for O(1) membership checks instead of O(n) slice scans. + let leaf_field_id_set: HashSet = leaf_field_ids.iter().copied().collect(); + + // Variant fields are an Iceberg leaf type but a Parquet GROUP. Their + // sub-fields (metadata, value) carry no embedded field IDs — only the + // parent group has the field ID. filter_leaves therefore never finds + // them via the standard field-ID scan below. + // + // Java's PruneColumns.variant() simply returns the group as-is with no + // type-compatibility check (isStruct() also short-circuits on isVariantType()). 
+ // We replicate that here: pre-scan top-level Arrow struct fields whose + // field ID resolves to Type::Variant and record all their sub-fields so + // the second filter_leaves can include them directly. + let mut variant_sub_fields: HashMap = HashMap::new(); + for top_field in fields.iter() { + let Some(field_id) = top_field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .and_then(|s| i32::from_str(s).ok()) + else { + continue; + }; + if !leaf_field_id_set.contains(&field_id) { + continue; + } + let Some(iceberg_field) = iceberg_schema_of_task.field_by_id(field_id) else { + continue; + }; + if let Type::Variant(_) = iceberg_field.field_type.as_ref() + && let DataType::Struct(sub_fields) = top_field.data_type() + { + for sub_field in sub_fields { + variant_sub_fields.insert(sub_field.clone(), field_id); + } + } + } // Pre-project only the fields that have been selected, possibly avoiding converting // some Arrow types that are not yet supported. @@ -767,7 +809,7 @@ impl ArrowReader { .and_then(|field_id| i32::from_str(field_id).ok()) .is_some_and(|field_id| { projected_fields.insert((*f).clone(), field_id); - leaf_field_ids.contains(&field_id) + leaf_field_id_set.contains(&field_id) }) }), arrow_schema.metadata().clone(), @@ -775,6 +817,14 @@ impl ArrowReader { let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?; fields.filter_leaves(|idx, field| { + // Variant sub-fields: parent group carries the field ID, not the leaf. + // Skip type-promotion check — Type::Variant is not a primitive type + // (matches Java's PruneColumns.variant() which returns the group unchanged). 
+ if let Some(&variant_field_id) = variant_sub_fields.get(field) { + column_map.entry(variant_field_id).or_default().push(idx); + return true; + } + let Some(field_id) = projected_fields.get(field).cloned() else { return false; }; @@ -796,7 +846,7 @@ impl ArrowReader { return false; } - column_map.insert(field_id, idx); + column_map.entry(field_id).or_default().push(idx); true }); @@ -804,8 +854,8 @@ impl ArrowReader { // We only project existing columns; RecordBatchTransformer adds default/NULL values. let mut indices = vec![]; for field_id in leaf_field_ids { - if let Some(col_idx) = column_map.get(field_id) { - indices.push(*col_idx); + if let Some(col_indices) = column_map.get(field_id) { + indices.extend_from_slice(col_indices); } } @@ -1835,7 +1885,8 @@ mod tests { use crate::io::FileIO; use crate::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream}; use crate::spec::{ - DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type, + DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, + Type, VariantType, }; fn table_schema_simple() -> SchemaRef { @@ -2003,6 +2054,100 @@ message schema { assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0])); } + /// Variant fields are an Iceberg leaf type but a Parquet GROUP whose sub-fields carry + /// no embedded field IDs. The projection mask must include both metadata and value + /// leaves when the variant field ID is requested, and must not drop the variant when + /// it is projected alongside ordinary primitive columns. 
+ #[test] + fn test_arrow_projection_mask_variant() { + // Iceberg schema: c1 (String, id=1) + v (Variant, id=2) + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "v", Type::Variant(VariantType)).into(), + ]) + .build() + .unwrap(), + ); + + // Arrow schema: c1 with field ID 1; v as Struct(metadata: Binary, value: Binary) + // with field ID 2 on the struct but NO field IDs on the sub-fields — that is the + // Parquet variant wire format. + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + Field::new( + "v", + DataType::Struct(arrow_schema::Fields::from(vec![ + Arc::new(Field::new("metadata", DataType::Binary, false)), + Arc::new(Field::new("value", DataType::Binary, false)), + ])), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + ])); + + // Parquet message: c1 is leaf 0; variant sub-fields metadata=leaf 1, value=leaf 2. + // No field IDs on sub-leaves — matching the real Iceberg/Spark-written variant format. + let message_type = " +message schema { + required binary c1 (STRING) = 1; + required group v = 2 { + required binary metadata; + required binary value; + } +} +"; + let parquet_type = parse_message_type(message_type).expect("should parse schema"); + let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type)); + + // Both fields: all three leaves must be included. + let mask = ArrowReader::get_arrow_projection_mask( + &[1, 2], + &schema, + &parquet_schema, + &arrow_schema, + false, + ) + .expect("projection mask for c1 + v"); + assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0, 1, 2])); + + // Variant only: leaves 1 (metadata) and 2 (value) must be included. 
+ let mask_variant_only = ArrowReader::get_arrow_projection_mask( + &[2], + &schema, + &parquet_schema, + &arrow_schema, + false, + ) + .expect("projection mask for v only"); + assert_eq!( + mask_variant_only, + ProjectionMask::leaves(&parquet_schema, vec![1, 2]), + ); + + // Primitive only: leaf 0 (c1) must be included; variant NOT included. + let mask_primitive_only = ArrowReader::get_arrow_projection_mask( + &[1], + &schema, + &parquet_schema, + &arrow_schema, + false, + ) + .expect("projection mask for c1 only"); + assert_eq!( + mask_primitive_only, + ProjectionMask::leaves(&parquet_schema, vec![0]), + ); + } + #[tokio::test] async fn test_kleene_logic_or_behaviour() { // a IS NULL OR a = 'foo' diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index bd9e249f48..dae3974311 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -35,7 +35,7 @@ use crate::error::Result; use crate::spec::decimal_utils::i128_from_be_bytes; use crate::spec::{ Datum, FIRST_FIELD_ID, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral, - PrimitiveType, Schema, SchemaVisitor, StructType, Type, + PrimitiveType, Schema, SchemaVisitor, StructType, Type, VariantType, }; use crate::{Error, ErrorKind}; @@ -689,6 +689,16 @@ impl SchemaVisitor for ToArrowSchemaConverter { } } } + + fn variant(&mut self, _v: &VariantType) -> crate::Result { + // Variant is stored as a struct with two required binary fields (no field IDs on sub-fields). + // Uses Binary (not LargeBinary) matching the Parquet BINARY primitive directly. + let metadata_field = Field::new("metadata", DataType::Binary, false); + let value_field = Field::new("value", DataType::Binary, false); + Ok(ArrowSchemaOrFieldOrType::Type(DataType::Struct( + vec![metadata_field, value_field].into(), + ))) + } } /// Convert iceberg schema to an arrow schema. 
@@ -1697,6 +1707,15 @@ mod tests { simple_field("map", map, false, "16"), simple_field("struct", r#struct, false, "17"), simple_field("uuid", DataType::FixedSizeBinary(16), false, "30"), + simple_field( + "v", + DataType::Struct(Fields::from(vec![ + Field::new("metadata", DataType::Binary, false), + Field::new("value", DataType::Binary, false), + ])), + true, + "31", + ), ]) } @@ -1880,6 +1899,12 @@ mod tests { "name":"uuid", "required":true, "type":"uuid" + }, + { + "id":31, + "name":"v", + "required":false, + "type":"variant" } ], "identifier-field-ids":[] diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index 30b47d83fc..689adf8957 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -30,7 +30,7 @@ use uuid::Uuid; use super::get_field_id_from_metadata; use crate::spec::{ ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveLiteral, PrimitiveType, - SchemaWithPartnerVisitor, Struct, StructType, Type, visit_struct_with_partner, + SchemaWithPartnerVisitor, Struct, StructType, Type, VariantType, visit_struct_with_partner, visit_type_with_partner, }; use crate::{Error, ErrorKind, Result}; @@ -426,6 +426,13 @@ impl SchemaWithPartnerVisitor for ArrowArrayToIcebergStructConverter { } } } + + fn variant(&mut self, _v: &VariantType, _partner: &ArrayRef) -> Result>> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Converting variant Arrow array to Iceberg literal is not supported yet", + )) + } } /// Defines how Arrow fields are matched with Iceberg fields when converting data. 
diff --git a/crates/iceberg/src/avro/schema.rs b/crates/iceberg/src/avro/schema.rs index fdbc680977..3d603e9843 100644 --- a/crates/iceberg/src/avro/schema.rs +++ b/crates/iceberg/src/avro/schema.rs @@ -28,7 +28,7 @@ use serde_json::{Number, Value}; use crate::spec::{ ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, SchemaVisitor, - StructType, Type, visit_schema, + StructType, Type, VariantType, visit_schema, }; use crate::{Error, ErrorKind, Result, ensure_data_valid}; @@ -37,6 +37,7 @@ const FIELD_ID_PROP: &str = "field-id"; const KEY_ID: &str = "key-id"; const VALUE_ID: &str = "value-id"; const MAP_LOGICAL_TYPE: &str = "map"; +const VARIANT_LOGICAL_TYPE: &str = "variant"; // This const may better to maintain in avro-rs. const LOGICAL_TYPE: &str = "logicalType"; @@ -220,6 +221,39 @@ impl SchemaVisitor for SchemaToAvroSchema { } } + fn variant(&mut self, _v: &VariantType) -> Result { + let fields = vec![ + AvroRecordField { + name: "metadata".to_string(), + schema: AvroSchema::Bytes, + order: RecordFieldOrder::Ignore, + position: 0, + doc: None, + aliases: None, + default: None, + custom_attributes: Default::default(), + }, + AvroRecordField { + name: "value".to_string(), + schema: AvroSchema::Bytes, + order: RecordFieldOrder::Ignore, + position: 1, + doc: None, + aliases: None, + default: None, + custom_attributes: Default::default(), + }, + ]; + let mut schema = avro_record_schema(VARIANT_LOGICAL_TYPE, fields)?; + if let AvroSchema::Record(record) = &mut schema { + record.attributes.insert( + LOGICAL_TYPE.to_string(), + Value::String(VARIANT_LOGICAL_TYPE.to_string()), + ); + } + Ok(Either::Left(schema)) + } + fn primitive(&mut self, p: &PrimitiveType) -> Result { let avro_schema = match p { PrimitiveType::Boolean => AvroSchema::Boolean, @@ -435,6 +469,13 @@ impl AvroSchemaVisitor for AvroSchemaToSchema { record: &RecordSchema, field_types: Vec>, ) -> Result> { + // A variant is encoded as a record with logicalType "variant" — return it 
directly + // rather than trying to build a struct from its metadata/value byte fields. + if record.attributes.get(LOGICAL_TYPE).and_then(Value::as_str) == Some(VARIANT_LOGICAL_TYPE) + { + return Ok(Some(Type::Variant(VariantType))); + } + let mut fields = Vec::with_capacity(field_types.len()); for (avro_field, field_type) in record.fields.iter().zip_eq(field_types) { let field_id = @@ -614,7 +655,9 @@ mod tests { use super::*; use crate::avro::schema::AvroSchemaToSchema; - use crate::spec::{ListType, MapType, NestedField, PrimitiveType, Schema, StructType, Type}; + use crate::spec::{ + ListType, MapType, NestedField, PrimitiveType, Schema, StructType, Type, VariantType, + }; fn read_test_data_file_to_avro_schema(filename: &str) -> AvroSchema { let input = read_to_string(format!( @@ -1212,4 +1255,97 @@ mod tests { converter.primitive(&avro_schema).unwrap().unwrap() ); } + + /// Adapted from Java TestSchemaConversions.testVariantConversion + #[test] + fn test_variant_schema_conversion() { + let avro_schema = AvroSchema::parse_str( + r#" +{ + "type": "record", + "name": "test_schema", + "fields": [ + { + "name": "variantCol1", + "type": { + "type": "record", + "name": "r1", + "logicalType": "variant", + "fields": [ + {"name": "metadata", "type": "bytes"}, + {"name": "value", "type": "bytes"} + ] + }, + "field-id": 1 + }, + { + "name": "variantCol2", + "type": { + "type": "record", + "name": "r2", + "logicalType": "variant", + "fields": [ + {"name": "metadata", "type": "bytes"}, + {"name": "value", "type": "bytes"} + ] + }, + "field-id": 2 + } + ] +} + "#, + ) + .unwrap(); + + let iceberg_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "variantCol1", Type::Variant(VariantType)).into(), + NestedField::required(2, "variantCol2", Type::Variant(VariantType)).into(), + ]) + .build() + .unwrap(); + + check_schema_conversion(avro_schema, iceberg_schema); + } + + #[test] + fn test_optional_variant_schema_conversion() { + let avro_schema = 
AvroSchema::parse_str( + r#" +{ + "type": "record", + "name": "test_schema", + "fields": [ + { + "name": "v", + "type": [ + "null", + { + "type": "record", + "name": "r1", + "logicalType": "variant", + "fields": [ + {"name": "metadata", "type": "bytes"}, + {"name": "value", "type": "bytes"} + ] + } + ], + "default": null, + "field-id": 1 + } + ] +} + "#, + ) + .unwrap(); + + let iceberg_schema = Schema::builder() + .with_fields(vec![ + NestedField::optional(1, "v", Type::Variant(VariantType)).into(), + ]) + .build() + .unwrap(); + + check_schema_conversion(avro_schema, iceberg_schema); + } } diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index 0379465584..01e84a7f66 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -32,8 +32,8 @@ use serde_json::Value as JsonValue; use super::values::Literal; use crate::ensure_data_valid; use crate::error::Result; -use crate::spec::PrimitiveLiteral; use crate::spec::datatypes::_decimal::{MAX_PRECISION, REQUIRED_LENGTH}; +use crate::spec::{FormatVersion, PrimitiveLiteral}; /// Field name for list type. pub const LIST_FIELD_NAME: &str = "element"; @@ -42,6 +42,11 @@ pub const MAP_KEY_FIELD_NAME: &str = "key"; /// Field name for map type's value. pub const MAP_VALUE_FIELD_NAME: &str = "value"; +/// Minimum format version required for nanosecond-precision timestamp types (v3). +pub const MIN_FORMAT_VERSION_TIMESTAMP_NS: FormatVersion = FormatVersion::V3; +/// Minimum format version required for the variant type (v3). 
+pub const MIN_FORMAT_VERSION_VARIANT: FormatVersion = FormatVersion::V3; + pub(crate) const MAX_DECIMAL_BYTES: u32 = 24; pub(crate) const MAX_DECIMAL_PRECISION: u32 = 38; @@ -90,6 +95,8 @@ pub enum Type { List(ListType), /// Map type Map(MapType), + /// Variant Type + Variant(VariantType), } impl fmt::Display for Type { @@ -99,6 +106,7 @@ impl fmt::Display for Type { Type::Struct(s) => write!(f, "{s}"), Type::List(_) => write!(f, "list"), Type::Map(_) => write!(f, "map"), + Type::Variant(_) => write!(f, "variant"), } } } @@ -122,6 +130,12 @@ impl Type { matches!(self, Type::Struct(_) | Type::List(_) | Type::Map(_)) } + /// Whether the type is variant type. + #[inline(always)] + pub fn is_variant(&self) -> bool { + matches!(self, Type::Variant(_)) + } + /// Convert Type to reference of PrimitiveType pub fn as_primitive_type(&self) -> Option<&PrimitiveType> { if let Type::Primitive(primitive_type) = self { @@ -178,6 +192,17 @@ impl Type { Type::Primitive(PrimitiveType::Float) | Type::Primitive(PrimitiveType::Double) ) } + + /// Returns the minimum format version required for the type. 
+ #[inline(always)] + pub fn min_format_version(&self) -> FormatVersion { + match self { + Type::Primitive(PrimitiveType::TimestampNs) + | Type::Primitive(PrimitiveType::TimestamptzNs) => MIN_FORMAT_VERSION_TIMESTAMP_NS, + Type::Variant(_) => MIN_FORMAT_VERSION_VARIANT, + _ => FormatVersion::V1, + } + } } impl From for Type { @@ -710,6 +735,7 @@ pub(super) mod _serde { use crate::spec::datatypes::Type::Map; use crate::spec::datatypes::{ ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, StructType, Type, + VariantType, }; /// List type for serialization and deserialization @@ -737,6 +763,7 @@ pub(super) mod _serde { value: Cow<'a, Type>, }, Primitive(PrimitiveType), + Variant(VariantType), } impl From> for Type { @@ -775,6 +802,7 @@ pub(super) mod _serde { Self::Struct(StructType::new(fields.into_owned())) } SerdeType::Primitive(p) => Self::Primitive(p), + SerdeType::Variant(v) => Self::Variant(v), } } } @@ -801,6 +829,7 @@ pub(super) mod _serde { fields: Cow::Borrowed(&s.fields), }, Type::Primitive(p) => SerdeType::Primitive(p.clone()), + Type::Variant(v) => SerdeType::Variant(*v), } } } @@ -828,6 +857,42 @@ impl MapType { } } +/// Variant type - can hold semi-structured data of any type. +/// This is an Iceberg V3 feature. 
+#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)] +pub struct VariantType; + +impl fmt::Display for VariantType { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "variant") + } +} + +impl From for Type { + fn from(_: VariantType) -> Self { + Type::Variant(VariantType) + } +} + +impl Serialize for VariantType { + fn serialize(&self, serializer: S) -> std::result::Result + where S: Serializer { + serializer.serialize_str("variant") + } +} + +impl<'de> Deserialize<'de> for VariantType { + fn deserialize(deserializer: D) -> std::result::Result + where D: Deserializer<'de> { + let s = String::deserialize(deserializer)?; + if s == "variant" { + Ok(VariantType) + } else { + Err(D::Error::custom(format!("expected 'variant', got '{s}'"))) + } + } +} + #[cfg(test)] mod tests { use pretty_assertions::assert_eq; @@ -1216,6 +1281,17 @@ mod tests { } } + #[test] + fn variant_type_serde() { + let json = r#"{"id": 1, "name": "v", "required": true, "type": "variant"}"#; + let field: NestedField = serde_json::from_str(json).unwrap(); + assert_eq!(*field.field_type, Type::Variant(VariantType)); + + let serialized = serde_json::to_string(&field).unwrap(); + let roundtrip: NestedField = serde_json::from_str(&serialized).unwrap(); + assert_eq!(field, roundtrip); + } + #[test] fn struct_type_with_type_field() { // Test that StructType properly deserializes JSON with "type":"struct" field diff --git a/crates/iceberg/src/spec/schema/id_reassigner.rs b/crates/iceberg/src/spec/schema/id_reassigner.rs index 5dbb370001..72caf30121 100644 --- a/crates/iceberg/src/spec/schema/id_reassigner.rs +++ b/crates/iceberg/src/spec/schema/id_reassigner.rs @@ -102,6 +102,7 @@ impl ReassignFieldIds { value_field: Arc::new(value_field), })) } + Type::Variant(v) => Ok(Type::Variant(v)), } } diff --git a/crates/iceberg/src/spec/schema/index.rs b/crates/iceberg/src/spec/schema/index.rs index d4e77ab2aa..f521b2d6c9 100644 --- a/crates/iceberg/src/spec/schema/index.rs +++ 
b/crates/iceberg/src/spec/schema/index.rs @@ -53,6 +53,10 @@ pub fn index_by_id(r#struct: &StructType) -> Result fn primitive(&mut self, _: &PrimitiveType) -> Result { Ok(()) } + + fn variant(&mut self, _v: &crate::spec::VariantType) -> Result { + Ok(()) + } } let mut index = IndexById(HashMap::new()); @@ -145,6 +149,10 @@ pub fn index_parents(r#struct: &StructType) -> Result> { fn primitive(&mut self, _p: &PrimitiveType) -> Result { Ok(()) } + + fn variant(&mut self, _v: &crate::spec::VariantType) -> Result { + Ok(()) + } } let mut index = IndexByParent { @@ -293,6 +301,10 @@ impl SchemaVisitor for IndexByName { fn primitive(&mut self, _p: &PrimitiveType) -> Result { Ok(()) } + + fn variant(&mut self, _v: &crate::spec::VariantType) -> Result { + Ok(()) + } } #[cfg(test)] diff --git a/crates/iceberg/src/spec/schema/mod.rs b/crates/iceberg/src/spec/schema/mod.rs index 13ad41818b..f0f6005ead 100644 --- a/crates/iceberg/src/spec/schema/mod.rs +++ b/crates/iceberg/src/spec/schema/mod.rs @@ -39,6 +39,7 @@ pub use self::prune_columns::prune_columns; use super::NestedField; use crate::error::Result; use crate::expr::accessor::StructAccessor; +use crate::spec::FormatVersion; use crate::spec::datatypes::{ LIST_FIELD_NAME, ListType, MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, MapType, NestedFieldRef, PrimitiveType, StructType, Type, @@ -419,6 +420,44 @@ impl Schema { pub fn field_id_to_fields(&self) -> &HashMap { &self.id_to_field } + + /// Check that all types in this schema are supported by the given format version. + /// + /// Mirrors Java's `Schema.checkCompatibility()`. Returns an error listing every + /// incompatible field if any are found. 
+ /// + /// Types with a minimum format version: + /// - `TimestampNs` / `TimestamptzNs` → v3+ + /// - `Variant` → v3+ + pub fn check_format_compatibility(&self, format_version: FormatVersion) -> Result<()> { + let mut problems: Vec = Vec::new(); + + for field in self.id_to_field.values() { + let min_version = field.field_type.min_format_version(); + + if format_version < min_version { + let name = self + .name_by_field_id(field.id) + .unwrap_or(field.name.as_str()); + problems.push(format!( + "Invalid type for {name}: {} is not supported until v{min_version} but format version is v{format_version}.", + field.field_type, + )); + } + } + + if problems.is_empty() { + Ok(()) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid schema for v{format_version}:\n- {}", + problems.join("\n- ") + ), + )) + } + } } impl Display for Schema { diff --git a/crates/iceberg/src/spec/schema/prune_columns.rs b/crates/iceberg/src/spec/schema/prune_columns.rs index 14f1bfd25f..06b16df35d 100644 --- a/crates/iceberg/src/spec/schema/prune_columns.rs +++ b/crates/iceberg/src/spec/schema/prune_columns.rs @@ -238,6 +238,10 @@ impl SchemaVisitor for PruneColumn { fn primitive(&mut self, _p: &PrimitiveType) -> Result> { Ok(None) } + + fn variant(&mut self, _v: &crate::spec::VariantType) -> Result { + Ok(None) + } } #[cfg(test)] diff --git a/crates/iceberg/src/spec/schema/visitor.rs b/crates/iceberg/src/spec/schema/visitor.rs index 50f7c04caa..3287ad7ee8 100644 --- a/crates/iceberg/src/spec/schema/visitor.rs +++ b/crates/iceberg/src/spec/schema/visitor.rs @@ -16,6 +16,7 @@ // under the License. use super::*; +use crate::spec::VariantType; /// A post order schema visitor. /// @@ -69,6 +70,9 @@ pub trait SchemaVisitor { fn map(&mut self, map: &MapType, key_value: Self::T, value: Self::T) -> Result; /// Called when see a primitive type. fn primitive(&mut self, p: &PrimitiveType) -> Result; + + /// Called when see a variant type. 
+ fn variant(&mut self, _v: &VariantType) -> Result; } /// Visiting a type in post order. @@ -99,6 +103,7 @@ pub(crate) fn visit_type(r#type: &Type, visitor: &mut V) -> Re visitor.map(map, key_result, value_result) } Type::Struct(s) => visit_struct(s, visitor), + Type::Variant(v) => visitor.variant(v), } } @@ -185,6 +190,8 @@ pub trait SchemaWithPartnerVisitor

{ ) -> Result; /// Called when see a primitive type. fn primitive(&mut self, p: &PrimitiveType, partner: &P) -> Result; + /// Called when see a variant type. + fn variant(&mut self, _v: &VariantType, _partner: &P) -> Result; } /// Accessor used to get child partner from parent partner. @@ -242,6 +249,7 @@ pub(crate) fn visit_type_with_partner, A: Part visitor.map(map, partner, key_result, value_result) } Type::Struct(s) => visit_struct_with_partner(s, partner, visitor, accessor), + Type::Variant(v) => visitor.variant(v, partner), } } diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 28f753e9f1..74007858d5 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -491,6 +491,7 @@ impl TableMetadata { // Normalize location (remove trailing slash) self.location = self.location.trim_end_matches('/').to_string(); self.validate_snapshot_sequence_number()?; + self.validate_schema_format_compatibility()?; self.try_normalize_partition_spec()?; self.try_normalize_sort_order()?; Ok(self) @@ -705,6 +706,15 @@ impl TableMetadata { Ok(()) } + + /// Validates that every type used across all schemas is supported by the + /// table's format version. Delegates to [`Schema::check_format_compatibility`]. 
+    fn validate_schema_format_compatibility(&self) -> Result<()> {
+        for schema in self.schemas.values() {
+            schema.check_format_compatibility(self.format_version)?;
+        }
+        Ok(())
+    }
 }
 
 pub(super) mod _serde {
diff --git a/crates/iceberg/src/spec/values/literal.rs b/crates/iceberg/src/spec/values/literal.rs
index e82fa197cd..1068c6b785 100644
--- a/crates/iceberg/src/spec/values/literal.rs
+++ b/crates/iceberg/src/spec/values/literal.rs
@@ -595,6 +595,10 @@ impl Literal {
                     ))
                 }
             }
+            Type::Variant(_) => Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Variant type is not supported for single-value JSON serialization",
+            )),
         }
     }
 
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index da04d5435b..cf21367620 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -40,7 +40,7 @@ use crate::io::{FileIO, FileWrite, OutputFile};
 use crate::spec::{
     DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType,
     NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct,
-    StructType, TableMetadata, Type, visit_schema,
+    StructType, TableMetadata, Type, VariantType, visit_schema,
 };
 use crate::transform::create_transform_function;
 use crate::writer::{CurrentFileStatus, DataFile};
@@ -113,6 +113,25 @@ impl IndexByParquetPathName {
     pub fn get(&self, name: &str) -> Option<&i32> {
         self.name_to_id.get(name)
     }
+
+    /// Records the dot-joined `field_names` path under the current `field_id`.
+    ///
+    /// Fails with `ErrorKind::DataInvalid` when that path is already registered.
+    fn insert_current_path(&mut self) -> Result<()> {
+        let full_name = self.field_names.iter().map(String::as_str).join(".");
+        let field_id = self.field_id;
+        if let Some(existing_field_id) = self.name_to_id.get(full_name.as_str()) {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}"
+                ),
+            ));
+        } else {
+            self.name_to_id.insert(full_name, field_id);
+        }
+        Ok(())
+    }
 }
 
 impl Default for IndexByParquetPathName {
@@ -191,20 +210,11 @@ impl SchemaVisitor for IndexByParquetPathName {
     }
 
     fn primitive(&mut self, _p: &PrimitiveType) -> Result<Self::T> {
-        let full_name = self.field_names.iter().map(String::as_str).join(".");
-        let field_id = self.field_id;
-        if let Some(existing_field_id) = self.name_to_id.get(full_name.as_str()) {
-            return Err(Error::new(
-                ErrorKind::DataInvalid,
-                format!(
-                    "Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}"
-                ),
-            ));
-        } else {
-            self.name_to_id.insert(full_name, field_id);
-        }
+        self.insert_current_path()
+    }
 
-        Ok(())
+    fn variant(&mut self, _v: &VariantType) -> Result<Self::T> {
+        self.insert_current_path()
     }
 }
 
diff --git a/crates/integration_tests/tests/read_variant.rs b/crates/integration_tests/tests/read_variant.rs
new file mode 100644
index 0000000000..710d228ab4
--- /dev/null
+++ b/crates/integration_tests/tests/read_variant.rs
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for variant type support.
+//!
+//! These tests require a running Docker environment seeded by `dev/spark/provision.py`.
+//! The Spark 4.0 provisioner creates `rest.default.test_variant_column` with a
+//! `VARIANT` column containing three rows of JSON data.
+
+use arrow_schema::DataType;
+use futures::TryStreamExt;
+use iceberg::spec::Type;
+use iceberg::{Catalog, CatalogBuilder, TableIdent};
+use iceberg_catalog_rest::RestCatalogBuilder;
+use iceberg_integration_tests::get_test_fixture;
+
+/// Verifies that a table written by Spark with a VARIANT column has its schema
+/// parsed into `Type::Variant` by the Rust iceberg implementation.
+#[tokio::test]
+async fn test_variant_schema_is_parsed() {
+    let fixture = get_test_fixture();
+    let rest_catalog = RestCatalogBuilder::default()
+        .load("rest", fixture.catalog_config.clone())
+        .await
+        .unwrap();
+
+    let table = rest_catalog
+        .load_table(&TableIdent::from_strs(["default", "test_variant_column"]).unwrap())
+        .await
+        .unwrap();
+
+    let schema = table.metadata().current_schema();
+    let variant_field = schema
+        .field_by_name("v")
+        .expect("field 'v' not found in schema");
+
+    assert!(
+        matches!(variant_field.field_type.as_ref(), Type::Variant(_)),
+        "Expected Type::Variant for field 'v', got {:?}",
+        variant_field.field_type,
+    );
+}
+
+/// Verifies that scanning a table with a VARIANT column produces an Arrow batch
+/// where the variant column is represented as `Struct(metadata: Binary, value: Binary)`,
+/// matching the Parquet physical layout (§3.3 of the Parquet Variant spec).
+#[tokio::test]
+async fn test_variant_arrow_schema() {
+    let fixture = get_test_fixture();
+    let rest_catalog = RestCatalogBuilder::default()
+        .load("rest", fixture.catalog_config.clone())
+        .await
+        .unwrap();
+
+    let table = rest_catalog
+        .load_table(&TableIdent::from_strs(["default", "test_variant_column"]).unwrap())
+        .await
+        .unwrap();
+
+    let scan = table.scan().build().unwrap();
+    let batch_stream = scan.to_arrow().await.unwrap();
+    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+    assert!(!batches.is_empty(), "expected at least one record batch");
+
+    // Only the first batch is inspected: `v` must be a struct with exactly two binary sub-fields.
+    let v_col = batches[0]
+        .column_by_name("v")
+        .expect("column 'v' not found in batch");
+
+    let DataType::Struct(fields) = v_col.data_type() else {
+        panic!(
+            "Expected variant column to be DataType::Struct, got {:?}",
+            v_col.data_type()
+        );
+    };
+
+    assert_eq!(
+        fields.len(),
+        2,
+        "variant struct must have exactly 2 sub-fields"
+    );
+
+    let metadata_field = fields
+        .iter()
+        .find(|f| f.name() == "metadata")
+        .expect("sub-field 'metadata' not found");
+    let value_field = fields
+        .iter()
+        .find(|f| f.name() == "value")
+        .expect("sub-field 'value' not found");
+
+    assert_eq!(
+        metadata_field.data_type(),
+        &DataType::Binary,
+        "'metadata' sub-field must be DataType::Binary"
+    );
+    assert_eq!(
+        value_field.data_type(),
+        &DataType::Binary,
+        "'value' sub-field must be DataType::Binary"
+    );
+}
+
+/// Verifies that a variant column is NOT silently dropped when it is projected
+/// alongside ordinary primitive columns (regression test for the projection bug
+/// where variant sub-fields had no embedded Parquet field IDs and were therefore
+/// excluded from `column_map`, causing the whole variant group to be omitted).
+#[tokio::test]
+async fn test_variant_projected_with_primitive_columns() {
+    let fixture = get_test_fixture();
+    let rest_catalog = RestCatalogBuilder::default()
+        .load("rest", fixture.catalog_config.clone())
+        .await
+        .unwrap();
+
+    let table = rest_catalog
+        .load_table(&TableIdent::from_strs(["default", "test_variant_column"]).unwrap())
+        .await
+        .unwrap();
+
+    // Explicitly select only the two columns — this exercises the projection path
+    // that was previously broken for variant types.
+    let scan = table.scan().select(["id", "v"]).build().unwrap();
+    let batch_stream = scan.to_arrow().await.unwrap();
+    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+    assert!(!batches.is_empty(), "expected at least one record batch");
+
+    let first_batch = &batches[0];
+
+    // Both columns must be present — the variant must not be silently dropped.
+    assert!(
+        first_batch.column_by_name("id").is_some(),
+        "column 'id' not found in projected batch"
+    );
+    let v_col = first_batch
+        .column_by_name("v")
+        .expect("column 'v' was silently dropped from projected scan — projection bug regression");
+
+    // The variant column must still be a struct with the expected sub-fields.
+    let DataType::Struct(fields) = v_col.data_type() else {
+        panic!(
+            "Expected variant column to be DataType::Struct after projection, got {:?}",
+            v_col.data_type()
+        );
+    };
+    assert_eq!(
+        fields.len(),
+        2,
+        "projected variant struct must have exactly 2 sub-fields"
+    );
+    assert!(
+        fields.iter().any(|f| f.name() == "metadata"),
+        "projected variant struct must have a 'metadata' sub-field"
+    );
+    assert!(
+        fields.iter().any(|f| f.name() == "value"),
+        "projected variant struct must have a 'value' sub-field"
+    );
+
+    // All three rows seeded by `dev/spark/provision.py` must be readable.
+    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(total_rows, 3, "expected exactly 3 rows");
+}
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index 40f9ba0f38..c58f778fdf 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -129,6 +129,24 @@
 spark.sql("ALTER TABLE rest.default.test_promote_partition_column ALTER COLUMN baz TYPE decimal(6, 2)")
 spark.sql("INSERT INTO rest.default.test_promote_partition_column VALUES (25, 22.25, 22.25)")
 
+# Create a format-version 3 table with a VARIANT column (read by the Rust variant integration tests)
+spark.sql("""
+CREATE OR REPLACE TABLE rest.default.test_variant_column (
+    id INT,
+    v VARIANT
+)
+USING iceberg
+TBLPROPERTIES ('format-version'='3')
+""")
+
+spark.sql("""
+INSERT INTO rest.default.test_variant_column
+VALUES
+    (1, PARSE_JSON('{"a": 1, "b": "hello"}')),
+    (2, PARSE_JSON('[1, 2, 3]')),
+    (3, PARSE_JSON('42'))
+""")
+
 # Create a table with various types
 spark.sql("""
 CREATE OR REPLACE TABLE rest.default.types_test USING ICEBERG AS

From 466b07150970c679cd6cb8eeec9a73808e1175c3 Mon Sep 17 00:00:00 2001
From: Christian Thiel
Date: Mon, 2 Mar 2026 09:57:04 +0100
Subject: [PATCH 2/2] fix: TableCreation uses correct format version

---
 crates/iceberg/src/spec/schema/mod.rs        | 14 +++++++++++++-
 crates/integrations/datafusion/src/schema.rs |  5 +++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/crates/iceberg/src/spec/schema/mod.rs b/crates/iceberg/src/spec/schema/mod.rs
index f0f6005ead..4c2daaf3f1 100644
--- a/crates/iceberg/src/spec/schema/mod.rs
+++ b/crates/iceberg/src/spec/schema/mod.rs
@@ -421,6 +421,18 @@ impl Schema {
         &self.id_to_field
     }
 
+    /// Returns the minimum [`FormatVersion`] required to represent all types in this schema.
+    ///
+    /// Iterates over every field and returns the highest minimum version among them,
+    /// using `FormatVersion::V1` as the fallback when the schema contains no fields.
+    pub fn min_format_version(&self) -> FormatVersion {
+        self.id_to_field
+            .values()
+            .map(|f| f.field_type.min_format_version())
+            .max()
+            .unwrap_or(FormatVersion::V1)
+    }
+
     /// Check that all types in this schema are supported by the given format version.
     ///
     /// Mirrors Java's `Schema.checkCompatibility()`. Returns an error listing every
@@ -440,7 +452,7 @@ impl Schema {
                     .name_by_field_id(field.id)
                     .unwrap_or(field.name.as_str());
                 problems.push(format!(
-                    "Invalid type for {name}: {} is not supported until v{min_version} but format version is v{format_version}.",
+                    "Invalid type for {name}: {} is not supported until {min_version} but format version is {format_version}.",
                     field.field_type,
                 ));
             }
diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs
index 508aeb303b..af8932d734 100644
--- a/crates/integrations/datafusion/src/schema.rs
+++ b/crates/integrations/datafusion/src/schema.rs
@@ -29,6 +29,7 @@ use futures::StreamExt;
 use futures::future::try_join_all;
 use iceberg::arrow::arrow_schema_to_schema_auto_assign_ids;
 use iceberg::inspect::MetadataTableType;
+use iceberg::spec::FormatVersion;
 use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableCreation, TableIdent};
 
 use crate::table::IcebergTableProvider;
@@ -163,10 +164,14 @@ impl SchemaProvider for IcebergSchemaProvider {
         let iceberg_schema = arrow_schema_to_schema_auto_assign_ids(df_schema.as_ref())
             .map_err(to_datafusion_error)?;
 
+        // At least V2; newer only if the schema's types require it (e.g. timestamp_ns / variant).
+        let format_version = iceberg_schema.min_format_version().max(FormatVersion::V2);
+
         // Create the table in the Iceberg catalog
         let table_creation = TableCreation::builder()
             .name(name.clone())
             .schema(iceberg_schema)
+            .format_version(format_version)
             .build();
 
         let catalog = self.catalog.clone();