Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions api/src/main/java/org/apache/iceberg/variants/SerializedArray.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,44 @@ static SerializedArray from(VariantMetadata metadata, byte[] bytes) {
}

static SerializedArray from(VariantMetadata metadata, ByteBuffer value, int header) {
return from(metadata, value, header, 0);
}

static SerializedArray from(VariantMetadata metadata, ByteBuffer value, int header, int depth) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this be private?

Preconditions.checkArgument(
value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian");
BasicType basicType = VariantUtil.basicType(header);
Preconditions.checkArgument(
basicType == BasicType.ARRAY, "Invalid array, basic type: " + basicType);
return new SerializedArray(metadata, value, header);
return new SerializedArray(metadata, value, header, depth);
}

private final VariantMetadata metadata;
private final ByteBuffer value;
private final int offsetSize;
private final int offsetListOffset;
private final int dataOffset;
private final int depth;
private final VariantValue[] array;

private SerializedArray(VariantMetadata metadata, ByteBuffer value, int header) {
private SerializedArray(VariantMetadata metadata, ByteBuffer value, int header, int depth) {
this.metadata = metadata;
this.value = value;
this.depth = depth;
this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
int numElementsSize = ((header & IS_LARGE) == IS_LARGE) ? 4 : 1;
Preconditions.checkArgument(
value.remaining() >= HEADER_SIZE + numElementsSize,
"Invalid variant array: buffer too small for element count field");
int numElements = VariantUtil.readLittleEndianUnsigned(value, HEADER_SIZE, numElementsSize);
Preconditions.checkArgument(
numElements >= 0, "Invalid variant array: negative element count %s", numElements);
this.offsetListOffset = HEADER_SIZE + numElementsSize;
long offsetTableEnd = (long) offsetListOffset + ((long) numElements + 1L) * offsetSize;
Preconditions.checkArgument(
offsetTableEnd <= value.remaining(),
"Invalid variant array: element count %s exceeds buffer",
numElements);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just realised that this and the parquet-java hardening don't worry about leftover data. "don't do that" is implicit the policy there, being as it is useless.

I wonder what the rust parquet reader does.

this.dataOffset = offsetListOffset + ((1 + numElements) * offsetSize);
this.array = new VariantValue[numElements];
}
Expand All @@ -75,8 +91,16 @@ public VariantValue get(int index) {
int next =
VariantUtil.readLittleEndianUnsigned(
value, offsetListOffset + (offsetSize * (1 + index)), offsetSize);
long dataLen = value.remaining() - (long) dataOffset;
Preconditions.checkArgument(
offset >= 0 && next >= offset && next <= dataLen,
"Invalid variant array: offset range [%s, %s] out of data region [0, %s]",
offset,
next,
dataLen);
array[index] =
VariantValue.from(metadata, VariantUtil.slice(value, dataOffset + offset, next - offset));
VariantUtil.fromBuffer(
metadata, VariantUtil.slice(value, dataOffset + offset, next - offset), depth + 1);
}
return array[index];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,32 @@ static SerializedMetadata from(ByteBuffer metadata) {
private SerializedMetadata(ByteBuffer metadata, int header) {
this.isSorted = (header & SORTED_STRINGS) == SORTED_STRINGS;
this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
Preconditions.checkArgument(
metadata.remaining() >= HEADER_SIZE + offsetSize,
"Invalid variant metadata: buffer too small for dictionary size field");
int dictSize = VariantUtil.readLittleEndianUnsigned(metadata, HEADER_SIZE, offsetSize);
this.dict = new String[dictSize];
Preconditions.checkArgument(
dictSize >= 0, "Invalid variant metadata: negative dictionary size %s", dictSize);
this.offsetListOffset = HEADER_SIZE + offsetSize;
long offsetTableEnd = (long) offsetListOffset + ((long) dictSize + 1L) * offsetSize;
Preconditions.checkArgument(
offsetTableEnd <= metadata.remaining(),
"Invalid variant metadata: dictionary size %s exceeds buffer",
dictSize);
this.dict = new String[dictSize];
this.dataOffset = offsetListOffset + ((1 + dictSize) * offsetSize);
int endOffset =
dataOffset
+ VariantUtil.readLittleEndianUnsigned(
metadata, offsetListOffset + (offsetSize * dictSize), offsetSize);
if (endOffset < metadata.limit()) {
int lastOffset =
VariantUtil.readLittleEndianUnsigned(
metadata, offsetListOffset + (offsetSize * dictSize), offsetSize);
Preconditions.checkArgument(
lastOffset >= 0, "Invalid variant metadata: negative end offset %s", lastOffset);
long endOffsetLong = (long) dataOffset + lastOffset;
Preconditions.checkArgument(
endOffsetLong <= metadata.remaining(),
"Invalid variant metadata: end offset %s exceeds buffer",
endOffsetLong);
int endOffset = (int) endOffsetLong;
if (endOffset < metadata.remaining()) {
this.metadata = VariantUtil.slice(metadata, 0, endOffset);
} else {
this.metadata = metadata;
Expand Down Expand Up @@ -111,6 +128,12 @@ public String get(int index) {
int next =
VariantUtil.readLittleEndianUnsigned(
metadata, offsetListOffset + (offsetSize * (1 + index)), offsetSize);
Preconditions.checkArgument(
offset >= 0 && next >= offset && (long) dataOffset + next <= metadata.remaining(),
"Invalid variant metadata: dict entry %s offset range [%s, %s] invalid",
index,
offset,
next);
dict[index] = VariantUtil.readString(metadata, dataOffset + offset, next - offset);
}
return dict[index];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,16 @@ static SerializedObject from(VariantMetadata metadata, byte[] bytes) {
}

static SerializedObject from(VariantMetadata metadata, ByteBuffer value, int header) {
return from(metadata, value, header, 0);
}

static SerializedObject from(VariantMetadata metadata, ByteBuffer value, int header, int depth) {
Preconditions.checkArgument(
value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian");
BasicType basicType = VariantUtil.basicType(header);
Preconditions.checkArgument(
basicType == BasicType.OBJECT, "Invalid object, basic type: " + basicType);
return new SerializedObject(metadata, value, header);
return new SerializedObject(metadata, value, header, depth);
}

private final VariantMetadata metadata;
Expand All @@ -59,18 +63,33 @@ static SerializedObject from(VariantMetadata metadata, ByteBuffer value, int hea
private final int[] offsets;
private final int[] lengths;
private final int dataOffset;
private final int depth;
private final VariantValue[] values;

private SerializedObject(VariantMetadata metadata, ByteBuffer value, int header) {
private SerializedObject(VariantMetadata metadata, ByteBuffer value, int header, int depth) {
this.metadata = metadata;
this.value = value;
this.depth = depth;
this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
this.fieldIdSize = 1 + ((header & FIELD_ID_SIZE_MASK) >> FIELD_ID_SIZE_SHIFT);
int numElementsSize = ((header & IS_LARGE) == IS_LARGE) ? 4 : 1;
Preconditions.checkArgument(
value.remaining() >= HEADER_SIZE + numElementsSize,
"Invalid variant object: buffer too small for element count field");
int numElements = VariantUtil.readLittleEndianUnsigned(value, HEADER_SIZE, numElementsSize);
Preconditions.checkArgument(
numElements >= 0, "Invalid variant object: negative element count %s", numElements);
this.fieldIdListOffset = HEADER_SIZE + numElementsSize;
this.fieldIds = new Integer[numElements];
long dataStart =
(long) fieldIdListOffset
+ (long) numElements * fieldIdSize
+ ((long) numElements + 1L) * offsetSize;
Preconditions.checkArgument(
dataStart <= value.remaining(),
"Invalid variant object: element count %s exceeds buffer",
numElements);
this.offsetListOffset = fieldIdListOffset + (numElements * fieldIdSize);
this.fieldIds = new Integer[numElements];
this.offsets = new int[numElements];
this.lengths = new int[numElements];
this.dataOffset = offsetListOffset + ((1 + numElements) * offsetSize);
Expand All @@ -95,11 +114,26 @@ private void initOffsetsAndLengths(int numElements) {
int dataLength =
VariantUtil.readLittleEndianUnsigned(
value, offsetListOffset + (numElements * offsetSize), offsetSize);
long dataLen = value.remaining() - (long) dataOffset;
Preconditions.checkArgument(
dataLength >= 0 && dataLength <= dataLen,
"Invalid variant object: data length %s out of data region [0, %s]",
dataLength,
dataLen);
for (int index = 0; index < numElements; index += 1) {
Preconditions.checkArgument(
offsets[index] >= 0 && offsets[index] <= dataLength,
"Invalid variant object: offset %s out of declared data length %s",
offsets[index],
dataLength);
}
offsetToLength.put(dataLength, 0);

// populate lengths list by sorting offsets
List<Integer> sortedOffsets =
offsetToLength.keySet().stream().sorted().collect(Collectors.toList());
Preconditions.checkArgument(
sortedOffsets.size() == numElements + 1, "Invalid variant object: duplicate field offsets");
for (int index = 0; index < numElements; index += 1) {
int offset = sortedOffsets.get(index);
int length = sortedOffsets.get(index + 1) - offset;
Expand Down Expand Up @@ -162,9 +196,16 @@ public String next() {

private int id(int index) {
if (null == fieldIds[index]) {
fieldIds[index] =
int dictSize = metadata.dictionarySize();
int id =
VariantUtil.readLittleEndianUnsigned(
value, fieldIdListOffset + (index * fieldIdSize), fieldIdSize);
Preconditions.checkArgument(
id >= 0 && id < dictSize,
"Invalid variant object: field id %s out of range [0, %s)",
id,
dictSize);
fieldIds[index] = id;
}

return fieldIds[index];
Expand All @@ -181,8 +222,10 @@ public VariantValue get(String name) {

if (null == values[index]) {
values[index] =
VariantValue.from(
metadata, VariantUtil.slice(value, dataOffset + offsets[index], lengths[index]));
VariantUtil.fromBuffer(
metadata,
VariantUtil.slice(value, dataOffset + offsets[index], lengths[index]),
depth + 1);
}

return values[index];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,56 @@ static SerializedPrimitive from(ByteBuffer value, int header) {
private SerializedPrimitive(ByteBuffer value, int header) {
this.value = value;
this.type = PhysicalType.from(header >> PRIMITIVE_TYPE_SHIFT);
long requiredBytes = PRIMITIVE_OFFSET + payloadSize(type, value);
Preconditions.checkArgument(
requiredBytes <= value.remaining(),
"Invalid variant primitive: %s payload extends past buffer",
type);
}

private static long payloadSize(PhysicalType type, ByteBuffer value) {
switch (type) {
case NULL:
case BOOLEAN_TRUE:
case BOOLEAN_FALSE:
return 0;
case INT8:
return 1;
case INT16:
return 2;
case INT32:
case DATE:
case FLOAT:
return 4;
case INT64:
case TIMESTAMPTZ:
case TIMESTAMPNTZ:
case TIME:
case TIMESTAMPTZ_NANOS:
case TIMESTAMPNTZ_NANOS:
case DOUBLE:
return 8;
case DECIMAL4:
return 5;
case DECIMAL8:
return 9;
case DECIMAL16:
return 17;
case UUID:
return 16;
case BINARY:
case STRING:
Preconditions.checkArgument(
PRIMITIVE_OFFSET + 4 <= value.remaining(),
"Invalid variant primitive: %s size field extends past buffer",
type);
int size = VariantUtil.readLittleEndianInt32(value, PRIMITIVE_OFFSET);
Preconditions.checkArgument(
size >= 0, "Invalid variant primitive: negative %s size %s", type, size);
return 4L + size;
}

throw new UnsupportedOperationException("Unsupported primitive type: " + type);
}

private Object read() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ static SerializedShortString from(ByteBuffer value, int header) {
private SerializedShortString(ByteBuffer value, int header) {
this.value = value;
this.length = ((header & LENGTH_MASK) >> LENGTH_SHIFT);
Preconditions.checkArgument(
HEADER_SIZE + length <= value.remaining(),
"Invalid variant short string: length %s exceeds buffer",
length);
}

@Override
Expand Down
29 changes: 29 additions & 0 deletions api/src/main/java/org/apache/iceberg/variants/VariantUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,37 @@ class VariantUtil {
private static final int BASIC_TYPE_OBJECT = 2;
private static final int BASIC_TYPE_ARRAY = 3;

/**
* Maximum permitted nesting depth of a Variant value. The top-level value is depth 0, so a
* Variant may contain up to {@code MAX_VARIANT_DEPTH} nested levels.
*/
static final int MAX_VARIANT_DEPTH = 500;

private VariantUtil() {}

static VariantValue fromBuffer(VariantMetadata metadata, ByteBuffer value, int depth) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a javadoc, mention that the function includes validation, including depth limits

Preconditions.checkArgument(
depth <= MAX_VARIANT_DEPTH,
"Invalid variant: nesting depth %s exceeds maximum %s",
depth,
MAX_VARIANT_DEPTH);
Preconditions.checkArgument(value.remaining() >= 1, "Invalid variant: empty value buffer");
int header = readByte(value, 0);
BasicType basicType = basicType(header);
switch (basicType) {
case PRIMITIVE:
return SerializedPrimitive.from(value, header);
case SHORT_STRING:
return SerializedShortString.from(value, header);
case OBJECT:
return SerializedObject.from(metadata, value, header, depth);
case ARRAY:
return SerializedArray.from(metadata, value, header, depth);
}

throw new UnsupportedOperationException("Unsupported basic type: " + basicType);
}

/** A hacky absolute put for ByteBuffer */
static int writeBufferAbsolute(ByteBuffer buffer, int offset, ByteBuffer toCopy) {
int originalPosition = buffer.position();
Expand Down
15 changes: 1 addition & 14 deletions api/src/main/java/org/apache/iceberg/variants/VariantValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,6 @@ default VariantArray asArray() {
}

static VariantValue from(VariantMetadata metadata, ByteBuffer value) {
int header = VariantUtil.readByte(value, 0);
BasicType basicType = VariantUtil.basicType(header);
switch (basicType) {
case PRIMITIVE:
return SerializedPrimitive.from(value, header);
case SHORT_STRING:
return SerializedShortString.from(value, header);
case OBJECT:
return SerializedObject.from(metadata, value, header);
case ARRAY:
return SerializedArray.from(metadata, value, header);
}

throw new UnsupportedOperationException("Unsupported basic type: " + basicType);
return VariantUtil.fromBuffer(metadata, value, 0);
}
}
Loading