Pull request overview
This PR adds support for Iceberg key encoding by implementing the IcebergKeyEncoder struct that follows Fluss Java's encoding specifications. The implementation enables Fluss to encode row keys for the Iceberg data lake format.
Changes:
- Added new `IcebergKeyEncoder` implementation with support for scalar types (Int, BigInt, Float, Double, Date, Time, Timestamp, Decimal, String, Char, Binary, Bytes)
- Integrated `IcebergKeyEncoder` into the `KeyEncoderFactory` to handle Iceberg format
- Added comprehensive test coverage for supported data types
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| crates/fluss/src/row/encode/mod.rs | Replaces the "not yet implemented" error for Iceberg format with the new IcebergKeyEncoder implementation |
| crates/fluss/src/row/encode/iceberg_key_encoder.rs | Complete implementation of the Iceberg key encoder with type validation, encoding logic, and tests for supported types |
    (DataType::Timestamp(_), Datum::TimestampNtz(ts)) => {
        let micros =
            ts.get_millisecond() * 1000 + (ts.get_nano_of_millisecond() as i64 / 1000);
        micros.to_le_bytes().to_vec()
Potential integer overflow in timestamp conversion. At line 138, ts.get_millisecond() * 1000 could overflow for very large timestamp values. The maximum i64 value is 9,223,372,036,854,775,807, so the multiplication overflows once the millisecond value exceeds i64::MAX / 1000, which corresponds to timestamps roughly 292,000 years away from the Unix epoch. Consider using checked_mul() to detect overflow, or documenting this limitation if it is acceptable for the expected range of timestamps in Iceberg format.
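The checked arithmetic suggested above could look roughly like the sketch below. The function name `timestamp_to_micros` and its plain `i64`/`i32` parameters are hypothetical stand-ins for the values returned by `ts.get_millisecond()` and `ts.get_nano_of_millisecond()`; how the encoder should surface the `None` case (for example as an `IllegalArgument` error) is left open.

```rust
/// Hypothetical helper: converts a millisecond timestamp plus its
/// sub-millisecond nanoseconds into microseconds.
/// Returns `None` instead of wrapping or panicking when the result
/// does not fit in an `i64`.
pub fn timestamp_to_micros(millis: i64, nano_of_milli: i32) -> Option<i64> {
    // Milliseconds -> microseconds; `?` bails out with `None` on overflow.
    let whole_micros = millis.checked_mul(1_000)?;
    // Nanoseconds within the millisecond -> microseconds (truncating).
    let sub_micros = i64::from(nano_of_milli) / 1_000;
    whole_micros.checked_add(sub_micros)
}
```

With this shape, the caller decides whether an out-of-range timestamp is an error or a documented limitation, rather than silently encoding a wrapped value.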
    fn validate_supported_type(field_type: &DataType) -> Result<()> {
        match field_type {
            DataType::Int(_)
            | DataType::BigInt(_)
            | DataType::Float(_)
            | DataType::Double(_)
            | DataType::Date(_)
            | DataType::Time(_)
            | DataType::Timestamp(_)
            | DataType::Decimal(_)
            | DataType::String(_)
            | DataType::Char(_)
            | DataType::Binary(_)
            | DataType::Bytes(_) => Ok(()),

            DataType::Array(_) => Err(IllegalArgument {
                message:
                    "Array types cannot be used as bucket keys. Bucket keys must be scalar types."
                        .to_string(),
            }),
            DataType::Map(_) => Err(IllegalArgument {
                message:
                    "Map types cannot be used as bucket keys. Bucket keys must be scalar types."
                        .to_string(),
            }),
            other => Err(IllegalArgument {
                message: format!("Unsupported type for Iceberg key encoder: {other}"),
            }),
        }
    }
The type validation is incomplete. Several data types are supported by the FieldGetter but are missing from this validation:
- Boolean (DataType::Boolean) - which would be returned as Datum::Bool
- TinyInt (DataType::TinyInt) - which would be returned as Datum::Int8
- SmallInt (DataType::SmallInt) - which would be returned as Datum::Int16
- TimestampLtz (DataType::TimestampLTz) - which would be returned as Datum::TimestampLtz
Without explicit handling for these types, if a user attempts to use them as key fields, they would get a generic "Unsupported type" error at validation time. However, if this is intentional and matches the Java implementation behavior, then these types should be listed explicitly in validation with appropriate error messages explaining they are not supported for Iceberg keys.
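One way to make the unsupported types explicit is an exhaustive match with no catch-all arm, so the compiler forces a decision for every type. The sketch below uses a hypothetical stand-in enum `KeyType` rather than the crate's `DataType`, and returns `String` errors instead of `IllegalArgument`; the error wording for Boolean, TinyInt, SmallInt, and TimestampLtz is an assumption, not taken from the Java implementation.

```rust
/// Hypothetical stand-in for the crate's `DataType`, without type parameters.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KeyType {
    Int, BigInt, Float, Double, Date, Time, Timestamp, Decimal,
    String, Char, Binary, Bytes,
    Boolean, TinyInt, SmallInt, TimestampLtz,
    Array, Map,
}

/// Validates a key field type. There is no wildcard arm, so adding a new
/// variant to `KeyType` becomes a compile error until it is handled here.
pub fn validate_key_type(t: KeyType) -> Result<(), String> {
    match t {
        KeyType::Int | KeyType::BigInt | KeyType::Float | KeyType::Double
        | KeyType::Date | KeyType::Time | KeyType::Timestamp | KeyType::Decimal
        | KeyType::String | KeyType::Char | KeyType::Binary | KeyType::Bytes => Ok(()),

        // Assumed to be intentionally unsupported; the message says so explicitly.
        KeyType::Boolean | KeyType::TinyInt | KeyType::SmallInt | KeyType::TimestampLtz => {
            Err(format!("{t:?} is not supported as an Iceberg key type"))
        }

        KeyType::Array | KeyType::Map => Err(format!(
            "{t:?} types cannot be used as bucket keys. Bucket keys must be scalar types."
        )),
    }
}
```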
    let bytes: Vec<u8> = match (&self.field_type, value) {
        (DataType::Int(_), Datum::Int32(v)) => (v as i64).to_le_bytes().to_vec(),
        (DataType::Date(_), Datum::Date(v)) => (v.get_inner() as i64).to_le_bytes().to_vec(),

        (DataType::Time(_), Datum::Time(v)) => {
            let micros = v.get_inner() as i64 * 1000;
            micros.to_le_bytes().to_vec()
        }

        (DataType::BigInt(_), Datum::Int64(v)) => v.to_le_bytes().to_vec(),
        (DataType::Float(_), Datum::Float32(v)) => v.0.to_le_bytes().to_vec(),
        (DataType::Double(_), Datum::Float64(v)) => v.0.to_le_bytes().to_vec(),

        (DataType::Timestamp(_), Datum::TimestampNtz(ts)) => {
            let micros =
                ts.get_millisecond() * 1000 + (ts.get_nano_of_millisecond() as i64 / 1000);
            micros.to_le_bytes().to_vec()
        }

        (DataType::Decimal(_), Datum::Decimal(d)) => d.to_unscaled_bytes(),
        (DataType::String(_), Datum::String(s)) => s.as_bytes().to_vec(),
        (DataType::Char(_), Datum::String(s)) => s.as_bytes().to_vec(),
        (DataType::Binary(_), Datum::Blob(b)) => b.as_ref().to_vec(),
        (DataType::Bytes(_), Datum::Blob(b)) => b.as_ref().to_vec(),

        // FieldGetter uses Datum::String for CHAR, Datum::Blob for BINARY/BYTES.
        (expected_type, actual) => {
            return Err(IllegalArgument {
                message: format!(
                    "IcebergKeyEncoder type mismatch: expected {expected_type}, got {actual:?}"
                ),
            });
        }
    };
The encode_key function is missing match arms for several data types that could potentially be returned by FieldGetter:
- Boolean (DataType::Boolean) - would return Datum::Bool
- TinyInt (DataType::TinyInt) - would return Datum::Int8
- SmallInt (DataType::SmallInt) - would return Datum::Int16
- TimestampLtz (DataType::TimestampLTz) - would return Datum::TimestampLtz
If these types pass validation (e.g., if validation is updated), the match statement would fail at runtime with an "IcebergKeyEncoder type mismatch" error at lines 150-154. Consider adding explicit arms for these types in the match statement, either to encode them (for the numeric types) or to return a more specific error message explaining that they are unsupported for Iceberg keys.
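As a rough illustration of the explicit arms suggested here, the sketch below widens the small integer types to `i64` before little-endian encoding, mirroring what the existing Int arm does, and rejects Boolean with a specific message. The `Value` enum and `encode_integral` function are hypothetical stand-ins for `Datum` and the encoder; whether Fluss Java actually encodes TinyInt and SmallInt this way is not confirmed in this thread and would need to be checked before adopting it.

```rust
/// Hypothetical stand-in for the subset of `Datum` variants discussed here.
pub enum Value {
    Bool(bool),
    Int8(i8),
    Int16(i16),
    Int32(i32),
    Int64(i64),
}

/// Encodes integral values as 8-byte little-endian, widening smaller
/// integers to i64 first (an assumption modeled on the existing Int arm).
pub fn encode_integral(value: &Value) -> Result<Vec<u8>, String> {
    match value {
        Value::Int8(v) => Ok(i64::from(*v).to_le_bytes().to_vec()),
        Value::Int16(v) => Ok(i64::from(*v).to_le_bytes().to_vec()),
        Value::Int32(v) => Ok(i64::from(*v).to_le_bytes().to_vec()),
        Value::Int64(v) => Ok(v.to_le_bytes().to_vec()),
        // A dedicated message instead of a generic type-mismatch error.
        Value::Bool(_) => Err("Boolean is not supported as an Iceberg key type".to_string()),
    }
}
```

Using `i64::from` rather than `as` keeps the widening lossless by construction, since `From` is only implemented for conversions that cannot truncate.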
Will you have some time to update the PR following the review? Thanks a lot! @zuston
Sure, I will update it this week.